記一次Flink寫入Kafka坑點

最近做了一個將結果數(shù)據(jù)寫入到Kafka的需求厨内,sink部分代碼如下:

  1. val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](

  2. sinkTopic, new StringKeyedSerializationSchema,producerConfig, sinkSemantic)

  3. ds.addSink(kafkaProducer).setParallelism(sinkParallelism)

其中StringKeyedSerializationSchema是自定義的實現(xiàn)KeyedSerializationSchema的序列化器椒功,用于序列化寫入kafka的key/value, 任務也上線了,在flink web頁面看到任務各項指標一切正常,也測試消費寫入kafka的數(shù)據(jù),得到的結果也如預期一樣肖方,想著萬事大吉了,so easy~
過了一會kafka中間件的同事找過來說:你這個寫入topic的數(shù)據(jù)怎么只有這幾個分區(qū)未状,其他分區(qū)都沒有數(shù)據(jù)寫入~

image

什么情況俯画?任務看著一切都ok啊,怎么就有分區(qū)沒有數(shù)據(jù)寫入呢司草?馬上google一下數(shù)據(jù)寫入kafka的分區(qū)策略:

  1. 如果指定寫入分區(qū)艰垂,就將數(shù)據(jù)寫入分區(qū)

  2. 如果沒有指定分區(qū),指定了key, 那么就會按照key hash對分區(qū)取模方式發(fā)送

  3. 如果既沒指定分區(qū)又沒指定key,那么就會以輪序的方式發(fā)送

而實際情況是有幾個分區(qū)一條數(shù)據(jù)都沒有寫入埋虹,并且在StringKeyedSerializationSchema也指定了每條寫入數(shù)據(jù)的key, 那么就一定是第一種情況了猜憎,在FlinkKafkaProducer011中指定了數(shù)據(jù)寫入的分區(qū),馬上翻看源碼搔课,在FlinkKafkaProducer011的invoke方法里面有這么一個邏輯:

  1. if (flinkKafkaPartitioner != null) {

  2. record = new ProducerRecord<>(

  3. targetTopic,

  4. flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),

  5. timestamp,

  6. serializedKey,

  7. serializedValue);

  8. } else {

  9. record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);

  10. }

很明顯就是執(zhí)行了if邏輯胰柑,也是就flinkKafkaPartitioner不為空,在構建ProducerRecord時調(diào)用了flinkKafkaPartitioner.partition的方法,指定寫入的partition柬讨,而flinkKafkaPartitioner是在FlinkKafkaProducer011初始化的時候給的默認值FlinkFixedPartitioner崩瓤,在看下其partition方式:

  1. partitions[parallelInstanceId % partitions.length]

parallelInstanceId表示當前task的index,partitions表示kafka的topic的分區(qū)踩官,該邏輯求得的分區(qū)就是根據(jù)當前task index 對partition取余得到的却桶,而我設置的sinkParallelism是4,topic的分區(qū)數(shù)是6蔗牡,到這里就比較明朗肾扰,取余永遠不會得到4、5蛋逾,所以就導致分區(qū)4、5一直沒有數(shù)據(jù)寫入窗悯。如果設置的parallism設置比kafka的分區(qū)數(shù)還要大区匣,就會導致得到的partition值大于topic實際partition。
那么解決方式有一下幾種:

  1. parallism設置成為與kafka topic 分區(qū)數(shù)一致大小

  2. 將flinkKafkaPartitioner指定為空蒋院,并且制定寫入kafka的key

  3. 將flinkKafkaPartitioner與寫入的key都置為空

  4. 自定義一個FlinkKafkaPartitioner亏钩,重寫partition方法

最終選擇第三種較為簡單的方案,修改代碼:

  1. val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](

  2. sinkTopic, new StringKeyedSerializationSchema,producerConfig,Optional.ofNullable(null), sinkSemantic,5)

同時將StringKeyedSerializationSchema的serializeKey返回值設置為null. 再次運行任務欺旧,查看kafka 數(shù)據(jù)寫入情況姑丑,所有分區(qū)都有數(shù)據(jù)寫入。最終破案辞友。

image
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末栅哀,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子称龙,更是在濱河造成了極大的恐慌留拾,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鲫尊,死亡現(xiàn)場離奇詭異痴柔,居然都是意外死亡,警方通過查閱死者的電腦和手機疫向,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門咳蔚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人搔驼,你說我怎么就攤上這事谈火。” “怎么了匙奴?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵堆巧,是天一觀的道長。 經(jīng)常有香客問我,道長谍肤,這世上最難降的妖魔是什么啦租? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮荒揣,結果婚禮上篷角,老公的妹妹穿的比我還像新娘。我一直安慰自己系任,他們只是感情好恳蹲,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著俩滥,像睡著了一般嘉蕾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上霜旧,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天错忱,我揣著相機與錄音,去河邊找鬼挂据。 笑死以清,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的崎逃。 我是一名探鬼主播掷倔,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼个绍!你這毒婦竟也來了勒葱?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤障贸,失蹤者是張志新(化名)和其女友劉穎错森,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體篮洁,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡涩维,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了袁波。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瓦阐。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖篷牌,靈堂內(nèi)的尸體忽然破棺而出睡蟋,到底是詐尸還是另有隱情,我是刑警寧澤枷颊,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布戳杀,位于F島的核電站该面,受9級特大地震影響,放射性物質發(fā)生泄漏信卡。R本人自食惡果不足惜隔缀,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望傍菇。 院中可真熱鬧猾瘸,春花似錦、人聲如沸丢习。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽咐低。三九已至揽思,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間见擦,已是汗流浹背绰更。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留锡宋,地道東北人。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓特恬,卻偏偏與公主長得像执俩,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子癌刽,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 大致可以通過上述情況進行排除 1.kafka服務器問題 查看日志是否有報錯役首,網(wǎng)絡訪問問題等。 2. kafka p...
    生活的探路者閱讀 7,587評論 0 10
  • 在軟件項目的生命周期中显拜,開發(fā)只占開始的一小部分衡奥,大部分時間我們要對項目進行運行維護,Kafka相關的項目也不例外远荠。...
    柴詩雨閱讀 8,181評論 0 7
  • 一.Kafka發(fā)送消息的整體流程: 步驟:1.ProducerInterceptors對消息進行攔截矮固。2.Seri...
    陳陽001閱讀 3,791評論 0 5
  • 一、Kafka簡介 Kafka (科技術語)譬淳。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)档址,它可以處理消費者規(guī)...
    邊學邊記閱讀 1,736評論 0 14
  • Kafka的基本概念 BrokerKafka集群中包含多個服務器,其中每個服務器稱為一個broker邻梆。有一點需要注...
    frmark閱讀 372評論 0 0