Defining custom serializer and deserializer classes with the plain Avro API is cumbersome: you have to generate entity classes from the schema and call the Avro API to convert objects to byte[] and back, and those calls are verbose. Fortunately, Bijection, an open-source library from Twitter, wraps and streamlines the traditional Avro API, making these operations much more convenient.
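To preview the core API before the full examples, here is a minimal sketch of the round trip (an illustrative fragment: the schema variable is assumed to be parsed from the schema file created in step 1, and the imports match the complete producer code in step 2):
// Build a codec from the Avro schema: GenericRecord <-> byte[]
Injection<GenericRecord, byte[]> injection = GenericAvroCodecs.toBinary(schema);

GenericRecord user = new GenericData.Record(schema);
user.put("id", 1);
user.put("name", "name1");
user.put("age", 22);

byte[] bytes = injection.apply(user);                  // serialize: object -> byte[]
GenericRecord decoded = injection.invert(bytes).get(); // deserialize: byte[] -> object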
1. Add the Bijection dependency and create a schema file
The Maven dependency for Bijection is:
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.11</artifactId>
    <version>0.9.6</version>
</dependency>
Create a schema file named "user.json" under the resources directory of the Maven project. Since we are not using Avro to generate entity classes, a plain JSON file describing the schema is enough. The "namespace": "packageName" attribute, which only controls the package of generated entity classes, can also be left out. The JSON file used in this article is:
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}
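One caveat worth noting (an addition, not from the original article): the examples below locate this file via getResource("user.json").getPath(), which only works while the resource is a plain file on disk and fails once the project is packaged into a JAR. A hypothetical helper that reads the resource as a classpath stream is more robust:
// Hypothetical helper (not part of the original code): load the schema text
// from the classpath, which also works from inside a packaged JAR.
// Additional imports needed: java.io.IOException, java.io.InputStream,
// java.io.InputStreamReader, java.nio.charset.StandardCharsets
private static String loadSchema(String resourceName) throws IOException {
    try (InputStream in = BijectionProducer.class.getClassLoader().getResourceAsStream(resourceName);
         BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
        StringBuilder sb = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            sb.append(line).append("\n");
        }
        return sb.toString();
    }
}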
2. KafkaProducer: sending messages serialized with Bijection
package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionProducer.java
 * @Description KafkaProducer that sends messages serialized with the Bijection library
 * @Author YangYunhe
 * @Date 2018-06-22 10:42:06
 */
public class BijectionProducer {

    public static void main(String[] args) throws Exception {

        // Read the schema definition from user.json on the classpath
        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
        FileReader fr = new FileReader(new File(schemaFilePath));
        BufferedReader br = new BufferedReader(fr);
        StringBuilder sb = new StringBuilder();
        String line;
        while ((line = br.readLine()) != null) {
            sb.append(line).append("\n");
        }
        String schemaStr = sb.toString();
        br.close();
        fr.close();

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The Avro payload is sent as raw bytes, so the value serializer is ByteArraySerializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        // Injection converts GenericRecord <-> byte[] according to the schema
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        Producer<String, byte[]> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("id", i);
            avroRecord.put("name", "name" + i);
            avroRecord.put("age", 22);
            // Serialize the record to Avro binary before sending
            byte[] avroRecordBytes = recordInjection.apply(avroRecord);
            ProducerRecord<String, byte[]> record = new ProducerRecord<>("dev3-yangyunhe-topic001", avroRecordBytes);
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}
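The producer above sends fire-and-forget, so broker-side failures would pass silently. As an optional variant (not in the original article), send() also accepts a callback that surfaces errors:
// Sketch: send with a callback so failures are logged instead of dropped.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("sent to partition " + metadata.partition()
                + ", offset " + metadata.offset());
    }
});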
3. KafkaConsumer: deserializing messages with Bijection
package com.bonc.rdpe.kafka110.consumer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.bonc.rdpe.kafka110.producer.BijectionProducer;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

/**
 * @Title BijectionConsumer.java
 * @Description KafkaConsumer that deserializes messages with the Bijection library
 * @Author YangYunhe
 * @Date 2018-06-22 11:10:29
 */
public class BijectionConsumer {

    public static void main(String[] args) throws Exception {

        // Read the same schema definition used by the producer
        String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath();
        FileReader fr = new FileReader(new File(schemaFilePath));
        BufferedReader br = new BufferedReader(fr);
        StringBuilder sb = new StringBuilder();
        String line;
        while ((line = br.readLine()) != null) {
            sb.append(line).append("\n");
        }
        String schemaStr = sb.toString();
        br.close();
        fr.close();

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("group.id", "dev3-yangyunhe-group001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The value arrives as raw Avro bytes, so the value deserializer is ByteArrayDeserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("dev3-yangyunhe-topic001"));

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(schemaStr);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try {
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<String, byte[]> record : records) {
                    // invert() turns the Avro bytes back into a GenericRecord
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    System.out.println("value = [user.id = " + genericRecord.get("id") + ", " +
                            "user.name = " + genericRecord.get("name") + ", " +
                            "user.age = " + genericRecord.get("age") + "], " +
                            "partition = " + record.partition() + ", " +
                            "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
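Note that recordInjection.invert(...) returns a scala.util.Try, so calling .get() directly, as above, will throw if a message is not valid Avro for this schema. A defensive sketch (an assumption, not part of the original code):
// Sketch: skip undecodable messages instead of letting .get() throw.
scala.util.Try<GenericRecord> attempt = recordInjection.invert(record.value());
if (attempt.isSuccess()) {
    GenericRecord genericRecord = attempt.get();
    // ... print the fields as in the loop above ...
} else {
    System.err.println("skipping undecodable record at offset " + record.offset());
}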
4. Test results
Run the KafkaConsumer first; it prints nothing.
Once the KafkaProducer is started, the KafkaConsumer console outputs:
value = [user.id = 0, user.name = name0, user.age = 22], partition = 2, offset = 662
value = [user.id = 1, user.name = name1, user.age = 22], partition = 1, offset = 663
value = [user.id = 2, user.name = name2, user.age = 22], partition = 0, offset = 663
value = [user.id = 3, user.name = name3, user.age = 22], partition = 2, offset = 663
value = [user.id = 4, user.name = name4, user.age = 22], partition = 1, offset = 664
......