flink kafka的Sink或者Source 使用avro序列化方式

1.環(huán)境情況

flink 1.11
kafka 2.4.0

2. 操作準(zhǔn)備

a. avro 的jar 包 avro-tools-1.7.7.jar
b. 對象的 XXX.avsc 文件 ,描述序列化各個字段的屬性類型讥巡。

namespace:命名路徑
type:固定寫法 record
name:java對象類名
fields:字段描述

{
    "namespace":"com.test.avro",
    "type":"record",
    "name":"User",
    "fields":[
        {
            "name":"name",
            "type":"string"
        },
        {
            "name":"age",
            "type":[
                "int",
                "null"
            ]
        },
        {
            "name":"email",
            "type":[
                "string",
                "null"
            ]
        },
        {
            "name":"custom_props",
            "type":[
                "null",
                {
                    "type":"map",
                    "values":"string"
                }
            ]
        }
    ]
}

3. 執(zhí)行步驟

a第一步.使用windows 自動生成 XXX.java 文件
命令:C:\Users\colin>java -jar F:\repository\org\apache\avro\avro-tools\1.7.7\avro-tools-1.7.7.jar compile -string schema C:\Users\colin\IdeaProjects\logs_behavior_analysis_v2\ods_logs_kafka\src\main\resources\avro\Logs.avsc ./
java -jar + tools路徑+ copile -string schema + XXX.avsc路徑+ XXX.java存放路徑

注意:不加-string 則默認(rèn)使用 CharSequence類型绢片,遇到map就會自動加密(base64加密方式)幢哨,本人再這里搞了很久都解析不出map中的字段僵刮。所以建議使用 -string指定字段屬性為String。

image.png

生成的java文件截圖如下:


image.png

b第二步:放入程序中末荐,引用此對象
sink的序列化

   AvroSerializationSchema<Logs> avroSerializationSchema = AvroSerializationSchema.forSpecific(Logs.class);
//Logs.class 這個就是你生成的 XXX.java文件
        final FlinkKafkaProducer010<Logs> myProducer = new FlinkKafkaProducer010<>(
                topic,
                avroSerializationSchema,
                prop);

Source 反序列化

AvroDeserializationSchema<Logs> avroDeserializationSchema = AvroDeserializationSchema.forSpecific(Logs.class);

        DataStream<Logs> kafkaStream = env.addSource(new FlinkKafkaConsumer010<Logs>("colin_topic", avroDeserializationSchema, properties))

4. tools maven

通過這樣tools下載包

   <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-tools</artifactId>
            <version>1.7.7</version>
        </dependency>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子邦蜜,更是在濱河造成了極大的恐慌,老刑警劉巖亥至,帶你破解...
    沈念sama閱讀 222,183評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件悼沈,死亡現(xiàn)場離奇詭異,居然都是意外死亡姐扮,警方通過查閱死者的電腦和手機絮供,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來茶敏,“玉大人壤靶,你說我怎么就攤上這事【” “怎么了贮乳?”我有些...
    開封第一講書人閱讀 168,766評論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長恬惯。 經(jīng)常有香客問我向拆,道長,這世上最難降的妖魔是什么宿崭? 我笑而不...
    開封第一講書人閱讀 59,854評論 1 299
  • 正文 為了忘掉前任亲铡,我火速辦了婚禮,結(jié)果婚禮上葡兑,老公的妹妹穿的比我還像新娘奖蔓。我一直安慰自己,他們只是感情好讹堤,可當(dāng)我...
    茶點故事閱讀 68,871評論 6 398
  • 文/花漫 我一把揭開白布吆鹤。 她就那樣靜靜地躺著,像睡著了一般洲守。 火紅的嫁衣襯著肌膚如雪疑务。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,457評論 1 311
  • 那天梗醇,我揣著相機與錄音知允,去河邊找鬼。 笑死叙谨,一個胖子當(dāng)著我的面吹牛温鸽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,999評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼涤垫,長吁一口氣:“原來是場噩夢啊……” “哼姑尺!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蝠猬,我...
    開封第一講書人閱讀 39,914評論 0 277
  • 序言:老撾萬榮一對情侶失蹤切蟋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后榆芦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柄粹,經(jīng)...
    沈念sama閱讀 46,465評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,543評論 3 342
  • 正文 我和宋清朗相戀三年歧杏,在試婚紗的時候發(fā)現(xiàn)自己被綠了镰惦。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,675評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡犬绒,死狀恐怖旺入,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情凯力,我是刑警寧澤茵瘾,帶...
    沈念sama閱讀 36,354評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站咐鹤,受9級特大地震影響拗秘,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜祈惶,卻給世界環(huán)境...
    茶點故事閱讀 42,029評論 3 335
  • 文/蒙蒙 一雕旨、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧捧请,春花似錦凡涩、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至可款,卻和暖如春育韩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背闺鲸。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評論 1 274
  • 我被黑心中介騙來泰國打工筋讨, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人摸恍。 一個月前我還...
    沈念sama閱讀 49,091評論 3 378
  • 正文 我出身青樓版仔,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蛮粮,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,685評論 2 360