1.環(huán)境情況
flink 1.11
kafka 2.4.0
2. 操作準(zhǔn)備
a. avro 的jar 包 avro-tools-1.7.7.jar
b. 對象的 XXX.avsc 文件 ,描述序列化各個字段的屬性類型讥巡。
namespace:命名路徑
type:固定寫法 record
name:java對象類名
fields:字段描述
{
"namespace":"com.test.avro",
"type":"record",
"name":"User",
"fields":[
{
"name":"name",
"type":"string"
},
{
"name":"age",
"type":[
"int",
"null"
]
},
{
"name":"email",
"type":[
"string",
"null"
]
},
{
"name":"custom_props",
"type":[
"null",
{
"type":"map",
"values":"string"
}
]
}
]
}
3. 執(zhí)行步驟
a第一步.使用windows 自動生成 XXX.java 文件
命令:C:\Users\colin>java -jar F:\repository\org\apache\avro\avro-tools\1.7.7\avro-tools-1.7.7.jar compile -string schema C:\Users\colin\IdeaProjects\logs_behavior_analysis_v2\ods_logs_kafka\src\main\resources\avro\Logs.avsc ./
java -jar + tools路徑+ copile -string schema + XXX.avsc路徑+ XXX.java存放路徑
注意:不加-string 則默認(rèn)使用 CharSequence類型绢片,遇到map就會自動加密(base64加密方式)幢哨,本人再這里搞了很久都解析不出map中的字段僵刮。所以建議使用 -string指定字段屬性為String。
image.png
生成的java文件截圖如下:
image.png
b第二步:放入程序中末荐,引用此對象
sink的序列化
AvroSerializationSchema<Logs> avroSerializationSchema = AvroSerializationSchema.forSpecific(Logs.class);
//Logs.class 這個就是你生成的 XXX.java文件
final FlinkKafkaProducer010<Logs> myProducer = new FlinkKafkaProducer010<>(
topic,
avroSerializationSchema,
prop);
Source 反序列化
AvroDeserializationSchema<Logs> avroDeserializationSchema = AvroDeserializationSchema.forSpecific(Logs.class);
DataStream<Logs> kafkaStream = env.addSource(new FlinkKafkaConsumer010<Logs>("colin_topic", avroDeserializationSchema, properties))
4. tools maven
通過這樣tools下載包
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>1.7.7</version>
</dependency>