avro是什么這里不贅述了态秧。不懂的同學(xué)請(qǐng)翻閱資料段化。
kafka c++客戶端的安裝請(qǐng)參考我的另外一篇《kafka c++客戶端安裝指南》
為了方便,kafka c++客戶端直接用了cppkafka金砍。
安裝avro c++
下載地址 https://avro.apache.org/releases.html
我安裝的版本是1.8.3址否,大家自行參考
安裝目錄在lang/c++下餐蔬,使用cmake編譯。編譯完成后會(huì)生成
動(dòng)態(tài)庫(kù)和靜態(tài)庫(kù)
libavrocpp_s.a
libavrocpp.so.1.8.3-SNAPSHOT.0
avro c++使用
參考官方教程 http://avro.apache.org/docs/1.8.0/api/cpp/html/
主要的使用代碼demo佑附,在lang/c++/samples下都有樊诺。參考著看。
和cppkafka配合使用需要注意的地方
avro encode的時(shí)候音同,一定要注意編碼的轉(zhuǎn)換词爬。下面是參考示例。
// 自定義結(jié)構(gòu)瘟斜,對(duì)應(yīng)的avro json未request.json缸夹。
// 內(nèi)容省略
request req;
auto out = avro::memoryOutputStream(); // 獲取輸出流
avro::EncoderPtr e = avro::binaryEncoder(); // 編碼器
e->init(*out); // 用輸出流初始化編碼器
avro::encode(*e, req); // 將req內(nèi)容編碼
// 編碼后的輸出流需要轉(zhuǎn)輸入流螺句,再轉(zhuǎn)給cppkafka的Buffer對(duì)象接收
auto in = avro::memoryInputStream(*out);
// 注意avro中底層儲(chǔ)存字節(jié)都是unsigned char虽惭,而不是常見的string中的char!
// 所以先用unsigned char的stringbuf接收流中的內(nèi)容
basic_stringbuf<unsigned char> strbuf;
basic_ostream<unsigned char> os(&strbuf);
const unsigned char *h = NULL;
const unsigned char *p = NULL;
size_t total=0;
size_t n = 0;
while (in->next(&p, &n)) {
os.write(p, n);
if(total == 0) {
h = p;
}
total += n;
}
// 初始化kafka broker list
cppkafka::Configuration config = {
{ "metadata.broker.list", "192.168.2.90:9092" }
};
// 這個(gè)是個(gè)繞人的地方蛇尚,初始化cppkafka buffer的時(shí)候不可以使用stringbuf的str函數(shù)芽唇。會(huì)導(dǎo)致編碼錯(cuò)亂。
cppkafka::Buffer buf(p, n);
cppkafka::MessageBuilder builder("test");
builder.payload(buf);
cppkafka::Producer producer(config);
producer.produce(builder);
producer.flush();