Kafka連接器深度解讀之錯誤處理和死信隊列

復制自:https://liyuj.gitee.io/confluent/Kafka-ErrorHandlingDeadLetterQueues.html

Kafka連接器是Kafka的一部分,是在Kafka和其它技術之間構建流式管道的一個強有力的框架。它可用于將數據從多個地方(包括數據庫、消息隊列和文本文件)流式注入到Kafka揩局,以及從Kafka將數據流式傳輸到目標端(如文檔存儲灵莲、NoSQL弛针、數據庫、對象存儲等)中棠绘。

現實世界并不完美翠语,出錯是難免的,因此在出錯時Kafka的管道能盡可能優(yōu)雅地處理是最好的财边。一個常見的場景是獲取與特定序列化格式不匹配的主題的消息(比如預期為Avro時實際為JSON肌括,反之亦然)。自從Kafka 2.0版本發(fā)布以來酣难,Kafka連接器包含了錯誤處理選項谍夭,即將消息路由到死信隊列的功能,這是構建數據管道的常用技術憨募。

在本文中將介紹幾種處理問題的常見模式紧索,并說明如何實現。

#失敗后立即停止

有時可能希望在發(fā)生錯誤時立即停止處理菜谣,可能遇到質量差的數據是由于上游的原因導致的珠漂,必須由上游來解決晚缩,繼續(xù)嘗試處理其它的消息已經沒有意義。

這是Kafka連接器的默認行為媳危,也可以使用下面的配置項顯式地指定:

errors.tolerance=none

在本示例中荞彼,該連接器配置為從主題中讀取JSON格式數據,然后將其寫入純文本文件待笑。注意這里為了演示使用的是FileStreamSinkConnector連接器鸣皂,不建議在生產中使用。

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{

? ? ? ? "name": "file_sink_01",

? ? ? ? "config": {

? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",

? ? ? ? ? ? ? ? "topics":"test_topic_json",

? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "file":"/data/file_sink_01.txt"

? ? ? ? ? ? ? ? }

? ? ? ? }'

主題中的某些JSON格式消息是無效的暮蹂,連接器會立即終止寞缝,進入以下的FAILED狀態(tài):

$curl-s"http://localhost:8083/connectors/file_sink_01/status"|\jq -c -M'[.name,.tasks[].state]'["file_sink_01","FAILED"]

查看Kafka連接器工作節(jié)點的日志,可以看到錯誤已經記錄并且任務已經終止:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)

Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('b' (code 98)): was expecting double-quote to start field name

at [Source: (byte[])"{brokenjson-:"bar 1"}"; line: 1, column: 3]

要修復管道仰泻,需要解決源主題上的消息問題荆陆。除非事先指定,Kafka連接器是不會簡單地“跳過”無效消息的我纪。如果是配置錯誤(例如指定了錯誤的序列化轉換器)慎宾,那最好了,改正之后重新啟動連接器即可浅悉。不過如果確實是該主題的無效消息趟据,那么需要找到一種方式,即不要阻止所有其它有效消息的處理术健。

#靜默忽略無效的消息

如果只是希望處理一直持續(xù)下去:

errors.tolerance=all

1

在實際中大概如下:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{

? ? ? ? "name": "file_sink_05",

? ? ? ? "config": {

? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",

? ? ? ? ? ? ? ? "topics":"test_topic_json",

? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "file":"/data/file_sink_05.txt",

? ? ? ? ? ? ? ? "errors.tolerance": "all"

? ? ? ? ? ? ? ? }

? ? ? ? }'

啟動連接器之后(還是原來的源主題汹碱,其中既有有效的,也有無效的消息)荞估,就可以持續(xù)地運行:

$ curl -s "http://localhost:8083/connectors/file_sink_05/status"| \

? ? jq -c -M '[.name,.tasks[].state]'

["file_sink_05","RUNNING"]

這時即使連接器讀取的源主題上有無效的消息咳促,也不會有錯誤寫入Kafka連接器工作節(jié)點的輸出,而有效的消息會按照預期寫入輸出文件:

$ head data/file_sink_05.txt

{foo=bar 1}

{foo=bar 2}

{foo=bar 3}

#是否可以感知數據的丟失勘伺?

配置了errors.tolerance = all之后跪腹,Kafka連接器就會忽略掉無效的消息,并且默認也不會記錄被丟棄的消息飞醉。如果確認配置errors.tolerance = all冲茸,那么就需要仔細考慮是否以及如何知道實際上發(fā)生的消息丟失。在實踐中這意味著基于可用指標的監(jiān)控/報警缅帘,和/或失敗消息的記錄轴术。

確定是否有消息被丟棄的最簡單方法,是將源主題上的消息數與寫入目標端的數量進行對比:

$ kafkacat -b localhost:9092 -t test_topic_json -o beginning -C -e -q -X enable.partition.eof=true | wc -l

? ? 150

$ wc -l data/file_sink_05.txt

? ? 100 data/file_sink_05.txt

這個做法雖然不是很優(yōu)雅钦无,但是確實能看出發(fā)生了消息的丟失逗栽,并且因為日志中沒有記錄,所以用戶仍然對此一無所知失暂。

一個更加可靠的辦法是彼宠,使用JMX指標來主動監(jiān)控和報警錯誤消息率:

這時可以看到發(fā)生了錯誤鳄虱,但是并不知道那些消息發(fā)生了錯誤,不過這是用戶想要的兵志。其實即使之后這些被丟棄的消息被寫入了/dev/null醇蝴,實際上也是可以知道的,這也正是死信隊列概念出現的點想罕。

#將消息路由到死信隊列

Kafka連接器可以配置為將無法處理的消息(例如上面提到的反序列化錯誤)發(fā)送到一個單獨的Kafka主題悠栓,即死信隊列。有效消息會正常處理按价,管道也會繼續(xù)運行惭适。然后可以從死信隊列中檢查無效消息,并根據需要忽略或修復并重新處理楼镐。

進行如下的配置可以啟用死信隊列:

errors.tolerance=allerrors.deadletterqueue.topic.name=

如果運行于單節(jié)點Kafka集群癞志,還需要配置errors.deadletterqueue.topic.replication.factor = 1,其默認值為3框产。

具有此配置的連接器配置示例大致如下:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{

? ? ? ? "name": "file_sink_02",

? ? ? ? "config": {

? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",

? ? ? ? ? ? ? ? "topics":"test_topic_json",

? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "file": "/data/file_sink_02.txt",

? ? ? ? ? ? ? ? "errors.tolerance": "all",

? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.name":"dlq_file_sink_02",

? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.replication.factor": 1

? ? ? ? ? ? ? ? }

? ? ? ? }'

使用和之前相同的源主題凄杯,然后處理混合有有效和無效的JSON數據,會看到新的連接器可以穩(wěn)定運行:

$curl-s"http://localhost:8083/connectors/file_sink_02/status"|\jq -c -M'[.name,.tasks[].state]'["file_sink_02","RUNNING"]

源主題中的有效記錄將寫入目標文件:

$headdata/file_sink_02.txt{foo=bar1}{foo=bar2}{foo=bar3}[…]

這樣管道可以繼續(xù)正常運行秉宿,并且還有了死信隊列主題中的數據戒突,這可以從指標數據中看出:

檢查主題本身也可以看出來:

ksql> LIST TOPICS;

Kafka Topic? ? ? ? ? ? | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups

---------------------------------------------------------------------------------------------------

dlq_file_sink_02? ? ? | false? ? ? | 1? ? ? ? ? | 1? ? ? ? ? ? ? ? ? | 0? ? ? ? | 0

test_topic_json? ? ? ? | false? ? ? | 1? ? ? ? ? | 1? ? ? ? ? ? ? ? ? | 1? ? ? ? | 1

---------------------------------------------------------------------------------------------------

ksql> PRINT 'dlq_file_sink_02' FROM BEGINNING;

Format:STRING

1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 1"}

1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 2"}

1/24/19 5:16:03 PM UTC , NULL , {foo:"bar 3"}

從輸出中可以看出,消息的時間戳為(1/24/19 5:16:03 PM UTC)描睦,鍵為(NULL)膊存,然后為值。這時可以看到值是無效的JSON格式{foo:"bar 1"}(foo也應加上引號)忱叭,因此JsonConverter在處理時會拋出異常隔崎,因此最終會輸出到死信主題。

但是只有看到消息才能知道它是無效的JSON韵丑,即便如此爵卒,也只能假設消息被拒絕的原因,要確定Kafka連接器將消息視為無效的實際原因撵彻,有兩個方法:

死信隊列的消息頭钓株;

Kafka連接器的工作節(jié)點日志。

下面會分別介紹千康。

#記錄消息的失敗原因:消息頭

消息頭是使用Kafka消息的鍵享幽、值和時間戳存儲的附加元數據铲掐,是在Kafka 0.11版本中引入的拾弃。Kafka連接器可以將有關消息拒絕原因的信息寫入消息本身的消息頭中。這個做法比寫入日志文件更好摆霉,因為它將原因直接與消息聯(lián)系起來豪椿。

配置如下的參數奔坟,可以在死信隊列的消息頭中包含拒絕原因:

errors.deadletterqueue.context.headers.enable=true

配置示例大致如下:

curl-X POST http://localhost:8083/connectors -H"Content-Type: application/json"-d'{

? ? ? ? "name": "file_sink_03",

? ? ? ? "config": {

? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",

? ? ? ? ? ? ? ? "topics":"test_topic_json",

? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "file": "/data/file_sink_03.txt",

? ? ? ? ? ? ? ? "errors.tolerance": "all",

? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.name":"dlq_file_sink_03",

? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.replication.factor": 1,

? ? ? ? ? ? ? ? "errors.deadletterqueue.context.headers.enable":true

? ? ? ? ? ? ? ? }

? ? ? ? }'

和之前一致,連接器可以正常運行(因為配置了errors.tolerance=all)搭盾。

$curl-s"http://localhost:8083/connectors/file_sink_03/status"|\jq -c -M'[.name,.tasks[].state]'["file_sink_03","RUNNING"]

源主題中的有效消息會正常寫入目標文件:

$headdata/file_sink_03.txt{foo=bar1}{foo=bar2}{foo=bar3}[…]

可以使用任何消費者工具來檢查死信隊列上的消息(之前使用了KSQL)咳秉,不過這里會使用kafkacat,然后馬上就會看到原因鸯隅,最簡單的操作大致如下:

kafkacat -b localhost:9092 -t dlq_file_sink_03% Auto-selecting Consumer mode(use -P or -C to override){foo:"bar 1"}{foo:"bar 2"}…

不過kafkacat有更強大的功能澜建,可以看到比消息本身更多的信息:

kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1\-f'\nKey (%K bytes): %k? Value (%S bytes): %s? Timestamp: %T? Partition: %p? Offset: %o? Headers: %h\n'

這個命令將獲取最后一條消息(-o-1,針對偏移量蝌以,使用最后一條消息)炕舵,只讀取一條消息(-c1),并且通過-f參數對其進行格式化跟畅,以更易于理解:

Key (-1 bytes):

? Value (13 bytes): {foo:"bar 5"}

? Timestamp: 1548350164096

? Partition: 0

? Offset: 34

? Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU

E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co

nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed

due to serialization error:

[…]

也可以只顯示消息頭咽筋,并使用一些簡單的技巧將其拆分,這樣可以更清楚地看到該問題的更多信息:

$ kafkacat -b localhost:9092 -t dlq_file_sink_03 -C -o-1 -c1 -f '%h'|tr ',' '\n'

__connect.errors.topic=test_topic_json

__connect.errors.partition=0

__connect.errors.offset=94

__connect.errors.connector.name=file_sink_03

__connect.errors.task.id=0

__connect.errors.stage=VALUE_CONVERTER

__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter

__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException

__connect.errors.exception.message=Converting byte[] to Kafka Connect data failed due to serialization error:

Kafka連接器處理的每條消息都來自源主題和該主題中的特定點(偏移量)徊件,消息頭已經準確地說明了這一點奸攻。因此可以使用它來回到原始主題并在需要時檢查原始消息,由于死信隊列已經有一個消息的副本虱痕,這個檢查更像是一個保險的做法睹耐。

根據從上面的消息頭中獲取的詳細信息,可以再檢查一下源消息:

__connect.errors.topic=test_topic_json

__connect.errors.offset=94

將這些值分別插入到kafkacat的代表主題和偏移的-t和-o參數中皆疹,可以得到:

$ kafkacat -b localhost:9092 -C\-t test_topic_json -o94\-f'\nKey (%K bytes): %k? Value (%S bytes): %s? Timestamp: %T? Partition: %p? Offset: %o? Topic: %t\n'

Key (-1 bytes):

? Value (13 bytes): {foo:"bar 5"}

? Timestamp: 1548350164096

? Partition: 0

? Offset: 94

? Topic: test_topic_json

與死信隊列中的上述消息相比疏橄,可以看到完全相同,甚至包括時間戳略就,唯一的區(qū)別是主題捎迫、偏移量和消息頭。

#記錄消息的失敗原因:日志

記錄消息的拒絕原因的第二個選項是將其寫入日志表牢。根據安裝方式不同窄绒,Kafka連接器會將其寫入標準輸出或日志文件。無論哪種方式都會為每個失敗的消息生成一堆詳細輸出崔兴。進行如下配置可啟用此功能:

errors.log.enable=true

通過配置errors.log.include.messages = true彰导,還可以在輸出中包含有關消息本身的元數據。此元數據中包括一些和上面提到的消息頭中一樣的項目敲茄,包括源消息的主題和偏移量位谋。注意它不包括消息鍵或值本身。

這時的連接器配置如下:

curl-X POST http://localhost:8083/connectors -H"Content-Type: application/json"-d'{

? ? ? ? "name": "file_sink_04",

? ? ? ? "config": {

? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",

? ? ? ? ? ? ? ? "topics":"test_topic_json",

? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "file": "/data/file_sink_04.txt",

? ? ? ? ? ? ? ? "errors.tolerance": "all",

? ? ? ? ? ? ? ? "errors.log.enable":true,

? ? ? ? ? ? ? ? "errors.log.include.messages":true

? ? ? ? ? ? ? ? }

? ? ? ? }'

連接器是可以成功運行的:

$curl-s"http://localhost:8083/connectors/file_sink_04/status"|\jq -c -M'[.name,.tasks[].state]'["file_sink_04","RUNNING"]Valid records from thesourcetopic get written to the target file:$headdata/file_sink_04.txt{foo=bar1}{foo=bar2}{foo=bar3}[…]

這時去看Kafka連接器的工作節(jié)點日志堰燎,會發(fā)現每個失敗的消息都有錯誤記錄:

ERROR Error encountered in task file_sink_04-0. Executing stage 'VALUE_CONVERTER' with class 'org.apache.kafka.connect.json.JsonConverter', where consumed record is {topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)

[…]

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('f' (code 102)): was expecting double-quote to start field name

at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]

可以看到錯誤本身掏父,還有就是和錯誤有關的信息:

{topic='test_topic_json', partition=0, offset=94, timestamp=1548350164096, timestampType=CreateTime}

如上所示,可以在kafkacat等工具中使用該主題和偏移量來檢查源主題上的消息秆剪。根據拋出的異常也可能會看到記錄的源消息:

Caused by: org.apache.kafka.common.errors.SerializationException:

at [Source: (byte[])"{foo:"bar 5"}"; line: 1, column: 3]

#處理死信隊列的消息

雖然設置了一個死信隊列赊淑,但是如何處理那些“死信”呢爵政?因為它只是一個Kafka主題,所以可以像使用任何其它主題一樣使用標準的Kafka工具陶缺。上面已經看到了钾挟,比如可以使用kafkacat來檢查消息頭,并且對于消息的內容及其元數據的一般檢查kafkacat也可以做饱岸。當然根據被拒絕的原因掺出,也可以選擇對消息進行重播。

一個場景是連接器正在使用Avro轉換器苫费,但是主題上的卻是JSON格式消息(因此被寫入死信隊列)蛛砰。可能由于遺留原因JSON和Avro格式的生產者都在寫入源主題黍衙,這個問題得解決泥畅,但是目前只需要將管道流中的數據寫入接收器即可。

首先琅翻,從初始的接收器讀取源主題開始位仁,使用Avro反序列化并路由到死信隊列:

curl-X POST http://localhost:8083/connectors -H"Content-Type: application/json"-d'{

? ? ? ? "name": "file_sink_06__01-avro",

? ? ? ? "config": {

? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",

? ? ? ? ? ? ? ? "topics":"test_topic_avro",

? ? ? ? ? ? ? ? "file":"/data/file_sink_06.txt",

? ? ? ? ? ? ? ? "key.converter": "io.confluent.connect.avro.AvroConverter",

? ? ? ? ? ? ? ? "key.converter.schema.registry.url": "http://schema-registry:8081",

? ? ? ? ? ? ? ? "value.converter": "io.confluent.connect.avro.AvroConverter",

? ? ? ? ? ? ? ? "value.converter.schema.registry.url": "http://schema-registry:8081",

? ? ? ? ? ? ? ? "errors.tolerance":"all",

? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.name":"dlq_file_sink_06__01",

? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.replication.factor":1,

? ? ? ? ? ? ? ? "errors.deadletterqueue.context.headers.enable":true,

? ? ? ? ? ? ? ? "errors.retry.delay.max.ms": 60000,

? ? ? ? ? ? ? ? "errors.retry.timeout": 300000

? ? ? ? ? ? ? ? }

? ? ? ? }'

另外再創(chuàng)建第二個接收器,將第一個接收器的死信隊列作為源主題方椎,并嘗試將記錄反序列化為JSON聂抢,在這里要更改的是value.converter、key.converter棠众、源主題名和死信隊列名(如果此連接器需要將任何消息路由到死信隊列琳疏,要避免遞歸)。

curl-X POST http://localhost:8083/connectors -H"Content-Type: application/json"-d'{

? ? ? ? "name": "file_sink_06__02-json",

? ? ? ? "config": {

? ? ? ? ? ? ? ? "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",

? ? ? ? ? ? ? ? "topics":"dlq_file_sink_06__01",

? ? ? ? ? ? ? ? "file":"/data/file_sink_06.txt",

? ? ? ? ? ? ? ? "value.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "value.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "key.converter":"org.apache.kafka.connect.json.JsonConverter",

? ? ? ? ? ? ? ? "key.converter.schemas.enable": false,

? ? ? ? ? ? ? ? "errors.tolerance":"all",

? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.name":"dlq_file_sink_06__02",

? ? ? ? ? ? ? ? "errors.deadletterqueue.topic.replication.factor":1,

? ? ? ? ? ? ? ? "errors.deadletterqueue.context.headers.enable":true,

? ? ? ? ? ? ? ? "errors.retry.delay.max.ms": 60000,

? ? ? ? ? ? ? ? "errors.retry.timeout": 300000

? ? ? ? ? ? ? ? }

? ? ? ? }'

現在可以驗證一下闸拿。

首先空盼,源主題收到20條Avro消息,之后可以看到20條消息被讀取并被原始Avro接收器接收:

然后發(fā)送8條JSON消息新荤,這時8條消息被發(fā)送到死信隊列揽趾,然后被JSON接收器接收:

現在再發(fā)送5條格式錯誤的JSON消息,之后可以看到兩者都有失敗的消息苛骨,有2點可以確認:

從Avro接收器發(fā)送到死信隊列的消息數與成功發(fā)送的JSON消息數之間有差異篱瞎;

消息被發(fā)送到JSON接收器的死信隊列。

#通過KSQL監(jiān)控死信隊列

除了使用JMX監(jiān)控死信隊列之外痒芝,還可以利用KSQL的聚合能力編寫一個簡單的流應用來監(jiān)控消息寫入隊列的速率:

-- 為每個死信隊列主題注冊流俐筋。CREATESTREAM dlq_file_sink_06__01(MSGVARCHAR)WITH(KAFKA_TOPIC='dlq_file_sink_06__01',VALUE_FORMAT='DELIMITED');CREATESTREAM dlq_file_sink_06__02(MSGVARCHAR)WITH(KAFKA_TOPIC='dlq_file_sink_06__02'严衬,VALUE_FORMAT='DELIMITED');-- 從主題的開頭消費數據SET'auto.offset.reset'='earliest';-- 使用其它列創(chuàng)建監(jiān)控流澄者,可用于后續(xù)聚合查詢CREATESTREAM DLQ_MONITORWITH(VALUE_FORMAT='AVRO')AS\SELECT'dlq_file_sink_06__01'ASSINK_NAME,\'Records: 'ASGROUP_COL,\? ? ? ? MSG \FROMdlq_file_sink_06__01;-- 使用來自第二個死信隊列的消息注入相同的監(jiān)控流INSERTINTODLQ_MONITOR \SELECT'dlq_file_sink_06__02'ASSINK_NAME,\'Records: 'ASGROUP_COL,\? ? ? ? MSG \FROMdlq_file_sink_06__02;-- 在每個死信隊列每分鐘的時間窗口內,創(chuàng)建消息的聚合視圖CREATETABLEDLQ_MESSAGE_COUNT_PER_MINAS\SELECTTIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss')ASSTART_TS,\? ? ? ? SINK_NAME,\? ? ? ? GROUP_COL,\COUNT(*)ASDLQ_MESSAGE_COUNT \FROMDLQ_MONITOR \? ? ? ? ? WINDOW TUMBLING(SIZE1MINUTE)\GROUPBYSINK_NAME,\? ? ? ? ? GROUP_COL;

這個聚合表可以以交互式的方式進行查詢,下面顯示了一分鐘內每個死信隊列中的消息數量:

ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_MESSAGE_COUNT_PER_MIN;

2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9

2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8

2019-02-01 03:12:00 | dlq_file_sink_06__01 | 5

2019-02-01 02:56:00 | dlq_file_sink_06__02 | 5

2019-02-01 03:12:00 | dlq_file_sink_06__02 | 5

因為這個表的下面是Kafka主題闷哆,所以可以將其路由到期望的任何監(jiān)控儀表盤,還可以用于驅動告警单起。假定有幾條錯誤消息是可以接受的抱怔,但是一分鐘內超過5條消息就是個大問題需要關注:

CREATETABLEDLQ_BREACHAS\SELECTSTART_TS,SINK_NAME,DLQ_MESSAGE_COUNT \FROMDLQ_MESSAGE_COUNT_PER_MIN \WHEREDLQ_MESSAGE_COUNT>5;

現在又有了一個報警服務可以訂閱的DLQ_BREACH主題,當收到任何消息時嘀倒,可以觸發(fā)適當的操作(例如通知)屈留。

ksql> SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_BREACH;

2019-02-01 02:56:00 | dlq_file_sink_06__01 | 9

2019-02-01 03:10:00 | dlq_file_sink_06__01 | 8

#Kafka連接器哪里沒有提供錯誤處理?

Kafka連接器的錯誤處理方式测蘑,如下表所示:

連接器生命周期階段描述是否處理錯誤灌危?

開始首次啟動連接器時,其將執(zhí)行必要的初始化碳胳,例如連接到數據存儲無

拉扔买(針對源連接器)從源數據存儲讀取消息無

格式轉換從Kafka主題讀寫數據并對JSON/Avro格式進行序列化/反序列化有

單消息轉換應用任何已配置的單消息轉換有

接收(針對接收連接器)將消息寫入目標數據存儲無

注意源連接器沒有死信隊列。

#錯誤處理配置流程

關于連接器錯誤處理的配置挨约,可以按照如下的流程一步步進階:

#總結

處理錯誤是任何穩(wěn)定可靠的數據管道的重要組成部分味混,根據數據的使用方式,可以有兩個選項诫惭。如果管道任何錯誤的消息都不能接受翁锡,表明上游存在嚴重的問題,那么就應該立即停止處理(這是Kafka連接器的默認行為)夕土。

另一方面馆衔,如果只是想將數據流式傳輸到存儲以進行分析或非關鍵性處理,那么只要不傳播錯誤怨绣,保持管道穩(wěn)定運行則更為重要角溃。這時就可以定義錯誤的處理方式,推薦的方式是使用死信隊列并密切監(jiān)視來自Kafka連接器的可用JMX指標篮撑。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末开镣,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子咽扇,更是在濱河造成了極大的恐慌邪财,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件质欲,死亡現場離奇詭異树埠,居然都是意外死亡,警方通過查閱死者的電腦和手機嘶伟,發(fā)現死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門怎憋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事绊袋”显龋” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵癌别,是天一觀的道長皂岔。 經常有香客問我,道長展姐,這世上最難降的妖魔是什么躁垛? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮圾笨,結果婚禮上教馆,老公的妹妹穿的比我還像新娘。我一直安慰自己擂达,他們只是感情好土铺,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著板鬓,像睡著了一般舒憾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上穗熬,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天镀迂,我揣著相機與錄音,去河邊找鬼唤蔗。 笑死探遵,一個胖子當著我的面吹牛,可吹牛的內容都是我干的妓柜。 我是一名探鬼主播箱季,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼棍掐!你這毒婦竟也來了藏雏?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤作煌,失蹤者是張志新(化名)和其女友劉穎掘殴,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體粟誓,經...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡奏寨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了鹰服。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片病瞳。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡揽咕,死狀恐怖,靈堂內的尸體忽然破棺而出套菜,到底是詐尸還是另有隱情亲善,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布逗柴,位于F島的核電站蛹头,受9級特大地震影響,放射性物質發(fā)生泄漏嚎于。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一挟冠、第九天 我趴在偏房一處隱蔽的房頂上張望于购。 院中可真熱鬧,春花似錦知染、人聲如沸肋僧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽嫌吠。三九已至,卻和暖如春掺炭,著一層夾襖步出監(jiān)牢的瞬間辫诅,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工涧狮, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留炕矮,地道東北人。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓者冤,卻偏偏與公主長得像肤视,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子涉枫,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345