Apache Doris 代碼倉庫地址:apache/incubator-doris 歡迎大家關(guān)注加星
1.概要
Routine load 功能為用戶提供了一種自動(dòng)從指定數(shù)據(jù)源進(jìn)行數(shù)據(jù)導(dǎo)入的功能扒披。
Routine Load 是支持用戶提交一個(gè)常駐的導(dǎo)入任務(wù),通過不斷的從指定的數(shù)據(jù)源讀取數(shù)據(jù),將數(shù)據(jù)導(dǎo)入到 Doris 中。目前僅支持通過無認(rèn)證或者 SSL 認(rèn)證方式,從 Kakfa 導(dǎo)入的數(shù)據(jù)偎行。
Routine load是一種同步的數(shù)據(jù)導(dǎo)入方式。
Routine load 支持導(dǎo)入的數(shù)據(jù)類型: 文本 和 JSON兩種格式
2. 原理
FE 通過 JobScheduler 將一個(gè)導(dǎo)入作業(yè)拆分成若干個(gè) Task(一般是和Kafka的Partition數(shù)量一致)。每個(gè) Task 負(fù)責(zé)導(dǎo)入指定的一部分?jǐn)?shù)據(jù)烹俗。Task 被 TaskScheduler 分配到指定的 BE 上執(zhí)行爆侣。
在 BE 上,一個(gè) Task 被視為一個(gè)普通的導(dǎo)入任務(wù)幢妄,通過 Stream Load 的導(dǎo)入機(jī)制進(jìn)行導(dǎo)入兔仰。導(dǎo)入完成后,向 FE 匯報(bào)蕉鸳。
FE 中的 JobScheduler 根據(jù)匯報(bào)結(jié)果斋陪,繼續(xù)生成后續(xù)新的 Task,或者對(duì)失敗的 Task 進(jìn)行重試置吓。
整個(gè)例行導(dǎo)入作業(yè)通過不斷的產(chǎn)生新的 Task无虚,來完成數(shù)據(jù)不間斷的導(dǎo)入
3. 使用方式
3.1 使用限制
- 支持無認(rèn)證的 Kafka 訪問,以及通過 SSL 方式認(rèn)證的 Kafka 集群衍锚。
- 支持的消息格式為 csv, json 文本格式友题。csv 每一個(gè) message 為一行,且行尾不包含換行符戴质。
- 僅支持 Kafka 0.10.0.0(含) 以上版本
3.2 Routine Load SQL語法
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
3.2.1 Routine load 作業(yè)參數(shù)說明
-
[db.]job_name
導(dǎo)入作業(yè)的名稱度宦,在同一個(gè) database 內(nèi),相同名稱只能有一個(gè) job 在運(yùn)行告匠。
-
tbl_name
指定需要導(dǎo)入的表的名稱戈抄。
merge_type
數(shù)據(jù)的合并類型,一共支持三種類型APPEND后专、DELETE划鸽、MERGE 其中,APPEND是默認(rèn)值戚哎,表示這批數(shù)據(jù)全部需要追加到現(xiàn)有數(shù)據(jù)中裸诽,DELETE 表示刪除與這批數(shù)據(jù)key相同的所有行,MERGE 語義 需要與delete on條件聯(lián)合使用型凳,表示滿足delete 條件的數(shù)據(jù)按照DELETE 語義處理其余的按照APPEND 語義處理, 語法為[WITH MERGE|APPEND|DELETE]
3.2.2 load_properties參數(shù)說明
這部分參數(shù)用于描述導(dǎo)入數(shù)據(jù)丈冬。語法:
[column_separator],
[columns_mapping],
[where_predicates],
[delete_on_predicates],
[source_sequence],
[partitions],
[preceding_predicates]
- column_separator:
指定列分隔符,如:
COLUMNS TERMINATED BY ","
這個(gè)只在文本數(shù)據(jù)導(dǎo)入的時(shí)候需要指定甘畅,JSON格式的數(shù)據(jù)導(dǎo)入不需要指定這個(gè)參數(shù)埂蕊。
默認(rèn)為:\t
- columns_mapping:
指定源數(shù)據(jù)中列的映射關(guān)系,以及定義衍生列的生成方式疏唾。
- 映射列:
按順序指定蓄氧,源數(shù)據(jù)中各個(gè)列,對(duì)應(yīng)目的表中的哪些列荸实。對(duì)于希望跳過的列匀们,可以指定一個(gè)不存在的列名。假設(shè)目的表有三列 k1, k2, v1准给。源數(shù)據(jù)有4列泄朴,其中第1重抖、2、4列分別對(duì)應(yīng) k2, k1, v1祖灰。則書寫如下:
COLUMNS (k2, k1, xxx, v1)
其中 xxx 為不存在的一列钟沛,用于跳過源數(shù)據(jù)中的第三列。
- 衍生列:
以 col_name = expr 的形式表示的列局扶,我們稱為衍生列恨统。即支持通過 expr 計(jì)算得出目的表中對(duì)應(yīng)列的值。 衍生列通常排列在映射列之后三妈,雖然這不是強(qiáng)制的規(guī)定畜埋,但是 Doris 總是先解析映射列,再解析衍生列畴蒲。 接上一個(gè)示例悠鞍,假設(shè)目的表還有第4列 v2,v2 由 k1 和 k2 的和產(chǎn)生模燥。則可以書寫如下:
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
再舉例咖祭,假設(shè)用戶需要導(dǎo)入只包含 k1
一列的表,列類型為 int
蔫骂。并且需要將源文件中的對(duì)應(yīng)列進(jìn)行處理:將負(fù)數(shù)轉(zhuǎn)換為正數(shù)么翰,而將正數(shù)乘以 100。這個(gè)功能可以通過 case when
函數(shù)實(shí)現(xiàn)辽旋,正確寫法應(yīng)如下:
COLUMNS (xx, k1 = case when xx < 0 then cast(-xx as varchar) else cast((xx + '100') as varchar) end)
- where_predicates
用于指定過濾條件浩嫌,以過濾掉不需要的列。過濾列可以是映射列或衍生列戴已。 例如我們只希望導(dǎo)入 k1 大于 100 并且 k2 等于 1000 的列固该,則書寫如下:
WHERE k1 > 100 and k2 = 1000
- partitions
指定導(dǎo)入目的表的哪些 partition 中。如果不指定糖儡,則會(huì)自動(dòng)導(dǎo)入到對(duì)應(yīng)的 partition 中。
示例:
PARTITION(p1, p2, p3)
- delete_on_predicates
表示刪除條件怔匣,僅在 merge type 為MERGE 時(shí)有意義握联,語法與where 相同
- source_sequence:
只適用于UNIQUE_KEYS,相同key列下,保證value列按照source_sequence列進(jìn)行REPLACE, source_sequence可以是數(shù)據(jù)源中的列每瞒,也可以是表結(jié)構(gòu)中的一列金闽。
- preceding_predicates
PRECEDING FILTER predicate
用于過濾原始數(shù)據(jù)。原始數(shù)據(jù)是未經(jīng)列映射剿骨、轉(zhuǎn)換的數(shù)據(jù)代芜。用戶可以在對(duì)轉(zhuǎn)換前的數(shù)據(jù)前進(jìn)行一次過濾,選取期望的數(shù)據(jù)浓利,再進(jìn)行轉(zhuǎn)換挤庇。
3.3.3 job_properties參數(shù)說明
用于指定例行導(dǎo)入作業(yè)的通用參數(shù)钞速。
語法:
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
目前支持以下參數(shù):
-
desired_concurrent_number
期望的并發(fā)度。一個(gè)例行導(dǎo)入作業(yè)會(huì)被分成多個(gè)子任務(wù)執(zhí)行嫡秕。這個(gè)參數(shù)指定一個(gè)作業(yè)最多有多少任務(wù)可以同時(shí)執(zhí)行渴语。必須大于0。默認(rèn)為3昆咽。 這個(gè)并發(fā)度并不是實(shí)際的并發(fā)度驾凶,實(shí)際的并發(fā)度,會(huì)通過集群的節(jié)點(diǎn)數(shù)掷酗、負(fù)載情況调违,以及數(shù)據(jù)源的情況綜合考慮。
例如:"desired_concurrent_number" = "3"
一個(gè)作業(yè)泻轰,最多有多少 task 同時(shí)在執(zhí)行翰萨。對(duì)于 Kafka 導(dǎo)入而言,當(dāng)前的實(shí)際并發(fā)度計(jì)算如下:
Min(partition num, desired_concurrent_number, alive_backend_num, Config.max_routine_load_task_concurrrent_num)
其中
Config.max_routine_load_task_concurrrent_num
是系統(tǒng)的一個(gè)默認(rèn)的最大并發(fā)數(shù)限制糕殉。這是一個(gè) FE 配置亩鬼,可以通過改配置調(diào)整。默認(rèn)為 5阿蝶。其中 partition num 指訂閱的 Kafka topic 的 partition 數(shù)量雳锋。
alive_backend_num
是當(dāng)前正常的 BE 節(jié)點(diǎn)數(shù)。 -
max_batch_interval/max_batch_rows/max_batch_size
這三個(gè)參數(shù)分別表示:
1)每個(gè)子任務(wù)最大執(zhí)行時(shí)間羡洁,單位是秒玷过。范圍為 5 到 60。默認(rèn)為10筑煮。
2)每個(gè)子任務(wù)最多讀取的行數(shù)辛蚊。必須大于等于200000。默認(rèn)是200000真仲。
3)每個(gè)子任務(wù)最多讀取的字節(jié)數(shù)袋马。單位是字節(jié),范圍是 100MB 到 1GB秸应。默認(rèn)是 100MB虑凛。這三個(gè)參數(shù),用于控制一個(gè)子任務(wù)的執(zhí)行時(shí)間和處理量软啼。當(dāng)任意一個(gè)達(dá)到閾值桑谍,則任務(wù)結(jié)束。
例如:"max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200"
-
max_error_number
采樣窗口內(nèi)祸挪,允許的最大錯(cuò)誤行數(shù)锣披。必須大于等于0。默認(rèn)是 0,即不允許有錯(cuò)誤行雹仿。 采樣窗口為 max_batch_rows * 10增热。即如果在采樣窗口內(nèi),錯(cuò)誤行數(shù)大于 max_error_number盅粪,則會(huì)導(dǎo)致例行作業(yè)被暫停钓葫,需要人工介入檢查數(shù)據(jù)質(zhì)量問題。 被 where 條件過濾掉的行不算錯(cuò)誤行
-
strict_mode
是否開啟嚴(yán)格模式票顾,默認(rèn)為關(guān)閉础浮。如果開啟后,非空原始數(shù)據(jù)的列類型變換如果結(jié)果為 NULL奠骄,則會(huì)被過濾豆同。指定方式為 "strict_mode" = "true"
-
timezone
指定導(dǎo)入作業(yè)所使用的時(shí)區(qū)。默認(rèn)為使用 Session 的 timezone 參數(shù)含鳞。該參數(shù)會(huì)影響所有導(dǎo)入涉及的和時(shí)區(qū)有關(guān)的函數(shù)結(jié)果
-
format
指定導(dǎo)入數(shù)據(jù)格式鹅髓,默認(rèn)是csv皆愉,支持json格式
-
jsonpaths
jsonpaths: 導(dǎo)入json方式分為:簡(jiǎn)單模式和匹配模式薄湿。如果設(shè)置了jsonpath則為匹配模式導(dǎo)入枝缔,否則為簡(jiǎn)單模式導(dǎo)入,具體可參考示例
-
strip_outer_array
布爾類型熔吗,為true表示json數(shù)據(jù)以數(shù)組對(duì)象開始且將數(shù)組對(duì)象中進(jìn)行展平辆床,默認(rèn)值是false
-
json_root
json_root為合法的jsonpath字符串,用于指定json document的根節(jié)點(diǎn)桅狠,默認(rèn)值為""
send_batch_parallelism
整型讼载,用于設(shè)置發(fā)送批處理數(shù)據(jù)的并行度,如果并行度的值超過 BE 配置中的 max_send_batch_parallelism_per_job
中跌,那么作為協(xié)調(diào)點(diǎn)的 BE 將使用 max_send_batch_parallelism_per_job
的值
3.3.4 數(shù)據(jù)源參數(shù)說明
數(shù)據(jù)源的類型咨堤。當(dāng)前支持:Kafka
指定數(shù)據(jù)源相關(guān)的信息。
語法:
(
"key1" = "val1",
"key2" = "val2"
)
-
kafka_broker_list
Kafka 的 broker 連接信息漩符。格式為 ip:host一喘。多個(gè)broker之間以逗號(hào)分隔。
示例:"kafka_broker_list" = "broker1:9092,broker2:9092"
-
kafka_topic
指定要訂閱的 Kafka 的 topic陨仅。
示例:"kafka_topic" = "my_topic"
-
kafka_partitions/kafka_offsets
指定需要訂閱的 kafka partition津滞,以及對(duì)應(yīng)的每個(gè) partition 的起始 offset。
offset 可以指定從大于等于 0 的具體 offset灼伤,或者:
- OFFSET_BEGINNING: 從有數(shù)據(jù)的位置開始訂閱。
- OFFSET_END: 從末尾開始訂閱咪鲜。
- 時(shí)間戳狐赡,格式必須如:"2021-05-11 10:00:00",系統(tǒng)會(huì)自動(dòng)定位到大于等于該時(shí)間戳的第一個(gè)消息的offset疟丙。注意颖侄,時(shí)間戳格式的offset不能和數(shù)字類型混用鸟雏,只能選其一。
如果沒有指定览祖,則默認(rèn)從 OFFSET_END 開始訂閱 topic 下的所有 partition孝鹊。
示例:
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
"kafka_partitions" = "0,1",
"kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00"
-
property
指定自定義kafka參數(shù)。
- 功能等同于kafka shell中 "--property" 參數(shù)展蒂。
- 當(dāng)參數(shù)的 value 為一個(gè)文件時(shí)又活,需要在 value 前加上關(guān)鍵詞:"FILE:"。
- 關(guān)于如何創(chuàng)建文件锰悼,請(qǐng)參閱 "HELP CREATE FILE;"
- 更多支持的自定義參數(shù)柳骄,請(qǐng)參閱 librdkafka 的官方 CONFIGURATION 文檔中,client 端的配置項(xiàng)箕般。
示例:
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"
1.使用 SSL 連接 Kafka 時(shí)耐薯,需要指定以下參數(shù):
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
其中: "property.security.protocol" 和 "property.ssl.ca.location" 為必須,用于指明連接方式為 SSL丝里,以及 CA
證書的位置曲初。
如果 Kafka server 端開啟了 client 認(rèn)證,則還需設(shè)置:
"property.ssl.certificate.location"
"property.ssl.key.location"
"property.ssl.key.password"
分別用于指定 client 的 public key杯聚,private key 以及 private key 的密碼
2.指定kafka partition的默認(rèn)起始o(jì)ffset
如果沒有指定kafka_partitions/kafka_offsets,默認(rèn)消費(fèi)所有分區(qū),此時(shí)可以指定kafka_default_offsets指定起始
offset臼婆。默認(rèn)為 OFFSET_END,即從末尾開始訂閱械媒。
值為
1) OFFSET_BEGINNING: 從有數(shù)據(jù)的位置開始訂閱目锭。
2) ND: 從末尾開始訂閱。
3) 時(shí)間戳纷捞,格式同 kafka_offsets
示例:
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
"property.kafka_default_offsets" = "2021-05-11 10:00:00"
3.3.5 導(dǎo)入數(shù)據(jù)格式樣例
- 整型類(TINYINT/SMALLINT/INT/BIGINT/LARGEINT):1, 1000, 1234
- 浮點(diǎn)類(FLOAT/DOUBLE/DECIMAL):1.1, 0.23, .356
- 日期類(DATE/DATETIME):2017-10-03, 2017-06-13 12:34:03痢虹。
- 字符串類(CHAR/VARCHAR)(無引號(hào)):I am a student, a
- NULL值:\N
3.3 查看作業(yè)狀態(tài)
查看作業(yè)狀態(tài)的具體命令和示例可以通過 HELP SHOW ROUTINE LOAD;
命令查看。
查看任務(wù)運(yùn)行狀態(tài)的具體命令和示例可以通過 HELP SHOW ROUTINE LOAD TASK;
命令查看主儡。
只能查看當(dāng)前正在運(yùn)行中的任務(wù)奖唯,已結(jié)束和未開始的任務(wù)無法查看
3.4 修改作業(yè)屬性
用戶可以修改已經(jīng)創(chuàng)建的作業(yè)。具體說明可以通過 HELP ALTER ROUTINE LOAD;
命令查看糜值。
3.5 作業(yè)控制
用戶可以通過 STOP/PAUSE/RESUME
三個(gè)命令來控制作業(yè)的停止丰捷,暫停和重啟〖呕悖可以通過 HELP STOP ROUTINE LOAD;
, HELP PAUSE ROUTINE LOAD;
以及 HELP RESUME ROUTINE LOAD;
三個(gè)命令查看幫助和示例病往。
4. 使用示例
4.1 創(chuàng)建Doris數(shù)據(jù)表
CREATE TABLE `example_table` (
`id` int,
`name` varchar(11),
`age` int,
`address` varchar(50)
)
DISTRIBUTED BY HASH(id) BUCKETS 2
PROPERTIES(
"replication_num" = "3"
);
4.2 創(chuàng)建Routine Load 任務(wù)
這個(gè)示例是以JSON格式為例
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="2",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
"kafka_topic" = "test_doris_kafka_load",
"property.group.id" = "test1",
"property.client.id" = "test1",
"kafka_partitions" = "0",
"kafka_offsets" = "0"
);
文本數(shù)據(jù)格式的示例:
CREATE ROUTINE LOAD example_db.test_job ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1,k2,source_sequence,v1,v2),
ORDER BY source_sequence
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "30",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
) FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
4.3 示例數(shù)據(jù)
[{
"category": "11",
"title": "SayingsoftheCentury",
"price": 895,
"timestamp": 1589191587
},
{
"category": "22",
"author": "2avc",
"price": 895,
"timestamp": 1589191487
},
{
"category": "33",
"author": "3avc",
"title": "SayingsoftheCentury",
"timestamp": 1589191387
}
]
5.注意事項(xiàng)
5.1 例行導(dǎo)入作業(yè)和 ALTER TABLE 操作的關(guān)系
- 例行導(dǎo)入不會(huì)阻塞 SCHEMA CHANGE 和 ROLLUP 操作。但是注意如果 SCHEMA CHANGE 完成后骄瓣,列映射關(guān)系無法匹配停巷,則會(huì)導(dǎo)致作業(yè)的錯(cuò)誤數(shù)據(jù)激增,最終導(dǎo)致作業(yè)暫停。建議通過在例行導(dǎo)入作業(yè)中顯式指定列映射關(guān)系畔勤,以及通過增加 Nullable 列或帶 Default 值的列來減少這類問題蕾各。
- 刪除表的 Partition 可能會(huì)導(dǎo)致導(dǎo)入數(shù)據(jù)無法找到對(duì)應(yīng)的 Partition,作業(yè)進(jìn)入暫停庆揪。
5.2 例行導(dǎo)入作業(yè)和其他導(dǎo)入作業(yè)的關(guān)系(LOAD, DELETE, INSERT)
- 例行導(dǎo)入和其他 LOAD 作業(yè)以及 INSERT 操作沒有沖突式曲。
- 當(dāng)執(zhí)行 DELETE 操作時(shí),對(duì)應(yīng)表分區(qū)不能有任何正在執(zhí)行的導(dǎo)入任務(wù)缸榛。所以在執(zhí)行 DELETE 操作前吝羞,可能需要先暫停例行導(dǎo)入作業(yè),并等待已下發(fā)的 task 全部完成后仔掸,才可以執(zhí)行 DELETE脆贵。
5.3 例行導(dǎo)入作業(yè)和 DROP DATABASE/TABLE 操作的關(guān)系
當(dāng)例行導(dǎo)入對(duì)應(yīng)的 database 或 table 被刪除后,作業(yè)會(huì)自動(dòng) CANCEL
5.4 kafka 類型的例行導(dǎo)入作業(yè)和 kafka topic 的關(guān)系
當(dāng)用戶在創(chuàng)建例行導(dǎo)入聲明的 kafka_topic
在kafka集群中不存在時(shí)起暮。
- 如果用戶 kafka 集群的 broker 設(shè)置了
auto.create.topics.enable = true
卖氨,則kafka_topic
會(huì)先被自動(dòng)創(chuàng)建,自動(dòng)創(chuàng)建的 partition 個(gè)數(shù)是由用戶方的kafka集群中的 broker 配置num.partitions
決定的负懦。例行作業(yè)會(huì)正常的不斷讀取該 topic 的數(shù)據(jù)筒捺。 - 如果用戶 kafka 集群的 broker 設(shè)置了
auto.create.topics.enable = false
, 則 topic 不會(huì)被自動(dòng)創(chuàng)建,例行作業(yè)會(huì)在沒有讀取任何數(shù)據(jù)之前就被暫停纸厉,狀態(tài)為PAUSED
系吭。
所以,如果用戶希望當(dāng) kafka topic 不存在的時(shí)候颗品,被例行作業(yè)自動(dòng)創(chuàng)建的話肯尺,只需要將用戶方的kafka集群中的 broker 設(shè)置 auto.create.topics.enable = true
即可。
5.5 網(wǎng)絡(luò)問題
- 創(chuàng)建Routine load 任務(wù)中指定的 Broker list 必須能夠被Doris服務(wù)訪問
- Kafka 中如果配置了
advertised.listeners
,advertised.listeners
中的地址必須能夠被Doris服務(wù)訪問 - 連接kafka集群的時(shí)候建議換成Kafka集群對(duì)應(yīng)的主機(jī)名
5.6 關(guān)于指定消費(fèi)的 Partition 和 Offset
oris 支持指定 Partition 和 Offset 開始消費(fèi)躯枢。新版中還支持了指定時(shí)間點(diǎn)進(jìn)行消費(fèi)的功能则吟。這里說明下對(duì)應(yīng)參數(shù)的配置關(guān)系。
有三個(gè)相關(guān)參數(shù):
-
kafka_partitions
:指定待消費(fèi)的 partition 列表锄蹂,如:"0, 1, 2, 3"氓仲。 -
kafka_offsets
:指定每個(gè)分區(qū)的起始o(jì)ffset,必須和kafka_partitions
列表個(gè)數(shù)對(duì)應(yīng)得糜。如:"1000, 1000, 2000, 2000" -
property.kafka_default_offset
:指定分區(qū)默認(rèn)的起始o(jì)ffset