前言
環(huán)境所依賴的pom文件
?<dependencies>
????????<dependency>
????????????<groupId>org.apache.avro</groupId>
????????????<artifactId>avro</artifactId>
????????????<version>1.8.2</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.apache.flink</groupId>
????????????<artifactId>flink-scala_2.12</artifactId>
????????????<version>1.10.1</version>
????????</dependency>
????????<!--?https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala?-->
????????<dependency>
????????????<groupId>org.apache.flink</groupId>
????????????<artifactId>flink-streaming-scala_2.12</artifactId>
????????????<version>1.10.1</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.apache.flink</groupId>
????????????<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
????????????<version>1.10.1</version>
????????</dependency>
????????<!--?https://mvnrepository.com/artifact/org.apache.flink/flink-avro?-->
????????<dependency>
????????????<groupId>org.apache.flink</groupId>
????????????<artifactId>flink-avro</artifactId>
????????????<version>1.10.1</version>
????????</dependency>
????????<!--?https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients?-->
????????<dependency>
????????????<groupId>org.apache.kafka</groupId>
????????????<artifactId>kafka-clients</artifactId>
????????????<version>1.0.0</version>
????????</dependency>
????????<dependency>
????????????<groupId>org.apache.kafka</groupId>
????????????<artifactId>kafka-streams</artifactId>
????????????<version>1.0.0</version>
????????</dependency>
????</dependencies>
????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.apache.avro</groupId>
????????????????<artifactId>avro-maven-plugin</artifactId>
????????????????<version>1.8.2</version>
????????????????<executions>
????????????????????<execution>
????????????????????????<phase>generate-sources</phase>
????????????????????????<goals>
????????????????????????????<goal>schema</goal>
????????????????????????</goals>
????????????????????????<configuration>
????????????????????????????<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
????????????????????????????<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
????????????????????????</configuration>
????????????????????</execution>
????????????????</executions>
????????????</plugin>
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-compiler-plugin</artifactId>
????????????????<configuration>
????????????????????<source>1.6</source>
????????????????????<target>1.6</target>
????????????????</configuration>
????????????</plugin>
????????</plugins>
????</build>
一抡驼、Avro提供的技術(shù)支持包括以下五個(gè)方面:
- 優(yōu)秀的數(shù)據(jù)結(jié)構(gòu)踏揣;
- 一個(gè)緊湊的,快速的,二進(jìn)制數(shù)據(jù)格式连锯;
- 一個(gè)容器文件,用來(lái)存儲(chǔ)持久化數(shù)據(jù)贾铝;
- RPC遠(yuǎn)程過(guò)程調(diào)用汇恤;
- 集成最簡(jiǎn)單的動(dòng)態(tài)語(yǔ)言。讀取或者寫入數(shù)據(jù)文件,使用或?qū)崿F(xiàn)RPC協(xié)議均不需要代碼實(shí)現(xiàn)。對(duì)于靜態(tài)語(yǔ)言編寫的話需要實(shí)現(xiàn);
二期犬、Avro優(yōu)點(diǎn)
- 二進(jìn)制消息河哑,性能好/效率高
- 使用JSON描述模式
- 模式和數(shù)據(jù)統(tǒng)一存儲(chǔ),消息自描述龟虎,不需要生成stub代碼(支持生成IDL)
- RPC調(diào)用在握手階段交換模式定義
- 包含完整的客戶端/服務(wù)端堆棧璃谨,可快速實(shí)現(xiàn)RPC
- 支持同步和異步通信
- 支持動(dòng)態(tài)消息
- 模式定義允許定義數(shù)據(jù)的排序(序列化時(shí)會(huì)遵循這個(gè)順序)
- 提供了基于Jetty內(nèi)核的服務(wù)和基于Netty的服務(wù)
三、Avro Json格式介紹
{
????"namespace":?"com.avro.bean",
????"type":?"record",
????"name":?"UserBehavior",
????"fields":?[
????????{"name":?"userId",?"type":?"long"},
????????{"name":?"itemId",??"type":?"long"},
????????{"name":?"categoryId",?"type":?"int"},
????????{"name":?"behavior",?"type":?"string"},
????????{"name":?"timestamp",?"type":?"long"}
????]
}
- namespace : 要生成的目錄
- type :類型 avro 使用 record
- name : 會(huì)自動(dòng)生成對(duì)應(yīng)的對(duì)象
- fields : 要指定的字段
注意: 創(chuàng)建的文件后綴名一定要叫 avsc
四鲤妥、使用Java自定義序列化到kafka
???????? 首先我們先使用 Java編寫Kafka客戶端寫入數(shù)據(jù)和消費(fèi)數(shù)據(jù)佳吞。
4.1 準(zhǔn)備測(cè)試數(shù)據(jù)
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
4.2 自定義Avro 序列化和反序列化
首先我們需要實(shí)現(xiàn)2個(gè)類分別為Serializer
和Deserializer
分別是序列化和反序列化
package?com.avro.AvroUtil;
import?com.avro.bean.UserBehavior;
import?org.apache.avro.io.BinaryDecoder;
import?org.apache.avro.io.BinaryEncoder;
import?org.apache.avro.io.DecoderFactory;
import?org.apache.avro.io.EncoderFactory;
import?org.apache.avro.specific.SpecificDatumReader;
import?org.apache.avro.specific.SpecificDatumWriter;
import?org.apache.kafka.common.serialization.Deserializer;
import?org.apache.kafka.common.serialization.Serializer;
import?java.io.ByteArrayInputStream;
import?java.io.ByteArrayOutputStream;
import?java.io.IOException;
import?java.util.Map;
/**
?*?@author?大數(shù)據(jù)老哥
?*?@version?V1.0
?*?@Package?com.avro.AvroUtil
?*?@File?:SimpleAvroSchemaJava.java
?*?@date?2021/1/8?20:02?*/
/**
?*??自定義序列化和反序列化?*/
public?class?SimpleAvroSchemaJava?implements?Serializer<UserBehavior>,?Deserializer<UserBehavior>?{
????
????@Override
????public?void?configure(Map<String,??>?map,?boolean?b)?{
????}
????//序列化方法
????@Override
????public?byte[]?serialize(String?s,?UserBehavior?userBehavior)?{
????????//?創(chuàng)建序列化執(zhí)行器
????????SpecificDatumWriter<UserBehavior>?writer?=?new?SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
?????????//?創(chuàng)建一個(gè)流?用存儲(chǔ)序列化后的二進(jìn)制文件
????????ByteArrayOutputStream?out?=?new?ByteArrayOutputStream();
????????//?創(chuàng)建二進(jìn)制編碼器
????????BinaryEncoder?encoder?=?EncoderFactory.get().directBinaryEncoder(out,?null);
????????try?{
????????????//?數(shù)據(jù)入都流中
????????????writer.write(userBehavior,?encoder);
????????}?catch?(IOException?e)?{
????????????e.printStackTrace();
????????}
????????return?out.toByteArray();
????}
????@Override
????public?void?close()?{
????}
????//反序列化
????@Override
????public?UserBehavior?deserialize(String?s,?byte[]?bytes)?{
????????//?用來(lái)保存結(jié)果數(shù)據(jù)
????????UserBehavior?userBehavior?=?new?UserBehavior();
????????//?創(chuàng)建輸入流用來(lái)讀取二進(jìn)制文件
????????ByteArrayInputStream?arrayInputStream?=?new?ByteArrayInputStream(bytes);
????????//?創(chuàng)建輸入序列化執(zhí)行器
????????SpecificDatumReader<UserBehavior>?stockSpecificDatumReader?=?new?SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
????????//創(chuàng)建二進(jìn)制解碼器
????????BinaryDecoder?binaryDecoder?=?DecoderFactory.get().directBinaryDecoder(arrayInputStream,?null);
????????try?{
????????????//?數(shù)據(jù)讀取
????????????userBehavior=stockSpecificDatumReader.read(null,?binaryDecoder);
????????}?catch?(IOException?e)?{
????????????e.printStackTrace();
????????}
????????//?結(jié)果返回
????????return?userBehavior;
????}
}
4.3 創(chuàng)建序列化對(duì)象
package?com.avro.kafka;
import?com.avro.bean.UserBehavior;
import?org.apache.kafka.clients.producer.KafkaProducer;
import?org.apache.kafka.clients.producer.ProducerRecord;
import?java.io.BufferedReader;
import?java.io.FileReader;
import?java.util.ArrayList;
import?java.util.List;
import?java.util.Properties;
/**
?*?@author?大數(shù)據(jù)老哥
?*?@version?V1.0
?*?@Package?com.avro.kafka
?*?@File?:UserBehaviorProducerKafka.java
?*?@date?2021/1/8?20:14?*/
public?class?UserBehaviorProducerKafka?{
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????//?獲取數(shù)據(jù)
????????List<UserBehavior>?data?=?getData();
????????//?創(chuàng)建配置文件
????????Properties?props?=?new?Properties();
????????props.setProperty("bootstrap.servers",?"192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
????????props.setProperty("key.serializer",?"org.apache.kafka.common.serialization.StringSerializer");
????????props.setProperty("value.serializer",?"com.avro.AvroUtil.SimpleAvroSchemaJava");
????????//?創(chuàng)建kafka的生產(chǎn)者
????????KafkaProducer<String,?UserBehavior>?userBehaviorProducer?=?new?KafkaProducer<String,?UserBehavior>(props);
????????//?循環(huán)遍歷數(shù)據(jù)
????????for?(UserBehavior?userBehavior?:?data)?{
????????????ProducerRecord<String,?UserBehavior>?producerRecord?=?new?ProducerRecord<String,?UserBehavior>("UserBehaviorKafka",?userBehavior);
????????????userBehaviorProducer.send(producerRecord);
????????????System.out.println("數(shù)據(jù)寫入成功"+data);
????????????Thread.sleep(1000);
????????}
????}
????public?static?List<UserBehavior>?getData()?{
????????ArrayList<UserBehavior>?userBehaviors?=?new?ArrayList<UserBehavior>();
????????try?{
????????????BufferedReader?br?=?new?BufferedReader(new?FileReader(new?File("data/UserBehavior.csv")));
????????????String?line?=?"";
????????????while?((line?=?br.readLine())?!=?null)?{
????????????????String[]?split?=?line.split(",");
?????????????userBehaviors.add(?new?UserBehavior(Long.parseLong(split[0]),?Long.parseLong(split[1]),?Integer.parseInt(split[2]),?split[3],?Long.parseLong(split[4])));
????????????}
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????????return?userBehaviors;
????}
}
注意:value.serializer
一定要指定我們自己寫好的那個(gè)序列化類,否則會(huì)無(wú)效
4.4 創(chuàng)建反序列化對(duì)象
package?com.avro.kafka;
import?com.avro.bean.UserBehavior;
import?org.apache.kafka.clients.consumer.ConsumerRecord;
import?org.apache.kafka.clients.consumer.ConsumerRecords;
import?org.apache.kafka.clients.consumer.KafkaConsumer;
import?java.util.Arrays;
import?java.util.Properties;
/**
?*?@author?大數(shù)據(jù)老哥
?*?@version?V1.0
?*?@Package?com.avro.kafka
?*?@File?:UserBehaviorConsumer.java
?*?@date?2021/1/8?20:58?*/
public?class?UserBehaviorConsumer?{
????public?static?void?main(String[]?args)?{
????????Properties?prop?=?new?Properties();
????????prop.put("bootstrap.servers",?"192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
????????prop.put("group.id",?"UserBehavior");
????????prop.put("key.deserializer",?"org.apache.kafka.common.serialization.StringDeserializer");
????????//?設(shè)置反序列化類為自定義的avro反序列化類
????????prop.put("value.deserializer",?"com.avro.AvroUtil.SimpleAvroSchemaJava");
????????KafkaConsumer<String,?UserBehavior>?consumer?=?new?KafkaConsumer<String,?UserBehavior>(prop);
????????consumer.subscribe(Arrays.asList("UserBehaviorKafka"));
????????while?(true)?{
????????????ConsumerRecords<String,?UserBehavior>?poll?=?consumer.poll(1000);
????????????for?(ConsumerRecord<String,?UserBehavior>?stringStockConsumerRecord?:?poll)?{
????????????????System.out.println(stringStockConsumerRecord.value());
????????????}
????????}
????}
}
4.5 啟動(dòng)運(yùn)行
創(chuàng)建kafkaTopic 和啟動(dòng)一個(gè)消費(fèi)者
#?創(chuàng)建topic
./kafka-topics.sh?--create?--zookeeper?node01:2181,node02:2181,node03:2181?--replication-factor?2?--partitions?3?--topic?UserBehaviorKafka
#?模擬消費(fèi)者
./kafka-console-consumer.sh --from-beginning --topic UserBehaviorKafka --zookeeper node01:2181,node02:2181,node03:2181
五棉安、Flink 實(shí)現(xiàn)Avro自定義序列化到Kafka
???????? 到這里好多小伙們就說(shuō)我Java實(shí)現(xiàn)了那Flink 不就改一下Consumer 和Producer 不就完了嗎底扳?
5.1 準(zhǔn)備數(shù)據(jù)
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
5.2 創(chuàng)建Flink自定義Avro序列化和反序列化
package?com.avro.AvroUtil;
import?com.avro.bean.UserBehavior;
import?com.typesafe.sslconfig.ssl.FakeChainedKeyStore;
import?org.apache.avro.io.BinaryDecoder;
import?org.apache.avro.io.BinaryEncoder;
import?org.apache.avro.io.DecoderFactory;
import?org.apache.avro.io.EncoderFactory;
import?org.apache.avro.specific.SpecificDatumReader;
import?org.apache.avro.specific.SpecificDatumWriter;
import?org.apache.flink.api.common.serialization.DeserializationSchema;
import?org.apache.flink.api.common.serialization.SerializationSchema;
import?org.apache.flink.api.common.typeinfo.TypeInformation;
import?org.apache.kafka.common.serialization.Deserializer;
import?org.apache.kafka.common.serialization.Serializer;
import?java.io.ByteArrayInputStream;
import?java.io.ByteArrayOutputStream;
import?java.io.IOException;
import?java.util.Map;
/**
?*?@author?大數(shù)據(jù)老哥
?*?@version?V1.0
?*?@Package?com.avro.AvroUtil
?*?@File?:SimpleAvroSchemaFlink.java
?*?@date?2021/1/8?20:02?*/
/**
?*??自定義序列化和反序列化?*/
public?class?SimpleAvroSchemaFlink?implements?DeserializationSchema<UserBehavior>,?SerializationSchema<UserBehavior>?{
?
????@Override
????public?byte[]?serialize(UserBehavior?userBehavior)?{
????????//?創(chuàng)建序列化執(zhí)行器
????????SpecificDatumWriter<UserBehavior>?writer?=?new?SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
????????//?創(chuàng)建一個(gè)流?用存儲(chǔ)序列化后的二進(jìn)制文件
????????ByteArrayOutputStream?out?=?new?ByteArrayOutputStream();
????????//?創(chuàng)建二進(jìn)制編碼器
????????BinaryEncoder?encoder?=?EncoderFactory.get().directBinaryEncoder(out,?null);
????????try?{
????????????//?數(shù)據(jù)入都流中
????????????writer.write(userBehavior,?encoder);
????????}?catch?(IOException?e)?{
????????????e.printStackTrace();
????????}
????????return?out.toByteArray();
????}
????@Override
????public?TypeInformation<UserBehavior>?getProducedType()?{
??????return?TypeInformation.of(UserBehavior.class);
????}
????@Override
????public?UserBehavior?deserialize(byte[]?bytes)?throws?IOException?{
????????//?用來(lái)保存結(jié)果數(shù)據(jù)
????????UserBehavior?userBehavior?=?new?UserBehavior();
????????//?創(chuàng)建輸入流用來(lái)讀取二進(jìn)制文件
????????ByteArrayInputStream?arrayInputStream?=?new?ByteArrayInputStream(bytes);
????????//?創(chuàng)建輸入序列化執(zhí)行器
????????SpecificDatumReader<UserBehavior>?stockSpecificDatumReader?=?new?SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
????????//創(chuàng)建二進(jìn)制解碼器
????????BinaryDecoder?binaryDecoder?=?DecoderFactory.get().directBinaryDecoder(arrayInputStream,?null);
????????try?{
????????????//?數(shù)據(jù)讀取
????????????userBehavior=stockSpecificDatumReader.read(null,?binaryDecoder);
????????}?catch?(IOException?e)?{
????????????e.printStackTrace();
????????}
????????//?結(jié)果返回
????????return?userBehavior;
????}
????@Override
????public?boolean?isEndOfStream(UserBehavior?userBehavior)?{
????????return?false;
????}
}
5.3 創(chuàng)建Flink Consumer 反序列化
package?com.avro.FlinkKafka
import?com.avro.AvroUtil.{SimpleAvroSchemaFlink}
import?com.avro.bean.UserBehavior
import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import?java.util.Properties
/**
?*?@Package?com.avro.FlinkKafka
?*?@File :UserBehaviorConsumerFlink.java
?*?@author?大數(shù)據(jù)老哥
?*?@date?2021/1/8?21:18
?*?@version?V1.0?*/
/**
 * Flink job that consumes Avro-encoded UserBehavior records from Kafka and
 * prints them to stdout.
 */
object UserBehaviorConsumerFlink {
  def main(args: Array[String]): Unit = {
    // 1. Build the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 keeps console output ordered while testing

    // 2. Kafka connection properties.
    val prop = new Properties
    prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092")
    prop.put("group.id", "UserBehavior")
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // NOTE(review): Flink ignores "value.deserializer" — decoding is done by the
    // DeserializationSchema passed to the connector below; kept for documentation only.
    prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaFlink")

    // 3. Build the Kafka connector.
    // BUG FIX: the producer publishes to topic "UserBehaviorKafka" (section 5.4),
    // but the original subscribed to "UserBehavior", so it never received data.
    val kafka: FlinkKafkaConsumer011[UserBehavior] =
      new FlinkKafkaConsumer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)
    // 4. Start from the latest offsets.
    kafka.setStartFromLatest()
    // 5. Source stream backed by Kafka.
    val data: DataStream[UserBehavior] = env.addSource(kafka)
    // 6. Print the decoded records.
    data.print()
    env.execute("UserBehaviorConsumerFlink")
  }
}
5.4 創(chuàng)建Flink Producer 序列化
package?com.avro.FlinkKafka
import?com.avro.AvroUtil.SimpleAvroSchemaFlink
import?com.avro.bean.UserBehavior
import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import?java.util.Properties
/**
?*?@Package?com.avro.FlinkKafka
?*?@File :UserBehaviorProducerFlink.java
?*?@author?大數(shù)據(jù)老哥
?*?@date?2021/1/8?21:38
?*?@version?V1.0?*/
/**
 * Flink job that reads UserBehavior rows from a CSV file and publishes them to
 * the "UserBehaviorKafka" topic using the custom Avro SerializationSchema.
 */
object UserBehaviorProducerFlink {
  def main(args: Array[String]): Unit = {
    // Streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the raw CSV lines: userId,itemId,categoryId,behavior,timestamp.
    val lines = env.readTextFile("./data/UserBehavior.csv")

    // Parse each line into a UserBehavior record.
    val users: DataStream[UserBehavior] = lines.map { line =>
      val fields = line.split(",")
      val behavior = new UserBehavior()
      behavior.setUserId(fields(0).toLong)
      behavior.setItemId(fields(1).toLong)
      behavior.setCategoryId(fields(2).toInt)
      behavior.setBehavior(fields(3))
      behavior.setTimestamp(fields(4).toLong)
      behavior
    }

    // Kafka connection settings.
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")

    // Sink: Avro-serialize each record and publish it to "UserBehaviorKafka".
    val producer: FlinkKafkaProducer011[UserBehavior] =
      new FlinkKafkaProducer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)
    users.addSink(producer)

    // Launch the job.
    env.execute("UserBehaviorProducerFlink")
  }
}
5.5 啟動(dòng)運(yùn)行
需要源碼的請(qǐng)去GitHub 自行下載 ?https://github.com/lhh2002/Flink_Avro
小結(jié)
???????? ?其實(shí)我在實(shí)現(xiàn)這個(gè)功能的時(shí)候也是蒙的,不會(huì)難道就不學(xué)了嗎垂券,肯定不是呀花盐。我在5.2提出的那個(gè)問(wèn)題的時(shí)候其實(shí)是我自己親身經(jīng)歷過(guò)的。首先遇到了問(wèn)題不要想著怎么放棄菇爪,而是想想怎么解決算芯,當(dāng)時(shí)我的思路看源碼
看別人寫的。最后經(jīng)過(guò)不懈的努力也終成功了凳宙,我在這里為大家提供Flink面試題
需要的朋友可以去下面GitHub去下載熙揍,信自己,努力和汗水總會(huì)能得到回報(bào)的氏涩。我是大數(shù)據(jù)老哥届囚,我們下期見~~~
資源獲取 獲取Flink面試題有梆,Spark面試題,程序員必備軟件意系,hive面試題泥耀,Hadoop面試題,Docker面試題蛔添,簡(jiǎn)歷模板等資源請(qǐng)去
GitHub自行下載 https://github.com/lhh2002/Framework-Of-BigData
Gitee 自行下載?https://gitee.com/li_hey_hey/Framework-Of-BigData