Today I wrote a test program that reads data from Kafka with Flink, focusing on receiving JSON messages. I tried several Kafka DeserializationSchema implementations: SimpleStringSchema, JSONKeyValueDeserializationSchema, and a custom DeserializationSchema. The code reads data from the corresponding Kafka topic through the Flink engine, using FlinkKafkaConsumer010.
1. SimpleStringSchema
The official documentation has an example of SimpleStringSchema. It builds a DataStream[String] whose elements are exactly the messages sent by the Kafka producer.
The code:
package whTest

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object Fromkafka {
  case class Person(name: String, sex: String, age: Int)

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Enable checkpointing; the interval between state checkpoints is 5000 milliseconds.
    /**
     * With Flink's checkpoint mechanism enabled, the Flink Kafka Consumer
     * consumes messages from the given topic and periodically checkpoints
     * the Kafka offsets together with its other state. If the job fails,
     * Flink restores from the latest checkpoint and resumes reading
     * from the last recorded offsets.
     */
    env.enableCheckpointing(5000)
    import org.apache.flink.streaming.api.TimeCharacteristic
    // Use event time as the stream time characteristic
    // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka connection properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    val kafkaStream = env
      .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
      .print()

    // execute program
    env.execute("kafkaTest")
  }
}
Test result:
{"ID_Link":"11111","CarNum":100,"speed":10.0} // exactly the message the producer sent
If you need to wrap the message in your own type, i.e. DataStream[String] -> DataStream[MyType], you can append a map function to the DataStream[String], as in the sketch below, or use a custom DeserializationSchema as described later in this post.
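For instance, a minimal sketch of the map approach, assuming Gson is on the classpath and the producer sends JSON matching the Person case class from the program above:

// Parse each JSON string into a Person with Gson (sketch; assumes well-formed input).
import com.google.gson.Gson

val persons: DataStream[Person] = env
  .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
  .map { json =>
    // Gson is not serializable, so create it inside the function rather than in the driver
    new Gson().fromJson(json, classOf[Person])
  }
persons.print()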
2. JSONKeyValueDeserializationSchema
JSONKeyValueDeserializationSchema deserializes the JSON message into an ObjectNode, whose fields can be accessed with objectNode.get("field"). Its constructor takes a boolean parameter indicating whether to include metadata such as the offset, partition, and topic: true means each record also carries that metadata, false means only the data is returned.
The code and results:
package whTest

import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

object Fromkafka {
  case class Person(name: String, sex: String, age: Int)

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Enable checkpointing; the interval between state checkpoints is 5000 milliseconds.
    /**
     * With Flink's checkpoint mechanism enabled, the Flink Kafka Consumer
     * consumes messages from the given topic and periodically checkpoints
     * the Kafka offsets together with its other state. If the job fails,
     * Flink restores from the latest checkpoint and resumes reading
     * from the last recorded offsets.
     */
    env.enableCheckpointing(5000)
    import org.apache.flink.streaming.api.TimeCharacteristic
    // Use event time as the stream time characteristic
    // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    val kafkaStream = env
      .addSource(new FlinkKafkaConsumer010("test", new JSONKeyValueDeserializationSchema(true), properties))
      .print()

    // execute program
    env.execute("kafkaTest")
  }
}
Results:
// new JSONKeyValueDeserializationSchema(true)  send JSON: {"name":"limei","age":12,"sex":"f"}  get: {"value":{"name":"limei","age":12,"sex":"f"},"metadata":{"offset":10,"topic":"test","partition":0}}
// new JSONKeyValueDeserializationSchema(false) send JSON: {"name":"limei","age":12,"sex":"f"}  get: {"value":{"name":"limei","age":12,"sex":"f"}}
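Once a record arrives as an ObjectNode, its fields can be read off the "value" node (and the "metadata" node, when enabled). A minimal sketch, reusing env and properties from above; note that depending on the Flink version, ObjectNode may come either from Jackson directly or from Flink's shaded Jackson package:

// Read fields out of the ObjectNode produced by JSONKeyValueDeserializationSchema(true).
// On newer Flink versions the import may need the shaded path (org.apache.flink.shaded.jackson2...).
import com.fasterxml.jackson.databind.node.ObjectNode

env
  .addSource(new FlinkKafkaConsumer010("test", new JSONKeyValueDeserializationSchema(true), properties))
  .map { node: ObjectNode =>
    val value = node.get("value")
    (value.get("name").asText(), value.get("age").asInt(), value.get("sex").asText())
  }
  .print()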
3. Custom DeserializationSchema
A custom DeserializationSchema implements the DeserializationSchema interface; the official example org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer is a good reference for this part.
What I need here is to turn the JSON received from Kafka into my own POJO class (VideoData).
The core is the deserialize method of the DeserializationSchema interface: its input is a byte[] message, which we convert to a String and then parse into the POJO with a JSON library; I use Google's Gson here.
Below are the DeserializationSchema class and the POJO class.
package whTest;

import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

public class VideoDataDeSerializer implements DeserializationSchema<VideoData> {

    private static final long serialVersionUID = 1L;

    @Override
    public VideoData deserialize(byte[] message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
        String mess = byteBuffertoString(buffer);
        // Parse the JSON string into the POJO
        Gson gson = new Gson();
        return gson.fromJson(mess, VideoData.class);
    }

    @Override
    public boolean isEndOfStream(VideoData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<VideoData> getProducedType() {
        // Must not return null, otherwise Flink cannot create a serializer for the produced records
        return TypeInformation.of(VideoData.class);
    }

    /**
     * Convert a ByteBuffer to a String.
     */
    public static String byteBuffertoString(ByteBuffer buffer) {
        try {
            Charset charset = Charset.forName("UTF-8");
            CharsetDecoder decoder = charset.newDecoder();
            // Decoding the buffer directly consumes its position, so a second call
            // would return an empty result; decode a read-only copy instead.
            // charBuffer = decoder.decode(buffer);
            CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return "";
        }
    }
}
The POJO class:
package whTest;

public class VideoData {

    private String ID_Link;
    private int CarNum;
    private float speed;

    public VideoData(String ID_Link, int CarNum, float speed) {
        this.ID_Link = ID_Link;
        this.CarNum = CarNum;
        this.speed = speed;
    }

    public void setID_Link(String ID_Link) {
        this.ID_Link = ID_Link;
    }

    public void setCarNum(int carNum) {
        CarNum = carNum;
    }

    public void setSpeed(float speed) {
        this.speed = speed;
    }

    public String getID_Link() {
        return ID_Link;
    }

    public int getCarNum() {
        return CarNum;
    }

    public float getSpeed() {
        return speed;
    }
}
In the main function, the only change is to swap in the custom VideoDataDeSerializer as the DeserializationSchema. When the Kafka producer sends JSON serialized from VideoData, the source directly yields the DataStream[VideoData] we need, so there is no longer any need for a trailing map from String to VideoData; see the sketch below.
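A minimal sketch of the wiring, reusing env and properties from the earlier programs:

// The source is now typed as VideoData thanks to the custom schema.
val videoStream: DataStream[VideoData] = env
  .addSource(new FlinkKafkaConsumer010[VideoData]("test", new VideoDataDeSerializer(), properties))

videoStream
  .map(v => (v.getID_Link, v.getCarNum, v.getSpeed)) // POJO fields are directly accessible
  .print()

env.execute("kafkaTest")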