0X01 背景
大數(shù)據(jù)過(guò)濾、導(dǎo)入盹沈,用celery下發(fā)任務(wù)硼控,任務(wù)內(nèi)容為kafka生產(chǎn)一些數(shù)據(jù)。
0X02 問(wèn)題
使用confluent_kafka或python-kafka模塊向kafka生產(chǎn)數(shù)據(jù)动壤,本地調(diào)試時(shí)代碼可以正常生產(chǎn)消息,但是套上celery后淮逻,kafka就無(wú)法將新消息生產(chǎn)到topic隊(duì)列中了琼懊,具體表現(xiàn)為調(diào)用Producer函數(shù)后無(wú)任何反應(yīng)。
https://www.cnblogs.com/dplearning/p/7520211.html
https://blog.csdn.net/weixin_34050427/article/details/85940461
0X03 解決方案
- 更換為pykafka庫(kù)(缺點(diǎn):pykafka不支持client.id)
- 更換為python-kafka庫(kù)(缺點(diǎn):每個(gè)tasker都需要實(shí)例化一個(gè)Producer爬早,否則依然會(huì)發(fā)生confluent_kafka那種無(wú)響應(yīng)的情況)
如:
#coding: utf-8
from kafka import KafkaProducer
import json
import time
@celery.task(max_retries=1, default_retry_delay=3, ignore_result=True)
def WriteToKafka(key):
"""
將傳入數(shù)據(jù)寫(xiě)入Kafka
"""
prod = KafkaProducer(
bootstrap_servers=BROKER, client_id="xxx", retries=2)
datas = redis_client.smembers(key)
for data in datas:
# 數(shù)據(jù)包格式化
json_data = {}
future = prod.send('xxx',
json.dumps(json_data).encode("utf-8")).add_errback(on_send_error)
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
# 清理使用過(guò)的Redis Key
redis_client.delete(key)
prod.flush()
0X04 后記
由于pykafka不支持設(shè)置client.id哼丈,所以我這里只能使用python-kafka來(lái)解決該問(wèn)題。
雖然將Producer實(shí)例化放入task函數(shù)中會(huì)導(dǎo)致多次建立kafka鏈接筛严,但是可以通過(guò)數(shù)據(jù)打包的方式讓一個(gè)tasker執(zhí)行更多的任務(wù)醉旦,通過(guò)減少tasker的調(diào)用量來(lái)減少Producer實(shí)例化的次數(shù),從而提高效率。
有一個(gè)奇怪的問(wèn)題髓抑,當(dāng)我嘗試使用confluent_kafka庫(kù)咙崎,并模仿python-kafka的解決方法——在task函數(shù)中實(shí)例化Producer時(shí),celery上面執(zhí)行依然無(wú)響應(yīng)吨拍,懷疑是confluent_kafka庫(kù)的bug
0X05 參考內(nèi)容
https://github.com/celery/celery/issues/4021
https://github.com/dpkp/kafka-python/issues/1098
博客文章遷移: 2019-09-27 11:01