librdkafka簡介
librdkafka是用c語言實現(xiàn)的一個高性能的kafka客戶端田度,因為性能強大,開發(fā)者們基于librdkafka開發(fā)了各種語言的kafka客戶端僵腺,比如librdkafkad(c++),, node-rdkafka(Node.js), confulent-kafka-python(Python)等。
librdkafka的高性能主要體現(xiàn)在其多線程的設(shè)計以及盡可能的降低內(nèi)存拷貝。
librdkakfa API 簡介
librdkafka github地址:https://github.com/edenhill/librdkafka ,
其中脖岛,C語言API可以參考src/rdkafka.h頭文件,簡要介紹幾個關(guān)鍵的對象
- rd_kafka_t: kafka客戶端對象
- rd_kafka_conf_t: kafka客戶端配置對象
- rd_kafka_topic_t: kafka topic對象
創(chuàng)建這幾個對象所使用的函數(shù):
- rd_kafka_new()
- rd_kafka_conf_new()
- rd_kafka_topic_new()
librdkafka支持多種協(xié)議以控制kafka服務(wù)器的訪問權(quán)限颊亮,如SASL_PALIN, PLAINTEXT, SASL_SSL等柴梆,在使用librdkafka時需要通過security.protocol參數(shù)指定協(xié)議類型,再輔以相應(yīng)協(xié)議所需的其它參數(shù)完成權(quán)限認(rèn)證终惑。
如果使用SASL協(xié)議進(jìn)行權(quán)限認(rèn)證绍在,需要對librdkafka添加SASL庫依賴并重新編譯。例如:在CentOS下安裝如下依賴包:
yum -y install cyrus-sasl cyrus-sasl-devel
經(jīng)過重新編譯librdkafka后雹有,進(jìn)入examples目錄下揣苏,執(zhí)行
./rdkafka_example -X builtin.features
結(jié)果為:
builtin.features = gzip,snappy,ssl,sasl,regex
可以看到librdkafka已經(jīng)有了sasl特性,后續(xù)可以通過sasl協(xié)議進(jìn)行訪問認(rèn)證件舵。
producer 代碼示例
初始化producer
int KafkaApi::init_producer(const std::string &brokers,
const std::string &username,
const std::string &password) {
char errstr[512];
/* Kafka configuration */
if (NULL == conf_) {
conf_ = rd_kafka_conf_new();
}
rd_kafka_conf_set(conf_, "queued.min.messages", "20", NULL, 0);
rd_kafka_conf_set(conf_, "bootstrap.servers", brokers.c_str(), errstr,
sizeof(errstr));
rd_kafka_conf_set(conf_, "security.protocol", "sasl_plaintext", errstr,
sizeof(errstr));
rd_kafka_conf_set(conf_, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
rd_kafka_conf_set(conf_, "sasl.username", username.c_str(), errstr,
sizeof(errstr));
rd_kafka_conf_set(conf_, "sasl.password", password.c_str(), errstr,
sizeof(errstr));
rd_kafka_conf_set(conf_, "api.version.request", "true", errstr,
sizeof(errstr));
rd_kafka_conf_set_dr_msg_cb(conf_, dr_msg_cb_trampoline);
/* Create Kafka handle */
if (!(rk_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr)))) {
fprintf(stderr, "%% Failed to init producer: %s\n", errstr);
exit(1);
}
return 0;
}
初始化過程介紹:
首先通過rd_kafka_conf_new()函數(shù)創(chuàng)建rd_kafka_conf_t對象
-
設(shè)置rd_kafka_conf_t對象,設(shè)置kafka客戶端參數(shù),示例參數(shù)為:
- bootstrap.servers: broker地址列表
- security.protocol: 安全協(xié)議類型脯厨,示例為SASL_PLAINTEXT
- sasl.mechanisms: sasl協(xié)議機制铅祸,示例為PLAIN, 表示普通文本
- sasl.username: 認(rèn)證用戶名
- sasl.password: 認(rèn)證密碼
- api.version.request: 可選,librdkafka與kafka服務(wù)器版本適配參數(shù)合武,該參數(shù)為true表示允許librdkafka向broker發(fā)送請求詢問broker支持的API版本列表(Apache Kafka v0.10.0版本后支持),以完成版本適配临梗,更多版本適配要點見https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
- 設(shè)置發(fā)送消息的回調(diào)函數(shù),因為librdkafka發(fā)送消息為非阻塞的稼跳,需要通過rd_kafka_poll()方法輪詢消息是否發(fā)送成功盟庞,并設(shè)置響應(yīng)的回調(diào)函數(shù)確認(rèn)消息是否發(fā)送成功
調(diào)用rd_kafka_new()函數(shù)創(chuàng)建rd_kafka_t對象
發(fā)送消息
int KafkaApi::send_message(const std::string &topic, const char *data,
const int &data_len) {
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk_, topic.c_str(), NULL);
if (!rkt) {
COMMLIB_LOG_ERR("kafka: create topic failed, err:%s",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
return rt::KDFKA_PRODUCE_ERR;
}
int ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
const_cast<char *>(data), data_len, NULL, 0, NULL);
if (ret == -1) {
COMMLIB_LOG_ERR("kafka: send message failed, err:%s",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
return rt::KDFKA_PRODUCE_ERR;
}
COMMLIB_LOG_DEBUG("produce message [%s]", data);
rd_kafka_poll(rk_, 0);
return rt::SUCCESS;
}
發(fā)送消息過程介紹:
通過rd_kafka_topic_new()方法創(chuàng)建rd_kafka_topic_t對象,注意topic是自動創(chuàng)建的(需要broker端設(shè)置能否自動創(chuàng)建topic的參數(shù):auto.create.topics.enable=true), 除此之外汤善,topic能否創(chuàng)建成功還與認(rèn)證用戶的權(quán)限有關(guān)什猖,如果認(rèn)證用戶在broker端為super.users,則topic能夠自動創(chuàng)建成功红淡,否則則會報錯: 用戶無權(quán)限不狮,需要先給用戶添加ACL權(quán)限才行;最后一點在旱,對于已經(jīng)存在的topic, rd_kafka_topic_new()方法仍然返回的是舊的對象
-
發(fā)送消息通過調(diào)用rd_kafka_produce()函數(shù)完成摇零,該函數(shù)的參數(shù)為:
- rd_kafka_topic_t對象
- partition: RD_KAFKA_PARTITION_UA表示為不設(shè)置
- msgflags: 可設(shè)置為0或RD_KAFKA_MSG_F_COPY, RD_KAFKA_MSG_F_FREE, RD_KAFKA_MSG_F_BLOCK, RD_KAFKA_MSG_F_COPY表示發(fā)送的消息內(nèi)容參數(shù)為值傳遞,rd_kafka_produce()函數(shù)返回之后將不會仍持有消息內(nèi)容的引用
- payload, 消息內(nèi)容指針
- len, 消息長度
- key, 消息的key
- msg_opaque: 每條消息的透明度指針桶蝎,在消息發(fā)送的回調(diào)函數(shù)中使用
調(diào)用rd_kafka_poll()函數(shù)驻仅,使得消息發(fā)送的回調(diào)函數(shù)能夠觸發(fā)谅畅, 該函數(shù)第一個參數(shù)為rd_kafka_t對象,第二個參數(shù)為timeout_ms噪服,設(shè)置為0表示為非阻塞
注意事項
在使用librdkafka帶鑒權(quán)認(rèn)證訪問kafka服務(wù)器的過程中毡泻,解決消息發(fā)送失敗問題的關(guān)鍵點有:
- librdkafka的SASL依賴有沒有添加
- SASL認(rèn)證的參數(shù)配置有沒有正確,需要確認(rèn)用戶在broker端是否已經(jīng)添加芯咧,以及確認(rèn)用戶擁有的權(quán)限
- api.version.request參數(shù)牙捉,該參數(shù)設(shè)置不正確,將直接導(dǎo)致消息發(fā)送失敗敬飒,使用過程中需要注意librdkafka的版本與broker的版本