librdkafka帶鑒權(quán)認(rèn)證訪問kafka服務(wù)器

librdkafka簡介

librdkafka是用c語言實現(xiàn)的一個高性能的kafka客戶端田度,因為性能強大,開發(fā)者們基于librdkafka開發(fā)了各種語言的kafka客戶端僵腺,比如librdkafkad(c++),, node-rdkafka(Node.js), confulent-kafka-python(Python)等。
librdkafka的高性能主要體現(xiàn)在其多線程的設(shè)計以及盡可能的降低內(nèi)存拷貝。

librdkakfa API 簡介

librdkafka github地址:https://github.com/edenhill/librdkafka ,
其中脖岛,C語言API可以參考src/rdkafka.h頭文件,簡要介紹幾個關(guān)鍵的對象

  • rd_kafka_t: kafka客戶端對象
  • rd_kafka_conf_t: kafka客戶端配置對象
  • rd_kafka_topic_t: kafka topic對象

創(chuàng)建這幾個對象所使用的函數(shù):

  • rd_kafka_new()
  • rd_kafka_conf_new()
  • rd_kafka_topic_new()

librdkafka支持多種協(xié)議以控制kafka服務(wù)器的訪問權(quán)限颊亮,如SASL_PALIN, PLAINTEXT, SASL_SSL等柴梆,在使用librdkafka時需要通過security.protocol參數(shù)指定協(xié)議類型,再輔以相應(yīng)協(xié)議所需的其它參數(shù)完成權(quán)限認(rèn)證终惑。

如果使用SASL協(xié)議進(jìn)行權(quán)限認(rèn)證绍在,需要對librdkafka添加SASL庫依賴并重新編譯。例如:在CentOS下安裝如下依賴包:

yum -y install cyrus-sasl cyrus-sasl-devel

經(jīng)過重新編譯librdkafka后雹有,進(jìn)入examples目錄下揣苏,執(zhí)行

./rdkafka_example -X builtin.features

結(jié)果為:

 builtin.features = gzip,snappy,ssl,sasl,regex

可以看到librdkafka已經(jīng)有了sasl特性,后續(xù)可以通過sasl協(xié)議進(jìn)行訪問認(rèn)證件舵。

producer 代碼示例

初始化producer


int KafkaApi::init_producer(const std::string &brokers,
                            const std::string &username,
                            const std::string &password) {
  char errstr[512];
  /* Kafka configuration */
  if (NULL == conf_) {
    conf_ = rd_kafka_conf_new();
  }

  rd_kafka_conf_set(conf_, "queued.min.messages", "20", NULL, 0);
  rd_kafka_conf_set(conf_, "bootstrap.servers", brokers.c_str(), errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "security.protocol", "sasl_plaintext", errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
  rd_kafka_conf_set(conf_, "sasl.username", username.c_str(), errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "sasl.password", password.c_str(), errstr,
                    sizeof(errstr));
  rd_kafka_conf_set(conf_, "api.version.request", "true", errstr,
                    sizeof(errstr));
  rd_kafka_conf_set_dr_msg_cb(conf_, dr_msg_cb_trampoline);

  /* Create Kafka handle */
  if (!(rk_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr)))) {
    fprintf(stderr, "%% Failed to init producer: %s\n", errstr);
    exit(1);
  }

  return 0;
}

初始化過程介紹:

  1. 首先通過rd_kafka_conf_new()函數(shù)創(chuàng)建rd_kafka_conf_t對象

  2. 設(shè)置rd_kafka_conf_t對象,設(shè)置kafka客戶端參數(shù),示例參數(shù)為:

    • bootstrap.servers: broker地址列表
    • security.protocol: 安全協(xié)議類型脯厨,示例為SASL_PLAINTEXT
    • sasl.mechanisms: sasl協(xié)議機制铅祸,示例為PLAIN, 表示普通文本
    • sasl.username: 認(rèn)證用戶名
    • sasl.password: 認(rèn)證密碼
    • api.version.request: 可選,librdkafka與kafka服務(wù)器版本適配參數(shù)合武,該參數(shù)為true表示允許librdkafka向broker發(fā)送請求詢問broker支持的API版本列表(Apache Kafka v0.10.0版本后支持),以完成版本適配临梗,更多版本適配要點見https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
    • 設(shè)置發(fā)送消息的回調(diào)函數(shù),因為librdkafka發(fā)送消息為非阻塞的稼跳,需要通過rd_kafka_poll()方法輪詢消息是否發(fā)送成功盟庞,并設(shè)置響應(yīng)的回調(diào)函數(shù)確認(rèn)消息是否發(fā)送成功
  3. 調(diào)用rd_kafka_new()函數(shù)創(chuàng)建rd_kafka_t對象

發(fā)送消息

int KafkaApi::send_message(const std::string &topic, const char *data,
                           const int &data_len) {
  rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk_, topic.c_str(), NULL);

  if (!rkt) {
    COMMLIB_LOG_ERR("kafka: create topic failed, err:%s",
                    rd_kafka_err2str(rd_kafka_errno2err(errno)));
    return rt::KDFKA_PRODUCE_ERR;
  }

  int ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                             const_cast<char *>(data), data_len, NULL, 0, NULL);
  if (ret == -1) {
    COMMLIB_LOG_ERR("kafka: send message failed, err:%s",
                    rd_kafka_err2str(rd_kafka_errno2err(errno)));
    return rt::KDFKA_PRODUCE_ERR;
  }

  COMMLIB_LOG_DEBUG("produce message [%s]", data);
  rd_kafka_poll(rk_, 0);
  return rt::SUCCESS;
}

發(fā)送消息過程介紹:

  1. 通過rd_kafka_topic_new()方法創(chuàng)建rd_kafka_topic_t對象,注意topic是自動創(chuàng)建的(需要broker端設(shè)置能否自動創(chuàng)建topic的參數(shù):auto.create.topics.enable=true), 除此之外汤善,topic能否創(chuàng)建成功還與認(rèn)證用戶的權(quán)限有關(guān)什猖,如果認(rèn)證用戶在broker端為super.users,則topic能夠自動創(chuàng)建成功红淡,否則則會報錯: 用戶無權(quán)限不狮,需要先給用戶添加ACL權(quán)限才行;最后一點在旱,對于已經(jīng)存在的topic, rd_kafka_topic_new()方法仍然返回的是舊的對象

  2. 發(fā)送消息通過調(diào)用rd_kafka_produce()函數(shù)完成摇零,該函數(shù)的參數(shù)為:

    • rd_kafka_topic_t對象
    • partition: RD_KAFKA_PARTITION_UA表示為不設(shè)置
    • msgflags: 可設(shè)置為0或RD_KAFKA_MSG_F_COPY, RD_KAFKA_MSG_F_FREE, RD_KAFKA_MSG_F_BLOCK, RD_KAFKA_MSG_F_COPY表示發(fā)送的消息內(nèi)容參數(shù)為值傳遞,rd_kafka_produce()函數(shù)返回之后將不會仍持有消息內(nèi)容的引用
    • payload, 消息內(nèi)容指針
    • len, 消息長度
    • key, 消息的key
    • msg_opaque: 每條消息的透明度指針桶蝎,在消息發(fā)送的回調(diào)函數(shù)中使用
  3. 調(diào)用rd_kafka_poll()函數(shù)驻仅,使得消息發(fā)送的回調(diào)函數(shù)能夠觸發(fā)谅畅, 該函數(shù)第一個參數(shù)為rd_kafka_t對象,第二個參數(shù)為timeout_ms噪服,設(shè)置為0表示為非阻塞

注意事項

在使用librdkafka帶鑒權(quán)認(rèn)證訪問kafka服務(wù)器的過程中毡泻,解決消息發(fā)送失敗問題的關(guān)鍵點有:

  • librdkafka的SASL依賴有沒有添加
  • SASL認(rèn)證的參數(shù)配置有沒有正確,需要確認(rèn)用戶在broker端是否已經(jīng)添加芯咧,以及確認(rèn)用戶擁有的權(quán)限
  • api.version.request參數(shù)牙捉,該參數(shù)設(shè)置不正確,將直接導(dǎo)致消息發(fā)送失敗敬飒,使用過程中需要注意librdkafka的版本與broker的版本
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末邪铲,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子无拗,更是在濱河造成了極大的恐慌带到,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件英染,死亡現(xiàn)場離奇詭異揽惹,居然都是意外死亡,警方通過查閱死者的電腦和手機四康,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進(jìn)店門搪搏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人闪金,你說我怎么就攤上這事疯溺。” “怎么了哎垦?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵囱嫩,是天一觀的道長。 經(jīng)常有香客問我漏设,道長墨闲,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任郑口,我火速辦了婚禮鸳碧,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘潘酗。我一直安慰自己杆兵,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布仔夺。 她就那樣靜靜地躺著琐脏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上日裙,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天吹艇,我揣著相機與錄音,去河邊找鬼昂拂。 笑死受神,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的格侯。 我是一名探鬼主播鼻听,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼联四!你這毒婦竟也來了撑碴?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤朝墩,失蹤者是張志新(化名)和其女友劉穎醉拓,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體收苏,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡亿卤,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了鹿霸。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片排吴。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖懦鼠,靈堂內(nèi)的尸體忽然破棺而出傍念,到底是詐尸還是另有隱情,我是刑警寧澤葛闷,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布钝吮,位于F島的核電站赚抡,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏攒霹。R本人自食惡果不足惜忧陪,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一扣泊、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧嘶摊,春花似錦延蟹、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春沥匈,著一層夾襖步出監(jiān)牢的瞬間蔗喂,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工高帖, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留缰儿,地道東北人。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓散址,卻偏偏與公主長得像乖阵,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子预麸,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,721評論 13 425
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理瞪浸,服務(wù)發(fā)現(xiàn),斷路器师崎,智...
    卡卡羅2017閱讀 134,656評論 18 139
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,829評論 4 54
  • kafka的定義:是一個分布式消息系統(tǒng)默终,由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,320評論 1 15
  • 1.用戶管理 獲取超級管理員權(quán)限 創(chuàng)建用戶 1.命令 -h 獲取命令幫助,ls查看當(dāng)前目錄文件 2.創(chuàng)建用戶 注:...
    cry_0416閱讀 248評論 0 0