Kafka 中使用 Avro 序列化框架(二):使用 Twitter 的 Bijection 類庫實現(xiàn) avro 的序列化與反序列化

使用傳統(tǒng)的 avro API 自定義序列化類和反序列化類比較麻煩帐姻,需要根據(jù) schema 生成實體類忘伞,需要調(diào)用 avro 的 API 實現(xiàn) 對象到 byte[] 和 byte[] 到對象的轉(zhuǎn)化,而那些方法看上去比較繁瑣微峰,幸運的是舷丹,Twitter 開源的類庫 Bijection 對傳統(tǒng)的 Avro API 進行了封裝了和優(yōu)化,讓我們可以方便的實現(xiàn)以上操作蜓肆。

1. 添加 Bijection 類庫的依賴颜凯,并新建一個 schema 文件

Bijection 類庫的依賴如下:

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.11</artifactId>
    <version>0.9.6</version>
</dependency>

在 maven 工程的 resources 目錄下新建一個 schema 文件,名稱為"user.json"症杏,因為我們不用 avro 生成實體類的方式,所以定義一個普通的 json 文件來描述 schema 即可瑞信,另外厉颤,在 json 文件中,也不需要"namespace": "packageName"這個限定生成實體類的包名的參數(shù)凡简,本文使用的 json 文件內(nèi)容如下:

{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name",  "type": "string"},
        {"name": "age", "type": "int"}
    ]
}

2. KafkaProducer 使用 Bijection 類庫發(fā)送序列化后的消息

package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionProducer.java 
 * @Description KafkaProducer 使用 Bijection 類庫發(fā)送序列化后的消息
 * @Author YangYunhe
 * @Date 2018-06-22 10:42:06
 */
public class BijectionProducer {

    public static void main(String[] args) throws Exception {
        
        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
        FileReader fr = new FileReader(new File(schemaFilePath));
        BufferedReader br = new BufferedReader(fr);
        StringBuilder sb = new StringBuilder();
        String line;
        while((line = br.readLine()) != null) {
            sb.append(line).append("\n");
        }
        String schemaStr = sb.toString();
        br.close();
        fr.close();
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        
        Producer<String, byte[]> producer = new KafkaProducer<>(props);
        
        for (int i = 0; i < 100; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("id", i);
            avroRecord.put("name", "name" + i);
            avroRecord.put("age", 22);
            byte[] avroRecordBytes = recordInjection.apply(avroRecord);
            ProducerRecord<String, byte[]> record = new ProducerRecord<>("dev3-yangyunhe-topic001", avroRecordBytes);
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}

3. KafkaConsumer 使用 Bijection 類庫來反序列化消息

package com.bonc.rdpe.kafka110.consumer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.bonc.rdpe.kafka110.producer.BijectionProducer;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionConsumer.java 
 * @Description KafkaConsumer 使用 Bijection 類庫來反序列化消息
 * @Author YangYunhe
 * @Date 2018-06-22 11:10:29
 */
public class BijectionConsumer {
    
    public static void main(String[] args) throws Exception {
        
        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
        FileReader fr = new FileReader(new File(schemaFilePath));
        BufferedReader br = new BufferedReader(fr);
        StringBuilder sb = new StringBuilder();
        String line;
        while((line = br.readLine()) != null) {
            sb.append(line).append("\n");
        }
        String schemaStr = sb.toString();
        br.close();
        fr.close();
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("group.id", "dev3-yangyunhe-group001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        
        try {
            while(true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    System.out.println("value = [user.id = " + genericRecord.get("id") + ", " +
                            "user.name = " + genericRecord.get("name") + ", " +
                            "user.age = " + genericRecord.get("age") + "], " + 
                            "partition = " + record.partition() + ", " + 
                            "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

4. 測試結果

先運行 KafkaConsumer逼友,沒有輸出
當運行 KakfaProducer 后精肃,KakfaConsumer 控制臺輸出:

value = [user.id = 0, user.name = name0, user.age = 22], partition = 2, offset = 662
value = [user.id = 1, user.name = name1, user.age = 22], partition = 1, offset = 663
value = [user.id = 2, user.name = name2, user.age = 22], partition = 0, offset = 663
value = [user.id = 3, user.name = name3, user.age = 22], partition = 2, offset = 663
value = [user.id = 4, user.name = name4, user.age = 22], partition = 1, offset = 664

......

參考文章:
在Kafka中使用Avro編碼消息:Producter篇
在Kafka中使用Avro編碼消息:Consumer篇

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市帜乞,隨后出現(xiàn)的幾起案子司抱,更是在濱河造成了極大的恐慌,老刑警劉巖黎烈,帶你破解...
    沈念sama閱讀 212,222評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件习柠,死亡現(xiàn)場離奇詭異,居然都是意外死亡照棋,警方通過查閱死者的電腦和手機资溃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,455評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來烈炭,“玉大人溶锭,你說我怎么就攤上這事》叮” “怎么了趴捅?”我有些...
    開封第一講書人閱讀 157,720評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長霹疫。 經(jīng)常有香客問我拱绑,道長,這世上最難降的妖魔是什么更米? 我笑而不...
    開封第一講書人閱讀 56,568評論 1 284
  • 正文 為了忘掉前任欺栗,我火速辦了婚禮,結果婚禮上征峦,老公的妹妹穿的比我還像新娘迟几。我一直安慰自己,他們只是感情好栏笆,可當我...
    茶點故事閱讀 65,696評論 6 386
  • 文/花漫 我一把揭開白布类腮。 她就那樣靜靜地躺著,像睡著了一般蛉加。 火紅的嫁衣襯著肌膚如雪蚜枢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,879評論 1 290
  • 那天针饥,我揣著相機與錄音厂抽,去河邊找鬼。 笑死丁眼,一個胖子當著我的面吹牛筷凤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 39,028評論 3 409
  • 文/蒼蘭香墨 我猛地睜開眼藐守,長吁一口氣:“原來是場噩夢啊……” “哼挪丢!你這毒婦竟也來了?” 一聲冷哼從身側響起卢厂,我...
    開封第一講書人閱讀 37,773評論 0 268
  • 序言:老撾萬榮一對情侶失蹤乾蓬,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后慎恒,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體任内,經(jīng)...
    沈念sama閱讀 44,220評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,550評論 2 327
  • 正文 我和宋清朗相戀三年巧号,在試婚紗的時候發(fā)現(xiàn)自己被綠了族奢。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,697評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡丹鸿,死狀恐怖越走,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情靠欢,我是刑警寧澤廊敌,帶...
    沈念sama閱讀 34,360評論 4 332
  • 正文 年R本政府宣布,位于F島的核電站门怪,受9級特大地震影響骡澈,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜掷空,卻給世界環(huán)境...
    茶點故事閱讀 40,002評論 3 315
  • 文/蒙蒙 一肋殴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧坦弟,春花似錦护锤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,782評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至赤炒,卻和暖如春氯析,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背莺褒。 一陣腳步聲響...
    開封第一講書人閱讀 32,010評論 1 266
  • 我被黑心中介騙來泰國打工掩缓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人遵岩。 一個月前我還...
    沈念sama閱讀 46,433評論 2 360
  • 正文 我出身青樓你辣,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子绢记,可洞房花燭夜當晚...
    茶點故事閱讀 43,587評論 2 350

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn)正卧,斷路器蠢熄,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • 1、通過CocoaPods安裝項目名稱項目信息 AFNetworking網(wǎng)絡請求組件 FMDB本地數(shù)據(jù)庫組件 SD...
    陽明先生_X自主閱讀 15,969評論 3 119
  • 啥叫自作孽饥追,不可活吧? 起身猛了,腿酸打不了太順溜的彎,腳腕子也些許腫藐不,去扶者铜,他妹的左手大拇指也酸疼! 有點天地不...
    縱情嬉戲天地間閱讀 243評論 0 0
  • 1 不是所有的相遇,都能被溫柔以待幅骄,走到最后。亦不是所有的牽手本今,都能笑看東風拆座、相守相依。 他是風度翩翩的青年才俊冠息,...
    茗洛川閱讀 17,795評論 181 741
  • 來深圳后倒是正正經(jīng)經(jīng)的吃過兩家東北菜館挪凑。我對餃子的喜愛程度超越了自己身上“南方人”標簽的約束力,東北餃子店往往是我...
    皮醬閱讀 642評論 1 2