Problem Description
Following the article "Use Apache Kafka MirrorMaker with Event Hubs", MirrorMaker was successfully configured to send data to Event Hubs. Why does it run successfully for only a short while (roughly 10 minutes to 2 hours) before a TimeoutException occurs and Kafka MirrorMaker aborts and exits?
The exception message is:
[2022-05-25 14:29:21,683] INFO [Producer clientId=mirror_maker_producer] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
(org.apache.kafka.clients.producer.KafkaProducer)
[2022-05-25 14:29:21,683] DEBUG [Producer clientId=mirror_maker_producer] Kafka producer has been closed (org.apache.kafka.clients.producer.KafkaProducer)
[2022-05-25 14:29:21,683] ERROR Error when sending message to topic xxxxxxxxxxxx with key: 16 bytes, value: 875 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 18 record(s) for xxxxxxxxxxxx-4: 79823 ms has passed since last append
[2022-05-25 14:29:21,683] INFO Closing producer due to send failure. (kafka.tools.MirrorMaker$)
[2022-05-25 14:29:21,683] INFO [Producer clientId=mirror_maker_producer] Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer)
Problem Analysis
Searching online for the error message "Expiring 18 record(s) for xxxxxxxxxxxx-4: 79823 ms has passed since last append" turns up the following explanations for the problem:
1) Explanation on Stack Overflow: https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has
The error indicates that some records are put into the queue at a faster rate than they can be sent from the client.
In other words, records are put into the MirrorMaker producer's queue faster than the client can send them to the target broker (here, the Event Hubs endpoint).
When your Producer sends messages, they are stored in buffer (before sending them to the target broker) and the records are grouped together into batches in order to increase throughput. When a new record is added to the batch, it must be sent within a -configurable- time window which is controlled by request.timeout.ms (the default is set to 30 seconds). If the batch is in the queue for longer time, a TimeoutException is thrown and the batch records will then be removed from the queue and won't be delivered to the broker.
Increasing the value of request.timeout.ms should do the trick for you.
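To make the suggestion concrete, here is a minimal sketch (the class name and the Event Hubs endpoint are placeholders, not part of the original setup) of where request.timeout.ms and batch.size sit among the Kafka Java producer configuration keys. In MirrorMaker these same keys are not set in Java code but through the producer properties file passed to kafka-mirror-maker.sh via --producer.config; the sketch only shows which client settings those file entries map to.

```java
// A minimal sketch, assuming a placeholder Event Hubs endpoint; in MirrorMaker
// itself these keys come from the producer.config properties file instead.
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeoutConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Placeholder -- replace with your Event Hubs Kafka endpoint and SASL settings.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<NAMESPACE>.servicebus.chinacloudapi.cn:9093");

        // How long a full batch may wait in the send queue before its records are
        // expired with a TimeoutException (default 30000 ms).
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");

        // Maximum batch size in bytes; a batch counts as "full" once it reaches
        // this size (default 16384).
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");

        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```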
2) Explanation from a CSDN blog post: https://blog.csdn.net/weixin_43432984/article/details/109180842
When a batch of messages is full (it has reached batch.size) and requestTimeoutMs < (now - this.lastAppendTime), the batch is marked as expired and is not kept in the RecordAccumulator (it will not be retried or resent).
Increasing the batch.size and request.timeout.ms parameters can resolve the problem.
3) Why does MirrorMaker stop running as soon as the exception occurs?
Because MirrorMaker's configuration parameter abort.on.send.failure defaults to true, which makes the handling of a failed producer send "abort": the producer is closed and sending is terminated, as sketched below.
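MirrorMaker itself is written in Scala, so the following is only a rough Java sketch of that behavior (the names AbortOnSendFailureCallback and exitingOnSendFailure are invented for illustration): on a failed send the callback force-closes the producer with a zero timeout, which is what produces the "Closing producer due to send failure" and "timeoutMillis = 0 ms" lines in the log above.

```java
// Rough illustration only -- not MirrorMaker's actual source.
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class AbortOnSendFailureSketch {

    // Invented name; loosely mirrors what the mirroring send callback does
    // when abort.on.send.failure=true.
    static final class AbortOnSendFailureCallback implements Callback {
        private final Producer<byte[], byte[]> producer;
        private final boolean abortOnSendFailure;               // mirrors abort.on.send.failure
        final AtomicBoolean exitingOnSendFailure = new AtomicBoolean(false);

        AbortOnSendFailureCallback(Producer<byte[], byte[]> producer, boolean abortOnSendFailure) {
            this.producer = producer;
            this.abortOnSendFailure = abortOnSendFailure;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null && abortOnSendFailure) {
                // "Closing producer due to send failure." -- force close with a zero
                // timeout, dropping pending requests, and let the mirroring loop exit.
                exitingOnSendFailure.set(true);
                producer.close(Duration.ZERO);
            }
        }
    }

    public static void main(String[] args) {
        // MockProducer lets the sketch complete a send with an error without a real broker.
        MockProducer<byte[], byte[]> producer =
                new MockProducer<>(false, new ByteArraySerializer(), new ByteArraySerializer());
        AbortOnSendFailureCallback callback = new AbortOnSendFailureCallback(producer, true);

        producer.send(new ProducerRecord<>("some-topic", "payload".getBytes()), callback);
        producer.errorNext(new TimeoutException("79823 ms has passed since last append"));

        System.out.println("exitingOnSendFailure = " + callback.exitingOnSendFailure.get());
        System.out.println("producer closed      = " + producer.closed());
    }
}
```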
The root cause of the exception message lies in the maybeExpire function of the producer client's source code.
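In simplified form, the expiry check behind that message can be sketched as below (the Batch class, field names, and values are illustrative, not the verbatim Kafka source); it reproduces the exact text seen in the log.

```java
// Simplified sketch of the batch-expiry check (the logic behind maybeExpire),
// not the verbatim Kafka source; names are chosen to match the explanation above.
public class BatchExpirySketch {

    static final class Batch {
        long lastAppendTimeMs;   // updated every time a record is appended to the batch
        boolean full;            // true once the batch has reached batch.size

        Batch(long createdMs) {
            this.lastAppendTimeMs = createdMs;
        }
    }

    /** Returns the expiry message for the batch, or null if it is still deliverable. */
    static String maybeExpire(Batch batch, long requestTimeoutMs, long nowMs) {
        long sinceLastAppend = nowMs - batch.lastAppendTimeMs;
        if (batch.full && requestTimeoutMs < sinceLastAppend) {
            // Produces the log text: "Expiring N record(s) ...: <X> ms has passed since last append"
            return sinceLastAppend + " ms has passed since last append";
        }
        return null;             // not expired: the batch stays in the RecordAccumulator
    }

    public static void main(String[] args) {
        Batch batch = new Batch(0L);
        batch.full = true;                                   // batch.size reached
        // With the original request.timeout.ms = 60000 and 79823 ms since the last append:
        System.out.println(maybeExpire(batch, 60_000L, 79_823L));
        // Prints: 79823 ms has passed since last append
    }
}
```

With request.timeout.ms raised well above the observed 79823 ms gap (and batch.size enlarged so batches fill less often), the same pause no longer trips the expiry condition, which is exactly what the parameter change below relies on.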
So, based on the information above, we only need to adjust the batch.size and request.timeout.ms parameters.
**Before:**
request.timeout.ms = 60000
batch.size = 16384
**After:**
request.timeout.ms=180000
batch.size=50000
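Assuming the tuned values were written to the properties file passed to kafka-mirror-maker.sh via --producer.config (the file name producer.config below is an assumption), a small sketch like this can be used to sanity-check the file before restarting MirrorMaker:

```java
// A small sanity check, assuming the tuned values live in a file named "producer.config".
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class CheckProducerConfig {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("producer.config")) {
            props.load(in);
        }
        System.out.println("request.timeout.ms = " + props.getProperty("request.timeout.ms"));
        System.out.println("batch.size         = " + props.getProperty("batch.size"));
    }
}
```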
Running MirrorMaker with the modified parameters, data was sent to Azure Event Hubs normally, and it ran continuously for two days without any Timeout exception. Problem solved!
References
Troubleshooting a Kafka Producer TimeoutException: https://blog.csdn.net/weixin_43432984/article/details/109180842
How to fix kafka.common.errors.TimeoutException: Expiring 1 record(s) xxx ms has passed since batch creation plus linger time: https://stackoverflow.com/questions/56807188/how-to-fix-kafka-common-errors-timeoutexception-expiring-1-records-xxx-ms-has
Use Apache Kafka MirrorMaker with Event Hubs: https://docs.azure.cn/zh-cn/event-hubs/event-hubs-kafka-mirror-maker-tutorial
The kafka-mirror-maker.sh script: https://blog.csdn.net/qq_41154944/article/details/108282641
When facing problems in a complex environment, the way of getting to the bottom of things is: let the turbid settle and slowly become clear; set the still in motion and slowly bring it to life. In the cloud, it is just so!