Apache Doris Routine Load數(shù)據(jù)導(dǎo)入使用方法

Apache Doris 代碼倉庫地址:apache/incubator-doris 歡迎大家關(guān)注加星


1.概要

Routine load 功能為用戶提供了一種自動(dòng)從指定數(shù)據(jù)源進(jìn)行數(shù)據(jù)導(dǎo)入的功能扒披。
Routine Load 是支持用戶提交一個(gè)常駐的導(dǎo)入任務(wù),通過不斷的從指定的數(shù)據(jù)源讀取數(shù)據(jù),將數(shù)據(jù)導(dǎo)入到 Doris 中。目前僅支持通過無認(rèn)證或者 SSL 認(rèn)證方式,從 Kakfa 導(dǎo)入的數(shù)據(jù)偎行。
Routine load是一種同步的數(shù)據(jù)導(dǎo)入方式。
Routine load 支持導(dǎo)入的數(shù)據(jù)類型: 文本 和 JSON兩種格式

2. 原理

image-20210926092117050.png

FE 通過 JobScheduler 將一個(gè)導(dǎo)入作業(yè)拆分成若干個(gè) Task(一般是和Kafka的Partition數(shù)量一致)。每個(gè) Task 負(fù)責(zé)導(dǎo)入指定的一部分?jǐn)?shù)據(jù)烹俗。Task 被 TaskScheduler 分配到指定的 BE 上執(zhí)行爆侣。

在 BE 上,一個(gè) Task 被視為一個(gè)普通的導(dǎo)入任務(wù)幢妄,通過 Stream Load 的導(dǎo)入機(jī)制進(jìn)行導(dǎo)入兔仰。導(dǎo)入完成后,向 FE 匯報(bào)蕉鸳。

FE 中的 JobScheduler 根據(jù)匯報(bào)結(jié)果斋陪,繼續(xù)生成后續(xù)新的 Task,或者對(duì)失敗的 Task 進(jìn)行重試置吓。

整個(gè)例行導(dǎo)入作業(yè)通過不斷的產(chǎn)生新的 Task无虚,來完成數(shù)據(jù)不間斷的導(dǎo)入

3. 使用方式

3.1 使用限制

  1. 支持無認(rèn)證的 Kafka 訪問,以及通過 SSL 方式認(rèn)證的 Kafka 集群衍锚。
  2. 支持的消息格式為 csv, json 文本格式友题。csv 每一個(gè) message 為一行,且行尾不包含換行符戴质。
  3. 僅支持 Kafka 0.10.0.0(含) 以上版本

3.2 Routine Load SQL語法

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]

3.2.1 Routine load 作業(yè)參數(shù)說明

  1. [db.]job_name

    導(dǎo)入作業(yè)的名稱度宦,在同一個(gè) database 內(nèi),相同名稱只能有一個(gè) job 在運(yùn)行告匠。

  2. tbl_name

    指定需要導(dǎo)入的表的名稱戈抄。

  3. merge_type
    數(shù)據(jù)的合并類型,一共支持三種類型APPEND后专、DELETE划鸽、MERGE 其中,APPEND是默認(rèn)值戚哎,表示這批數(shù)據(jù)全部需要追加到現(xiàn)有數(shù)據(jù)中裸诽,DELETE 表示刪除與這批數(shù)據(jù)key相同的所有行,MERGE 語義 需要與delete on條件聯(lián)合使用型凳,表示滿足delete 條件的數(shù)據(jù)按照DELETE 語義處理其余的按照APPEND 語義處理, 語法為[WITH MERGE|APPEND|DELETE]

3.2.2 load_properties參數(shù)說明

這部分參數(shù)用于描述導(dǎo)入數(shù)據(jù)丈冬。語法:
[column_separator],
[columns_mapping],
[where_predicates],
[delete_on_predicates],
[source_sequence],
[partitions],
[preceding_predicates]

  1. column_separator:

指定列分隔符,如:

COLUMNS TERMINATED BY ","

這個(gè)只在文本數(shù)據(jù)導(dǎo)入的時(shí)候需要指定甘畅,JSON格式的數(shù)據(jù)導(dǎo)入不需要指定這個(gè)參數(shù)埂蕊。

默認(rèn)為:\t

  1. columns_mapping:

指定源數(shù)據(jù)中列的映射關(guān)系,以及定義衍生列的生成方式疏唾。

  • 映射列:

按順序指定蓄氧,源數(shù)據(jù)中各個(gè)列,對(duì)應(yīng)目的表中的哪些列荸实。對(duì)于希望跳過的列匀们,可以指定一個(gè)不存在的列名。假設(shè)目的表有三列 k1, k2, v1准给。源數(shù)據(jù)有4列泄朴,其中第1重抖、2、4列分別對(duì)應(yīng) k2, k1, v1祖灰。則書寫如下:

COLUMNS (k2, k1, xxx, v1)

其中 xxx 為不存在的一列钟沛,用于跳過源數(shù)據(jù)中的第三列。

  • 衍生列:

以 col_name = expr 的形式表示的列局扶,我們稱為衍生列恨统。即支持通過 expr 計(jì)算得出目的表中對(duì)應(yīng)列的值。 衍生列通常排列在映射列之后三妈,雖然這不是強(qiáng)制的規(guī)定畜埋,但是 Doris 總是先解析映射列,再解析衍生列畴蒲。 接上一個(gè)示例悠鞍,假設(shè)目的表還有第4列 v2,v2 由 k1 和 k2 的和產(chǎn)生模燥。則可以書寫如下:

COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

再舉例咖祭,假設(shè)用戶需要導(dǎo)入只包含 k1 一列的表,列類型為 int蔫骂。并且需要將源文件中的對(duì)應(yīng)列進(jìn)行處理:將負(fù)數(shù)轉(zhuǎn)換為正數(shù)么翰,而將正數(shù)乘以 100。這個(gè)功能可以通過 case when 函數(shù)實(shí)現(xiàn)辽旋,正確寫法應(yīng)如下:

COLUMNS (xx, k1 = case when xx < 0 then cast(-xx as varchar) else cast((xx + '100') as varchar) end)
  1. where_predicates

用于指定過濾條件浩嫌,以過濾掉不需要的列。過濾列可以是映射列或衍生列戴已。 例如我們只希望導(dǎo)入 k1 大于 100 并且 k2 等于 1000 的列固该,則書寫如下:

WHERE k1 > 100 and k2 = 1000
  1. partitions

指定導(dǎo)入目的表的哪些 partition 中。如果不指定糖儡,則會(huì)自動(dòng)導(dǎo)入到對(duì)應(yīng)的 partition 中。

示例:

PARTITION(p1, p2, p3)
  1. delete_on_predicates

表示刪除條件怔匣,僅在 merge type 為MERGE 時(shí)有意義握联,語法與where 相同

  1. source_sequence:

只適用于UNIQUE_KEYS,相同key列下,保證value列按照source_sequence列進(jìn)行REPLACE, source_sequence可以是數(shù)據(jù)源中的列每瞒,也可以是表結(jié)構(gòu)中的一列金闽。

  1. preceding_predicates
PRECEDING FILTER predicate

用于過濾原始數(shù)據(jù)。原始數(shù)據(jù)是未經(jīng)列映射剿骨、轉(zhuǎn)換的數(shù)據(jù)代芜。用戶可以在對(duì)轉(zhuǎn)換前的數(shù)據(jù)前進(jìn)行一次過濾,選取期望的數(shù)據(jù)浓利,再進(jìn)行轉(zhuǎn)換挤庇。

3.3.3 job_properties參數(shù)說明

用于指定例行導(dǎo)入作業(yè)的通用參數(shù)钞速。
語法:

PROPERTIES (
    "key1" = "val1",
    "key2" = "val2"
)

目前支持以下參數(shù):

  1. desired_concurrent_number

    期望的并發(fā)度。一個(gè)例行導(dǎo)入作業(yè)會(huì)被分成多個(gè)子任務(wù)執(zhí)行嫡秕。這個(gè)參數(shù)指定一個(gè)作業(yè)最多有多少任務(wù)可以同時(shí)執(zhí)行渴语。必須大于0。默認(rèn)為3昆咽。 這個(gè)并發(fā)度并不是實(shí)際的并發(fā)度驾凶,實(shí)際的并發(fā)度,會(huì)通過集群的節(jié)點(diǎn)數(shù)掷酗、負(fù)載情況调违,以及數(shù)據(jù)源的情況綜合考慮。
    例如:

    "desired_concurrent_number" = "3"
    

    一個(gè)作業(yè)泻轰,最多有多少 task 同時(shí)在執(zhí)行翰萨。對(duì)于 Kafka 導(dǎo)入而言,當(dāng)前的實(shí)際并發(fā)度計(jì)算如下:

    Min(partition num, desired_concurrent_number, alive_backend_num, Config.max_routine_load_task_concurrrent_num)
    

    其中 Config.max_routine_load_task_concurrrent_num 是系統(tǒng)的一個(gè)默認(rèn)的最大并發(fā)數(shù)限制糕殉。這是一個(gè) FE 配置亩鬼,可以通過改配置調(diào)整。默認(rèn)為 5阿蝶。

    其中 partition num 指訂閱的 Kafka topic 的 partition 數(shù)量雳锋。alive_backend_num 是當(dāng)前正常的 BE 節(jié)點(diǎn)數(shù)。

  2. max_batch_interval/max_batch_rows/max_batch_size

    這三個(gè)參數(shù)分別表示:
    1)每個(gè)子任務(wù)最大執(zhí)行時(shí)間羡洁,單位是秒玷过。范圍為 5 到 60。默認(rèn)為10筑煮。
    2)每個(gè)子任務(wù)最多讀取的行數(shù)辛蚊。必須大于等于200000。默認(rèn)是200000真仲。
    3)每個(gè)子任務(wù)最多讀取的字節(jié)數(shù)袋马。單位是字節(jié),范圍是 100MB 到 1GB秸应。默認(rèn)是 100MB虑凛。

    這三個(gè)參數(shù),用于控制一個(gè)子任務(wù)的執(zhí)行時(shí)間和處理量软啼。當(dāng)任意一個(gè)達(dá)到閾值桑谍,則任務(wù)結(jié)束。
    例如:

    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
    
  3. max_error_number

    采樣窗口內(nèi)祸挪,允許的最大錯(cuò)誤行數(shù)锣披。必須大于等于0。默認(rèn)是 0,即不允許有錯(cuò)誤行雹仿。 采樣窗口為 max_batch_rows * 10增热。即如果在采樣窗口內(nèi),錯(cuò)誤行數(shù)大于 max_error_number盅粪,則會(huì)導(dǎo)致例行作業(yè)被暫停钓葫,需要人工介入檢查數(shù)據(jù)質(zhì)量問題。 被 where 條件過濾掉的行不算錯(cuò)誤行

  4. strict_mode

    是否開啟嚴(yán)格模式票顾,默認(rèn)為關(guān)閉础浮。如果開啟后,非空原始數(shù)據(jù)的列類型變換如果結(jié)果為 NULL奠骄,則會(huì)被過濾豆同。指定方式為 "strict_mode" = "true"

  5. timezone

    指定導(dǎo)入作業(yè)所使用的時(shí)區(qū)。默認(rèn)為使用 Session 的 timezone 參數(shù)含鳞。該參數(shù)會(huì)影響所有導(dǎo)入涉及的和時(shí)區(qū)有關(guān)的函數(shù)結(jié)果

  6. format

    指定導(dǎo)入數(shù)據(jù)格式鹅髓,默認(rèn)是csv皆愉,支持json格式

  7. jsonpaths

    jsonpaths: 導(dǎo)入json方式分為:簡(jiǎn)單模式和匹配模式薄湿。如果設(shè)置了jsonpath則為匹配模式導(dǎo)入枝缔,否則為簡(jiǎn)單模式導(dǎo)入,具體可參考示例

  8. strip_outer_array

    布爾類型熔吗,為true表示json數(shù)據(jù)以數(shù)組對(duì)象開始且將數(shù)組對(duì)象中進(jìn)行展平辆床,默認(rèn)值是false

  9. json_root

    json_root為合法的jsonpath字符串,用于指定json document的根節(jié)點(diǎn)桅狠,默認(rèn)值為""

  10. send_batch_parallelism

整型讼载,用于設(shè)置發(fā)送批處理數(shù)據(jù)的并行度,如果并行度的值超過 BE 配置中的 max_send_batch_parallelism_per_job中跌,那么作為協(xié)調(diào)點(diǎn)的 BE 將使用 max_send_batch_parallelism_per_job 的值

3.3.4 數(shù)據(jù)源參數(shù)說明

數(shù)據(jù)源的類型咨堤。當(dāng)前支持:Kafka

指定數(shù)據(jù)源相關(guān)的信息。

語法:

(
    "key1" = "val1",
    "key2" = "val2"
)
  1. kafka_broker_list

    Kafka 的 broker 連接信息漩符。格式為 ip:host一喘。多個(gè)broker之間以逗號(hào)分隔。
    示例:

    "kafka_broker_list" = "broker1:9092,broker2:9092"
    
  2. kafka_topic

    指定要訂閱的 Kafka 的 topic陨仅。
    示例:

    "kafka_topic" = "my_topic"
    
  3. kafka_partitions/kafka_offsets

    指定需要訂閱的 kafka partition津滞,以及對(duì)應(yīng)的每個(gè) partition 的起始 offset。

    offset 可以指定從大于等于 0 的具體 offset灼伤,或者:

    • OFFSET_BEGINNING: 從有數(shù)據(jù)的位置開始訂閱。
    • OFFSET_END: 從末尾開始訂閱咪鲜。
    • 時(shí)間戳狐赡,格式必須如:"2021-05-11 10:00:00",系統(tǒng)會(huì)自動(dòng)定位到大于等于該時(shí)間戳的第一個(gè)消息的offset疟丙。注意颖侄,時(shí)間戳格式的offset不能和數(shù)字類型混用鸟雏,只能選其一。

    如果沒有指定览祖,則默認(rèn)從 OFFSET_END 開始訂閱 topic 下的所有 partition孝鹊。
    示例:

"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
"kafka_partitions" = "0,1",
"kafka_offsets" = "2021-05-11 10:00:00, 2021-05-11 11:00:00"
  1. property

    指定自定義kafka參數(shù)。

    • 功能等同于kafka shell中 "--property" 參數(shù)展蒂。
    • 當(dāng)參數(shù)的 value 為一個(gè)文件時(shí)又活,需要在 value 前加上關(guān)鍵詞:"FILE:"。
    • 關(guān)于如何創(chuàng)建文件锰悼,請(qǐng)參閱 "HELP CREATE FILE;"
    • 更多支持的自定義參數(shù)柳骄,請(qǐng)參閱 librdkafka 的官方 CONFIGURATION 文檔中,client 端的配置項(xiàng)箕般。

示例:

"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"

1.使用 SSL 連接 Kafka 時(shí)耐薯,需要指定以下參數(shù):

"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"

其中: "property.security.protocol" 和 "property.ssl.ca.location" 為必須,用于指明連接方式為 SSL丝里,以及 CA

證書的位置曲初。

如果 Kafka server 端開啟了 client 認(rèn)證,則還需設(shè)置:

"property.ssl.certificate.location"
"property.ssl.key.location"
"property.ssl.key.password"

分別用于指定 client 的 public key杯聚,private key 以及 private key 的密碼

2.指定kafka partition的默認(rèn)起始o(jì)ffset

如果沒有指定kafka_partitions/kafka_offsets,默認(rèn)消費(fèi)所有分區(qū),此時(shí)可以指定kafka_default_offsets指定起始

offset臼婆。默認(rèn)為 OFFSET_END,即從末尾開始訂閱械媒。
值為

              1) OFFSET_BEGINNING: 從有數(shù)據(jù)的位置開始訂閱目锭。
              2) ND: 從末尾開始訂閱。
              3) 時(shí)間戳纷捞,格式同 kafka_offsets

示例:

"property.kafka_default_offsets" = "OFFSET_BEGINNING"
"property.kafka_default_offsets" = "2021-05-11 10:00:00"

3.3.5 導(dǎo)入數(shù)據(jù)格式樣例

  1. 整型類(TINYINT/SMALLINT/INT/BIGINT/LARGEINT):1, 1000, 1234
  2. 浮點(diǎn)類(FLOAT/DOUBLE/DECIMAL):1.1, 0.23, .356
  3. 日期類(DATE/DATETIME):2017-10-03, 2017-06-13 12:34:03痢虹。
  4. 字符串類(CHAR/VARCHAR)(無引號(hào)):I am a student, a
  5. NULL值:\N

3.3 查看作業(yè)狀態(tài)

查看作業(yè)狀態(tài)的具體命令和示例可以通過 HELP SHOW ROUTINE LOAD; 命令查看。

查看任務(wù)運(yùn)行狀態(tài)的具體命令和示例可以通過 HELP SHOW ROUTINE LOAD TASK; 命令查看主儡。

只能查看當(dāng)前正在運(yùn)行中的任務(wù)奖唯,已結(jié)束和未開始的任務(wù)無法查看

3.4 修改作業(yè)屬性

用戶可以修改已經(jīng)創(chuàng)建的作業(yè)。具體說明可以通過 HELP ALTER ROUTINE LOAD; 命令查看糜值。

3.5 作業(yè)控制

用戶可以通過 STOP/PAUSE/RESUME 三個(gè)命令來控制作業(yè)的停止丰捷,暫停和重啟〖呕悖可以通過 HELP STOP ROUTINE LOAD;, HELP PAUSE ROUTINE LOAD; 以及 HELP RESUME ROUTINE LOAD; 三個(gè)命令查看幫助和示例病往。

4. 使用示例

4.1 創(chuàng)建Doris數(shù)據(jù)表

CREATE TABLE `example_table` (
  `id` int,
  `name` varchar(11),  
  `age` int,
  `address` varchar(50)
) 
DISTRIBUTED BY HASH(id) BUCKETS 2
PROPERTIES( 
"replication_num" = "3"
);

4.2 創(chuàng)建Routine Load 任務(wù)

這個(gè)示例是以JSON格式為例

CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
  "desired_concurrent_number"="2",
  "max_batch_interval" = "20",
  "max_batch_rows" = "300000",
  "max_batch_size" = "209715200",
  "strict_mode" = "false",
  "format" = "json",
  "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
  "strip_outer_array" = "true"
)
FROM KAFKA
(
  "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
  "kafka_topic" = "test_doris_kafka_load",
  "property.group.id" = "test1", 
  "property.client.id" = "test1",
  "kafka_partitions" = "0",
  "kafka_offsets" = "0"
);

文本數(shù)據(jù)格式的示例:

CREATE ROUTINE LOAD example_db.test_job ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1,k2,source_sequence,v1,v2),
ORDER BY source_sequence
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "30",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
) FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,0,200"
);   

4.3 示例數(shù)據(jù)

[{
        "category": "11",
        "title": "SayingsoftheCentury",
        "price": 895,
        "timestamp": 1589191587
    },
    {
        "category": "22",
        "author": "2avc",
        "price": 895,
        "timestamp": 1589191487
    },
    {
        "category": "33",
        "author": "3avc",
        "title": "SayingsoftheCentury",
        "timestamp": 1589191387
    }
] 

5.注意事項(xiàng)

5.1 例行導(dǎo)入作業(yè)和 ALTER TABLE 操作的關(guān)系

  • 例行導(dǎo)入不會(huì)阻塞 SCHEMA CHANGE 和 ROLLUP 操作。但是注意如果 SCHEMA CHANGE 完成后骄瓣,列映射關(guān)系無法匹配停巷,則會(huì)導(dǎo)致作業(yè)的錯(cuò)誤數(shù)據(jù)激增,最終導(dǎo)致作業(yè)暫停。建議通過在例行導(dǎo)入作業(yè)中顯式指定列映射關(guān)系畔勤,以及通過增加 Nullable 列或帶 Default 值的列來減少這類問題蕾各。
  • 刪除表的 Partition 可能會(huì)導(dǎo)致導(dǎo)入數(shù)據(jù)無法找到對(duì)應(yīng)的 Partition,作業(yè)進(jìn)入暫停庆揪。

5.2 例行導(dǎo)入作業(yè)和其他導(dǎo)入作業(yè)的關(guān)系(LOAD, DELETE, INSERT)

  • 例行導(dǎo)入和其他 LOAD 作業(yè)以及 INSERT 操作沒有沖突式曲。
  • 當(dāng)執(zhí)行 DELETE 操作時(shí),對(duì)應(yīng)表分區(qū)不能有任何正在執(zhí)行的導(dǎo)入任務(wù)缸榛。所以在執(zhí)行 DELETE 操作前吝羞,可能需要先暫停例行導(dǎo)入作業(yè),并等待已下發(fā)的 task 全部完成后仔掸,才可以執(zhí)行 DELETE脆贵。

5.3 例行導(dǎo)入作業(yè)和 DROP DATABASE/TABLE 操作的關(guān)系

當(dāng)例行導(dǎo)入對(duì)應(yīng)的 database 或 table 被刪除后,作業(yè)會(huì)自動(dòng) CANCEL

5.4 kafka 類型的例行導(dǎo)入作業(yè)和 kafka topic 的關(guān)系

當(dāng)用戶在創(chuàng)建例行導(dǎo)入聲明的 kafka_topic 在kafka集群中不存在時(shí)起暮。

  • 如果用戶 kafka 集群的 broker 設(shè)置了 auto.create.topics.enable = true卖氨,則 kafka_topic 會(huì)先被自動(dòng)創(chuàng)建,自動(dòng)創(chuàng)建的 partition 個(gè)數(shù)是由用戶方的kafka集群中的 broker 配置 num.partitions 決定的负懦。例行作業(yè)會(huì)正常的不斷讀取該 topic 的數(shù)據(jù)筒捺。
  • 如果用戶 kafka 集群的 broker 設(shè)置了 auto.create.topics.enable = false, 則 topic 不會(huì)被自動(dòng)創(chuàng)建,例行作業(yè)會(huì)在沒有讀取任何數(shù)據(jù)之前就被暫停纸厉,狀態(tài)為 PAUSED系吭。

所以,如果用戶希望當(dāng) kafka topic 不存在的時(shí)候颗品,被例行作業(yè)自動(dòng)創(chuàng)建的話肯尺,只需要將用戶方的kafka集群中的 broker 設(shè)置 auto.create.topics.enable = true 即可。

5.5 網(wǎng)絡(luò)問題

  1. 創(chuàng)建Routine load 任務(wù)中指定的 Broker list 必須能夠被Doris服務(wù)訪問
  2. Kafka 中如果配置了advertised.listeners, advertised.listeners 中的地址必須能夠被Doris服務(wù)訪問
  3. 連接kafka集群的時(shí)候建議換成Kafka集群對(duì)應(yīng)的主機(jī)名

5.6 關(guān)于指定消費(fèi)的 Partition 和 Offset

oris 支持指定 Partition 和 Offset 開始消費(fèi)躯枢。新版中還支持了指定時(shí)間點(diǎn)進(jìn)行消費(fèi)的功能则吟。這里說明下對(duì)應(yīng)參數(shù)的配置關(guān)系。

有三個(gè)相關(guān)參數(shù):

  • kafka_partitions:指定待消費(fèi)的 partition 列表锄蹂,如:"0, 1, 2, 3"氓仲。
  • kafka_offsets:指定每個(gè)分區(qū)的起始o(jì)ffset,必須和 kafka_partitions 列表個(gè)數(shù)對(duì)應(yīng)得糜。如:"1000, 1000, 2000, 2000"
  • property.kafka_default_offset:指定分區(qū)默認(rèn)的起始o(jì)ffset
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末敬扛,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子朝抖,更是在濱河造成了極大的恐慌啥箭,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,546評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件治宣,死亡現(xiàn)場(chǎng)離奇詭異捉蚤,居然都是意外死亡抬驴,警方通過查閱死者的電腦和手機(jī)炼七,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門缆巧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人豌拙,你說我怎么就攤上這事陕悬。” “怎么了按傅?”我有些...
    開封第一講書人閱讀 164,911評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵捉超,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我唯绍,道長(zhǎng)拼岳,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,737評(píng)論 1 294
  • 正文 為了忘掉前任况芒,我火速辦了婚禮惜纸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘绝骚。我一直安慰自己耐版,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評(píng)論 6 392
  • 文/花漫 我一把揭開白布压汪。 她就那樣靜靜地躺著粪牲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪止剖。 梳的紋絲不亂的頭發(fā)上腺阳,一...
    開封第一講書人閱讀 51,598評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音穿香,去河邊找鬼亭引。 笑死,一個(gè)胖子當(dāng)著我的面吹牛扔水,可吹牛的內(nèi)容都是我干的痛侍。 我是一名探鬼主播,決...
    沈念sama閱讀 40,338評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼魔市,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼主届!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起待德,我...
    開封第一講書人閱讀 39,249評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤君丁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后将宪,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體绘闷,經(jīng)...
    沈念sama閱讀 45,696評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡橡庞,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了印蔗。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片扒最。...
    茶點(diǎn)故事閱讀 40,013評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖华嘹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情耙厚,我是刑警寧澤强挫,帶...
    沈念sama閱讀 35,731評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站薛躬,受9級(jí)特大地震影響俯渤,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜型宝,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評(píng)論 3 330
  • 文/蒙蒙 一八匠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧诡曙,春花似錦臀叙、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至慎璧,卻和暖如春床嫌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背胸私。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工厌处, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人岁疼。 一個(gè)月前我還...
    沈念sama閱讀 48,203評(píng)論 3 370
  • 正文 我出身青樓阔涉,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親捷绒。 傳聞我的和親對(duì)象是個(gè)殘疾皇子瑰排,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容