復制自:https://liyuj.gitee.io/confluent/Kafka-ErrorHandlingDeadLetterQueues.html
Kafka連接器是Kafka的一部分,是在Kafka和其它技術之間構建流式管道的一個強有力的框架。它可用于將數據從多個地方(包括數據庫、消息隊列和文本文件)流式注入到Kafka揩局,以及從Kafka將數據流式傳輸到目標端(如文檔存儲灵莲、NoSQL弛针、數據庫、對象存儲等)中棠绘。
現實世界并不完美翠语,出錯是難免的,因此在出錯時Kafka的管道能盡可能優(yōu)雅地處理是最好的财边。一個常見的場景是獲取與特定序列化格式不匹配的主題的消息(比如預期為Avro時實際為JSON肌括,反之亦然)。自從Kafka 2.0版本發(fā)布以來酣难,Kafka連接器包含了錯誤處理選項谍夭,即將消息路由到死信隊列的功能,這是構建數據管道的常用技術憨募。
在本文中將介紹幾種處理問題的常見模式紧索,并說明如何實現。
#失敗后立即停止
有時可能希望在發(fā)生錯誤時立即停止處理菜谣,可能遇到質量差的數據是由于上游的原因導致的珠漂,必須由上游來解決晚缩,繼續(xù)嘗試處理其它的消息已經沒有意義。
這是Kafka連接器的默認行為媳危,也可以使用下面的配置項顯式地指定:
errors.tolerance=none
在本示例中荞彼,該連接器配置為從主題中讀取JSON格式數據,然后將其寫入純文本文件待笑。注意這里為了演示使用的是FileStreamSinkConnector連接器鸣皂,不建議在生產中使用。
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
? ? ? ? "name": "file_sink_01",
? ? ? ? "config": {
? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
? ? ? ? ? ? ? ? "topics":"test_topic_json",
? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "file":"/data/file_sink_01.txt"
? ? ? ? ? ? ? ? }
? ? ? ? }'
主題中的某些JSON格式消息是無效的暮蹂,連接器會立即終止寞缝,進入以下的FAILED狀態(tài):
$curl-s"http://localhost:8083/connectors/file_sink_01/status"|\jq -c -M'[.name,.tasks[].state]'["file_sink_01","FAILED"]
查看Kafka連接器工作節(jié)點的日志,可以看到錯誤已經記錄并且任務已經終止:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
…
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
…
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('b' (code 98)): was expecting double-quote to start field name
at [Source: (byte[])"{brokenjson-:"bar 1"}"; line: 1, column: 3]
要修復管道仰泻,需要解決源主題上的消息問題荆陆。除非事先指定,Kafka連接器是不會簡單地“跳過”無效消息的我纪。如果是配置錯誤(例如指定了錯誤的序列化轉換器)慎宾,那最好了,改正之后重新啟動連接器即可浅悉。不過如果確實是該主題的無效消息趟据,那么需要找到一種方式,即不要阻止所有其它有效消息的處理术健。
#靜默忽略無效的消息
如果只是希望處理一直持續(xù)下去:
errors.tolerance=all
1
在實際中大概如下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
? ? ? ? "name": "file_sink_05",
? ? ? ? "config": {
? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
? ? ? ? ? ? ? ? "topics":"test_topic_json",
? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "file":"/data/file_sink_05.txt",
? ? ? ? ? ? ? ? "errors.tolerance": "all"
? ? ? ? ? ? ? ? }
? ? ? ? }'
啟動連接器之后(還是原來的源主題汹碱,其中既有有效的,也有無效的消息)荞估,就可以持續(xù)地運行:
$ curl -s "http://localhost:8083/connectors/file_sink_05/status"| \
? ? jq -c -M '[.name,.tasks[].state]'
["file_sink_05","RUNNING"]
這時即使連接器讀取的源主題上有無效的消息咳促,也不會有錯誤寫入Kafka連接器工作節(jié)點的輸出,而有效的消息會按照預期寫入輸出文件:
$ head data/file_sink_05.txt
{foo=bar 1}
{foo=bar 2}
{foo=bar 3}
…
#是否可以感知數據的丟失勘伺?
配置了errors.tolerance = all之后跪腹,Kafka連接器就會忽略掉無效的消息,并且默認也不會記錄被丟棄的消息飞醉。如果確認配置errors.tolerance = all冲茸,那么就需要仔細考慮是否以及如何知道實際上發(fā)生的消息丟失。在實踐中這意味著基于可用指標的監(jiān)控/報警缅帘,和/或失敗消息的記錄轴术。
確定是否有消息被丟棄的最簡單方法,是將源主題上的消息數與寫入目標端的數量進行對比:
$ kafkacat -b localhost:9092 -t test_topic_json -o beginning -C -e -q -X enable.partition.eof=true | wc -l
? ? 150
$ wc -l data/file_sink_05.txt
? ? 100 data/file_sink_05.txt
這個做法雖然不是很優(yōu)雅钦无,但是確實能看出發(fā)生了消息的丟失逗栽,并且因為日志中沒有記錄,所以用戶仍然對此一無所知失暂。
一個更加可靠的辦法是彼宠,使用JMX指標來主動監(jiān)控和報警錯誤消息率:
這時可以看到發(fā)生了錯誤鳄虱,但是并不知道那些消息發(fā)生了錯誤,不過這是用戶想要的兵志。其實即使之后這些被丟棄的消息被寫入了/dev/null醇蝴,實際上也是可以知道的,這也正是死信隊列概念出現的點想罕。
#將消息路由到死信隊列
Kafka連接器可以配置為將無法處理的消息(例如上面提到的反序列化錯誤)發(fā)送到一個單獨的Kafka主題悠栓,即死信隊列。有效消息會正常處理按价,管道也會繼續(xù)運行惭适。然后可以從死信隊列中檢查無效消息,并根據需要忽略或修復并重新處理楼镐。
進行如下的配置可以啟用死信隊列:
errors.tolerance=allerrors.deadletterqueue.topic.name=
如果運行于單節(jié)點Kafka集群癞志,還需要配置errors.deadletterqueue.topic.replication.factor = 1,其默認值為3框产。
具有此配置的連接器配置示例大致如下:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
? ? ? ? "name": "file_sink_02",
? ? ? ? "config": {
? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
? ? ? ? ? ? ? ? "topics":"test_topic_json",
? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "file": "/data/file_sink_02.txt",
? ? ? ? ? ? ? ? "errors.tolerance": "all",
? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.name":"dlq_file_sink_02",
? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.replication.factor": 1
? ? ? ? ? ? ? ? }
? ? ? ? }'
使用和之前相同的源主題凄杯,然后處理混合有有效和無效的JSON數據,會看到新的連接器可以穩(wěn)定運行:
$curl-s"http://localhost:8083/connectors/file_sink_02/status"|\jq -c -M'[.name,.tasks[].state]'["file_sink_02","RUNNING"]
源主題中的有效記錄將寫入目標文件:
$headdata/file_sink_02.txt{foo=bar1}{foo=bar2}{foo=bar3}[…]
這樣管道可以繼續(xù)正常運行秉宿,并且還有了死信隊列主題中的數據戒突,這可以從指標數據中看出:
檢查主題本身也可以看出來:
ksql> LIST TOPICS;
Kafka Topic? ? ? ? ? ? | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
---------------------------------------------------------------------------------------------------
dlq_file_sink_02? ? ? | false? ? ? | 1? ? ? ? ? | 1? ? ? ? ? ? ? ? ? | 0? ? ? ? | 0
test_topic_json? ? ? ? | false? ? ? | 1? ? ? ? ? | 1? ? ? ? ? ? ? ? ? | 1? ? ? ? | 1
---------------------------------------------------------------------------------------------------
ksql> PRINT 'dlq_file_sink_02' FROM BEGINNING;
Format:STRING
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 1"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 2"}
1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 3"}
…
從輸出中可以看出,消息的時間戳為(1/24/19 5:16:03 PM UTC)描睦,鍵為(NULL)膊存,然后為值。這時可以看到值是無效的JSON格式{foo:"bar 1"}(foo也應加上引號)忱叭,因此JsonConverter在處理時會拋出異常隔崎,因此最終會輸出到死信主題。
但是只有看到消息才能知道它是無效的JSON韵丑,即便如此爵卒,也只能假設消息被拒絕的原因,要確定Kafka連接器將消息視為無效的實際原因撵彻,有兩個方法:
死信隊列的消息頭钓株;
Kafka連接器的工作節(jié)點日志。
下面會分別介紹千康。
#記錄消息的失敗原因:消息頭
消息頭是使用Kafka消息的鍵享幽、值和時間戳存儲的附加元數據铲掐,是在Kafka 0.11版本中引入的拾弃。Kafka連接器可以將有關消息拒絕原因的信息寫入消息本身的消息頭中。這個做法比寫入日志文件更好摆霉,因為它將原因直接與消息聯(lián)系起來豪椿。
配置如下的參數奔坟,可以在死信隊列的消息頭中包含拒絕原因:
errors.deadletterqueue.context.headers.enable=true
配置示例大致如下:
curl-X POST http://localhost:8083/connectors -H"Content-Type: application/json"-d'{
? ? ? ? "name": "file_sink_03",
? ? ? ? "config": {
? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
? ? ? ? ? ? ? ? "topics":"test_topic_json",
? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "file": "/data/file_sink_03.txt",
? ? ? ? ? ? ? ? "errors.tolerance": "all",
? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.name":"dlq_file_sink_03",
? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.replication.factor": 1,
? ? ? ? ? ? ? ? "errors.deadletterqueue.context.headers.enable":true
? ? ? ? ? ? ? ? }
? ? ? ? }'
和之前一致,連接器可以正常運行(因為配置了errors.tolerance=all)搭盾。
$curl-s"http://localhost:8083/connectors/file_sink_03/status"|\jq -c -M'[.name,.tasks[].state]'["file_sink_03","RUNNING"]
源主題中的有效消息會正常寫入目標文件:
$headdata/file_sink_03.txt{foo=bar1}{foo=bar2}{foo=bar3}[…]
可以使用任何消費者工具來檢查死信隊列上的消息(之前使用了KSQL)咳秉,不過這里會使用kafkacat,然后馬上就會看到原因鸯隅,最簡單的操作大致如下:
kafkacat -b localhost:9092 -t dlq_file_sink_03% Auto-selecting Consumer mode(use -P or -C to override){foo:"bar 1"}{foo:"bar 2"}…
不過kafkacat有更強大的功能澜建,可以看到比消息本身更多的信息:
kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1\-f'\nKey (%K bytes): %k? Value (%S bytes): %s? Timestamp: %T? Partition: %p? Offset: %o? Headers: %h\n'
這個命令將獲取最后一條消息(-o-1,針對偏移量蝌以,使用最后一條消息)炕舵,只讀取一條消息(-c1),并且通過-f參數對其進行格式化跟畅,以更易于理解:
Key (-1 bytes):
? Value (13 bytes): {foo:"bar 5"}
? Timestamp: 1548350164096
? Partition: 0
? Offset: 34
? Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
due to serialization error:
[…]
也可以只顯示消息頭咽筋,并使用一些簡單的技巧將其拆分,這樣可以更清楚地看到該問題的更多信息:
$ kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 -f '%h'|tr ',' '\n'
__connect.errors.topic=test_topic_json
__connect.errors.partition=0
__connect.errors.offset=94
__connect.errors.connector.name=file_sink_03
__connect.errors.task.id=0
__connect.errors.stage=VALUE_CONVERTER
__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter
__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException
__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error:
Kafka連接器處理的每條消息都來自源主題和該主題中的特定點(偏移量)徊件,消息頭已經準確地說明了這一點奸攻。因此可以使用它來回到原始主題并在需要時檢查原始消息,由于死信隊列已經有一個消息的副本虱痕,這個檢查更像是一個保險的做法睹耐。
根據從上面的消息頭中獲取的詳細信息,可以再檢查一下源消息:
__connect.errors.topic=test_topic_json
__connect.errors.offset=94
將這些值分別插入到kafkacat的代表主題和偏移的-t和-o參數中皆疹,可以得到:
$ kafkacat -b localhost:9092 -C\-t test_topic_json -o94\-f'\nKey (%K bytes): %k? Value (%S bytes): %s? Timestamp: %T? Partition: %p? Offset: %o? Topic: %t\n'
Key (-1 bytes):
? Value (13 bytes): {foo:"bar 5"}
? Timestamp: 1548350164096
? Partition: 0
? Offset: 94
? Topic: test_topic_json
與死信隊列中的上述消息相比疏橄,可以看到完全相同,甚至包括時間戳略就,唯一的區(qū)別是主題捎迫、偏移量和消息頭。
#記錄消息的失敗原因:日志
記錄消息的拒絕原因的第二個選項是將其寫入日志表牢。根據安裝方式不同窄绒,Kafka連接器會將其寫入標準輸出或日志文件。無論哪種方式都會為每個失敗的消息生成一堆詳細輸出崔兴。進行如下配置可啟用此功能:
errors.log.enable=true
通過配置errors.log.include.messages = true彰导,還可以在輸出中包含有關消息本身的元數據。此元數據中包括一些和上面提到的消息頭中一樣的項目敲茄,包括源消息的主題和偏移量位谋。注意它不包括消息鍵或值本身。
這時的連接器配置如下:
curl-X POST http://localhost:8083/connectors -H"Content-Type: application/json"-d'{
? ? ? ? "name": "file_sink_04",
? ? ? ? "config": {
? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
? ? ? ? ? ? ? ? "topics":"test_topic_json",
? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "file": "/data/file_sink_04.txt",
? ? ? ? ? ? ? ? "errors.tolerance": "all",
? ? ? ? ? ? ? ? "errors.log.enable":true,
? ? ? ? ? ? ? ? "errors.log.include.messages":true
? ? ? ? ? ? ? ? }
? ? ? ? }'
連接器是可以成功運行的:
$curl-s"http://localhost:8083/connectors/file_sink_04/status"|\jq -c -M'[.name,.tasks[].state]'["file_sink_04","RUNNING"]Valid records from thesourcetopic get written to the target file:$headdata/file_sink_04.txt{foo=bar1}{foo=bar2}{foo=bar3}[…]
這時去看Kafka連接器的工作節(jié)點日志堰燎,會發(fā)現每個失敗的消息都有錯誤記錄:
ERROR Error encountered in task file_sink_04-0. Executing stage 'VALUE_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
[…]
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name
at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]
可以看到錯誤本身掏父,還有就是和錯誤有關的信息:
{topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}
如上所示,可以在kafkacat等工具中使用該主題和偏移量來檢查源主題上的消息秆剪。根據拋出的異常也可能會看到記錄的源消息:
Caused by: org.apache.kafka.common.errors.SerializationException:
…
at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]
#處理死信隊列的消息
雖然設置了一個死信隊列赊淑,但是如何處理那些“死信”呢爵政?因為它只是一個Kafka主題,所以可以像使用任何其它主題一樣使用標準的Kafka工具陶缺。上面已經看到了钾挟,比如可以使用kafkacat來檢查消息頭,并且對于消息的內容及其元數據的一般檢查kafkacat也可以做饱岸。當然根據被拒絕的原因掺出,也可以選擇對消息進行重播。
一個場景是連接器正在使用Avro轉換器苫费,但是主題上的卻是JSON格式消息(因此被寫入死信隊列)蛛砰。可能由于遺留原因JSON和Avro格式的生產者都在寫入源主題黍衙,這個問題得解決泥畅,但是目前只需要將管道流中的數據寫入接收器即可。
首先琅翻,從初始的接收器讀取源主題開始位仁,使用Avro反序列化并路由到死信隊列:
curl-X POST http://localhost:8083/connectors -H"Content-Type: application/json"-d'{
? ? ? ? "name": "file_sink_06__01-avro",
? ? ? ? "config": {
? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
? ? ? ? ? ? ? ? "topics":"test_topic_avro",
? ? ? ? ? ? ? ? "file":"/data/file_sink_06.txt",
? ? ? ? ? ? ? ? "key.converter": "io.confluent.connect.avro.AvroConverter",
? ? ? ? ? ? ? ? "key.converter.schema.registry.url": "http://schema-registry:8081",
? ? ? ? ? ? ? ? "value.converter": "io.confluent.connect.avro.AvroConverter",
? ? ? ? ? ? ? ? "value.converter.schema.registry.url": "http://schema-registry:8081",
? ? ? ? ? ? ? ? "errors.tolerance":"all",
? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.name":"dlq_file_sink_06__01",
? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.replication.factor":1,
? ? ? ? ? ? ? ? "errors.deadletterqueue.context.headers.enable":true,
? ? ? ? ? ? ? ? "errors.retry.delay.max.ms": 60000,
? ? ? ? ? ? ? ? "errors.retry.timeout": 300000
? ? ? ? ? ? ? ? }
? ? ? ? }'
另外再創(chuàng)建第二個接收器,將第一個接收器的死信隊列作為源主題方椎,并嘗試將記錄反序列化為JSON聂抢,在這里要更改的是value.converter、key.converter棠众、源主題名和死信隊列名(如果此連接器需要將任何消息路由到死信隊列琳疏,要避免遞歸)。
curl-X POST http://localhost:8083/connectors -H"Content-Type: application/json"-d'{
? ? ? ? "name": "file_sink_06__02-json",
? ? ? ? "config": {
? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
? ? ? ? ? ? ? ? "topics":"dlq_file_sink_06__01",
? ? ? ? ? ? ? ? "file":"/data/file_sink_06.txt",
? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",
? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,
? ? ? ? ? ? ? ? "errors.tolerance":"all",
? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.name":"dlq_file_sink_06__02",
? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.replication.factor":1,
? ? ? ? ? ? ? ? "errors.deadletterqueue.context.headers.enable":true,
? ? ? ? ? ? ? ? "errors.retry.delay.max.ms": 60000,
? ? ? ? ? ? ? ? "errors.retry.timeout": 300000
? ? ? ? ? ? ? ? }
? ? ? ? }'
現在可以驗證一下闸拿。
首先空盼,源主題收到20條Avro消息,之后可以看到20條消息被讀取并被原始Avro接收器接收:
然后發(fā)送8條JSON消息新荤,這時8條消息被發(fā)送到死信隊列揽趾,然后被JSON接收器接收:
現在再發(fā)送5條格式錯誤的JSON消息,之后可以看到兩者都有失敗的消息苛骨,有2點可以確認:
從Avro接收器發(fā)送到死信隊列的消息數與成功發(fā)送的JSON消息數之間有差異篱瞎;
消息被發(fā)送到JSON接收器的死信隊列。
#通過KSQL監(jiān)控死信隊列
除了使用JMX監(jiān)控死信隊列之外痒芝,還可以利用KSQL的聚合能力編寫一個簡單的流應用來監(jiān)控消息寫入隊列的速率:
-- 為每個死信隊列主題注冊流俐筋。CREATESTREAM dlq_file_sink_06__01(MSGVARCHAR)WITH(KAFKA_TOPIC='dlq_file_sink_06__01',VALUE_FORMAT='DELIMITED');CREATESTREAM dlq_file_sink_06__02(MSGVARCHAR)WITH(KAFKA_TOPIC='dlq_file_sink_06__02'严衬,VALUE_FORMAT='DELIMITED');-- 從主題的開頭消費數據SET'auto.offset.reset'='earliest';-- 使用其它列創(chuàng)建監(jiān)控流澄者,可用于后續(xù)聚合查詢CREATESTREAM DLQ_MONITORWITH(VALUE_FORMAT='AVRO')AS\SELECT'dlq_file_sink_06__01'ASSINK_NAME,\'Records: 'ASGROUP_COL,\? ? ? ? MSG \FROMdlq_file_sink_06__01;-- 使用來自第二個死信隊列的消息注入相同的監(jiān)控流INSERTINTODLQ_MONITOR \SELECT'dlq_file_sink_06__02'ASSINK_NAME,\'Records: 'ASGROUP_COL,\? ? ? ? MSG \FROMdlq_file_sink_06__02;-- 在每個死信隊列每分鐘的時間窗口內,創(chuàng)建消息的聚合視圖CREATETABLEDLQ_MESSAGE_COUNT_PER_MINAS\SELECTTIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss')ASSTART_TS,\? ? ? ? SINK_NAME,\? ? ? ? GROUP_COL,\COUNT(*)ASDLQ_MESSAGE_COUNT \FROMDLQ_MONITOR \? ? ? ? ? WINDOW TUMBLING(SIZE1MINUTE)\GROUPBYSINK_NAME,\? ? ? ? ? GROUP_COL;
這個聚合表可以以交互式的方式進行查詢,下面顯示了一分鐘內每個死信隊列中的消息數量:
ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_MESSAGE_COUNT_PER_MIN;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
2019-02-01 03:12:00 | dlq_file_sink_06__01 | 5
2019-02-01 02:56:00 | dlq_file_sink_06__02 | 5
2019-02-01 03:12:00 | dlq_file_sink_06__02 | 5
因為這個表的下面是Kafka主題闷哆,所以可以將其路由到期望的任何監(jiān)控儀表盤,還可以用于驅動告警单起。假定有幾條錯誤消息是可以接受的抱怔,但是一分鐘內超過5條消息就是個大問題需要關注:
CREATETABLEDLQ_BREACHAS\SELECTSTART_TS,SINK_NAME,DLQ_MESSAGE_COUNT \FROMDLQ_MESSAGE_COUNT_PER_MIN \WHEREDLQ_MESSAGE_COUNT>5;
現在又有了一個報警服務可以訂閱的DLQ_BREACH主題,當收到任何消息時嘀倒,可以觸發(fā)適當的操作(例如通知)屈留。
ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_BREACH;
2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9
2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8
#Kafka連接器哪里沒有提供錯誤處理?
Kafka連接器的錯誤處理方式测蘑,如下表所示:
連接器生命周期階段描述是否處理錯誤灌危?
開始首次啟動連接器時,其將執(zhí)行必要的初始化碳胳,例如連接到數據存儲無
拉扔买(針對源連接器)從源數據存儲讀取消息無
格式轉換從Kafka主題讀寫數據并對JSON/Avro格式進行序列化/反序列化有
單消息轉換應用任何已配置的單消息轉換有
接收(針對接收連接器)將消息寫入目標數據存儲無
注意源連接器沒有死信隊列。
#錯誤處理配置流程
關于連接器錯誤處理的配置挨约,可以按照如下的流程一步步進階:
#總結
處理錯誤是任何穩(wěn)定可靠的數據管道的重要組成部分味混,根據數據的使用方式,可以有兩個選項诫惭。如果管道任何錯誤的消息都不能接受翁锡,表明上游存在嚴重的問題,那么就應該立即停止處理(這是Kafka連接器的默認行為)夕土。
另一方面馆衔,如果只是想將數據流式傳輸到存儲以進行分析或非關鍵性處理,那么只要不傳播錯誤怨绣,保持管道穩(wěn)定運行則更為重要角溃。這時就可以定義錯誤的處理方式,推薦的方式是使用死信隊列并密切監(jiān)視來自Kafka連接器的可用JMX指標篮撑。