flink學(xué)習(xí)(2)flink Streaming從kafka接收數(shù)據(jù)

今天測試今天嘗試了flink從kafka獲取數(shù)據(jù)的測試程序編寫乌询,主要測試的kafka發(fā)送json的接收例子,嘗試了幾個(gè)kafka的DeserializationSchema(反序列化模式),包括了SimpleStringSchema,JSONKeyValueDeserializationSchema以及自定義DeserializationSchema.代碼通過Flink計(jì)算引擎從Kafka相應(yīng)的Topic中讀取數(shù)據(jù)做瞪,通過FlinkKafkaConsumer010來實(shí)現(xiàn).

1.SimpleStringSchema

官網(wǎng)上有SimpleStringSchema的示例,它可以構(gòu)建DataStream[String],返回的就是kafka生產(chǎn)者發(fā)過來的信息右冻。

以下是代碼:

package whTest

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._

object Fromkafka {
  case class Person (name:String,sex:String,age:Int)
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //開啟checkPoint, Time interval between state checkpoints 5000 milliseconds.
    /**
      * 如果我們啟用了Flink的Checkpint機(jī)制装蓬,
      * 那么Flink Kafka Consumer將會從指定的Topic中消費(fèi)消息,
      * 然后定期地將Kafka offsets信息纱扭、狀態(tài)信息以及其他的操作信息進(jìn)行Checkpint牍帚。
      * 所以,如果Flink作業(yè)出故障了乳蛾,F(xiàn)link將會從最新的Checkpint中恢復(fù)暗赶,
      * 并且從上一次偏移量開始讀取Kafka中消費(fèi)消息鄙币。
      */
    env.enableCheckpointing(5000)
    import org.apache.flink.streaming.api.TimeCharacteristic
    //設(shè)置系統(tǒng)基本時(shí)間特性為事件時(shí)間
   // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   //kafka連接配置信息
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val kafkaStream = env
      .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
      .print()
    
    // execute program
    env.execute("kafkaTest")
  }
}

測試結(jié)果:

{"ID_Link":"11111","CarNum":100,"speed":10.0}//即為生產(chǎn)者發(fā)送的信息

如果我們需要將消息進(jìn)行封裝,DataStream[String]->DataStream[MyType]蹂随,可以在DataStream[String]后追加map函數(shù)進(jìn)行轉(zhuǎn)換,當(dāng)然也可以使用下文的自定義DeserializationSchema十嘿。

2. JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema可以將序列化的JSON轉(zhuǎn)換為ObjectNode對象,可以用objectNode.get("field")訪問字段岳锁。新建JSONKeyValueDeserializationSchema需要帶一個(gè)boolean類型參數(shù)绩衷,為true表示需要指明是否需要包含“元數(shù)據(jù)”、偏移量激率、分區(qū)和主題等信息咳燕,為false表明只需要數(shù)據(jù)。
以下是代碼和結(jié)果:

package whTest

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._

object Fromkafka {
  case class Person (name:String,sex:String,age:Int)
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //開啟checkPoint, Time interval between state checkpoints 5000 milliseconds.
    /**
      * 如果我們啟用了Flink的Checkpint機(jī)制乒躺,
      * 那么Flink Kafka Consumer將會從指定的Topic中消費(fèi)消息招盲,
      * 然后定期地將Kafka offsets信息、狀態(tài)信息以及其他的操作信息進(jìn)行Checkpint聪蘸。
      * 所以宪肖,如果Flink作業(yè)出故障了,F(xiàn)link將會從最新的Checkpint中恢復(fù)健爬,
      * 并且從上一次偏移量開始讀取Kafka中消費(fèi)消息控乾。
      */
    env.enableCheckpointing(5000)
    import org.apache.flink.streaming.api.TimeCharacteristic
    //設(shè)置系統(tǒng)基本時(shí)間特性為事件時(shí)間
   // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val kafkaStream = env
      .addSource(new FlinkKafkaConsumer010("test", new JSONKeyValueDeserializationSchema(true), properties))
      .print()
  
    // execute program
    env.execute("kafkaTest")
  }
}

結(jié)果:

  // new JSONKeyValueDeserializationSchema(true)   send json :{"name":"limei","age":12,"sex":"f"}        get : {"value":{"name":"limei","age":12,"sex":"f"},"metadata":{"offset":10,"topic":"test","partition":0}}
    //  new JSONKeyValueDeserializationSchema(false)   send json :{"name":"limei","age":12,"sex":"f"}        get :{"value":{"name":"limei","age":12,"sex":"f"}}

3.自定義DeserializationSchema

自定義DeserializationSchema需要實(shí)現(xiàn)DeserializationSchema接口,這一部分代碼可以參考官方代碼org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer娜遵。
我需要實(shí)現(xiàn)的是將從kafka獲取到的json數(shù)據(jù)轉(zhuǎn)化為我需要的自定義pojo類(VideoData)蜕衡。
主要是要實(shí)現(xiàn)DeserializationSchema方法的deserialize方法,這個(gè)方法的輸入是byte[] message類型设拟,我們需要將其轉(zhuǎn)換為String類型慨仿,然后通過Json工具類解析成POJO類。這里我使用的是google的Gson框架纳胧。

以下是DeserializationSchema類和POJO類代碼

package whTest;

import com.google.gson.Gson;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;

public class VideoDataDeSerializer implements DeserializationSchema<VideoData> {
    private static final long serialVersionUID = 1L;
    @Override
    public VideoData deserialize(byte[] message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
             
        String mess = this.byteBuffertoString(buffer);
                //封裝為POJO類
        Gson gson = new Gson();
        VideoData data = gson.fromJson(mess, VideoData.class);
        return data;
    }

    @Override
    public boolean isEndOfStream(VideoData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<VideoData> getProducedType() {
        return null;
    }

    /**
     * 將ByteBuffer類型轉(zhuǎn)換為String類型
     * @param buffer
     * @return
     */
    public static String byteBuffertoString(ByteBuffer buffer)
    {
        Charset charset = null;
        CharsetDecoder decoder = null;
        CharBuffer charBuffer = null;
        try
        {
            charset = Charset.forName("UTF-8");
            decoder = charset.newDecoder();
            // charBuffer = decoder.decode(buffer);//用這個(gè)的話镰吆,只能輸出來一次結(jié)果,第二次顯示為空
            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
            return "";
        }
    }
}

POJO類:

package whTest;

public class VideoData {
    public VideoData(String ID_Link,int CarNum,float speed){
        this.ID_Link =ID_Link;
        this.CarNum = CarNum;
        this.speed = speed;
    }
    private String ID_Link;
    private int CarNum;
    private float speed;

    public void setID_Link(String ID_Link) {
        this.ID_Link = ID_Link;
    }

    public void setCarNum(int carNum) {
        CarNum = carNum;
    }

    public void setSpeed(float speed) {
        this.speed = speed;
    }

    public String getID_Link() {
        return ID_Link;
    }

    public int getCarNum() {
        return CarNum;
    }

    public float getSpeed() {
        return speed;
    }
}

主函數(shù)只需要把DeserializationSchema類修改為自定義的VideoDataDeSerializer跑慕,當(dāng)kafka生產(chǎn)者發(fā)送過來用VideoData轉(zhuǎn)換的Json類型時(shí)万皿,返回的就是我們需要的DataStream[VideoData]。這就不需要后面再用map函數(shù)將String轉(zhuǎn)換為VideoData類型了核行。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末牢硅,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子芝雪,更是在濱河造成了極大的恐慌减余,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,681評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惩系,死亡現(xiàn)場離奇詭異位岔,居然都是意外死亡如筛,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,205評論 3 399
  • 文/潘曉璐 我一進(jìn)店門赃承,熙熙樓的掌柜王于貴愁眉苦臉地迎上來妙黍,“玉大人悴侵,你說我怎么就攤上這事瞧剖。” “怎么了可免?”我有些...
    開封第一講書人閱讀 169,421評論 0 362
  • 文/不壞的土叔 我叫張陵抓于,是天一觀的道長。 經(jīng)常有香客問我浇借,道長捉撮,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,114評論 1 300
  • 正文 為了忘掉前任妇垢,我火速辦了婚禮巾遭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘闯估。我一直安慰自己灼舍,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,116評論 6 398
  • 文/花漫 我一把揭開白布涨薪。 她就那樣靜靜地躺著骑素,像睡著了一般狡汉。 火紅的嫁衣襯著肌膚如雪莱革。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,713評論 1 312
  • 那天诽嘉,我揣著相機(jī)與錄音侠姑,去河邊找鬼创橄。 笑死,一個(gè)胖子當(dāng)著我的面吹牛莽红,可吹牛的內(nèi)容都是我干的妥畏。 我是一名探鬼主播,決...
    沈念sama閱讀 41,170評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼船老,長吁一口氣:“原來是場噩夢啊……” “哼咖熟!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起柳畔,我...
    開封第一講書人閱讀 40,116評論 0 277
  • 序言:老撾萬榮一對情侶失蹤馍管,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后薪韩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體确沸,經(jīng)...
    沈念sama閱讀 46,651評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡捌锭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,714評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了罗捎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片观谦。...
    茶點(diǎn)故事閱讀 40,865評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖桨菜,靈堂內(nèi)的尸體忽然破棺而出豁状,到底是詐尸還是另有隱情,我是刑警寧澤倒得,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布泻红,位于F島的核電站,受9級特大地震影響霞掺,放射性物質(zhì)發(fā)生泄漏谊路。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,211評論 3 336
  • 文/蒙蒙 一菩彬、第九天 我趴在偏房一處隱蔽的房頂上張望缠劝。 院中可真熱鬧,春花似錦骗灶、人聲如沸惨恭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,699評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽喉恋。三九已至,卻和暖如春母廷,著一層夾襖步出監(jiān)牢的瞬間轻黑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,814評論 1 274
  • 我被黑心中介騙來泰國打工琴昆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留氓鄙,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,299評論 3 379
  • 正文 我出身青樓业舍,卻偏偏與公主長得像抖拦,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子舷暮,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,870評論 2 361