最近做了一個將結果數(shù)據(jù)寫入到Kafka的需求厨内,sink部分代碼如下:
val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](
sinkTopic, new StringKeyedSerializationSchema,producerConfig, sinkSemantic)
ds.addSink(kafkaProducer).setParallelism(sinkParallelism)
其中StringKeyedSerializationSchema是自定義的實現(xiàn)KeyedSerializationSchema的序列化器椒功,用于序列化寫入kafka的key/value, 任務也上線了,在flink web頁面看到任務各項指標一切正常,也測試消費寫入kafka的數(shù)據(jù),得到的結果也如預期一樣肖方,想著萬事大吉了,so easy~
過了一會kafka中間件的同事找過來說:你這個寫入topic的數(shù)據(jù)怎么只有這幾個分區(qū)未状,其他分區(qū)都沒有數(shù)據(jù)寫入~
什么情況俯画?任務看著一切都ok啊,怎么就有分區(qū)沒有數(shù)據(jù)寫入呢司草?馬上google一下數(shù)據(jù)寫入kafka的分區(qū)策略:
如果指定寫入分區(qū)艰垂,就將數(shù)據(jù)寫入分區(qū)
如果沒有指定分區(qū),指定了key, 那么就會按照key hash對分區(qū)取模方式發(fā)送
如果既沒指定分區(qū)又沒指定key,那么就會以輪序的方式發(fā)送
而實際情況是有幾個分區(qū)一條數(shù)據(jù)都沒有寫入埋虹,并且在StringKeyedSerializationSchema也指定了每條寫入數(shù)據(jù)的key, 那么就一定是第一種情況了猜憎,在FlinkKafkaProducer011中指定了數(shù)據(jù)寫入的分區(qū),馬上翻看源碼搔课,在FlinkKafkaProducer011的invoke方法里面有這么一個邏輯:
if (flinkKafkaPartitioner != null) {
record = new ProducerRecord<>(
targetTopic,
flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
timestamp,
serializedKey,
serializedValue);
} else {
record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
}
很明顯就是執(zhí)行了if邏輯胰柑,也是就flinkKafkaPartitioner不為空,在構建ProducerRecord時調(diào)用了flinkKafkaPartitioner.partition的方法,指定寫入的partition柬讨,而flinkKafkaPartitioner是在FlinkKafkaProducer011初始化的時候給的默認值FlinkFixedPartitioner崩瓤,在看下其partition方式:
partitions[parallelInstanceId % partitions.length]
parallelInstanceId表示當前task的index,partitions表示kafka的topic的分區(qū)踩官,該邏輯求得的分區(qū)就是根據(jù)當前task index 對partition取余得到的却桶,而我設置的sinkParallelism是4,topic的分區(qū)數(shù)是6蔗牡,到這里就比較明朗肾扰,取余永遠不會得到4、5蛋逾,所以就導致分區(qū)4、5一直沒有數(shù)據(jù)寫入窗悯。如果設置的parallism設置比kafka的分區(qū)數(shù)還要大区匣,就會導致得到的partition值大于topic實際partition。
那么解決方式有一下幾種:
parallism設置成為與kafka topic 分區(qū)數(shù)一致大小
將flinkKafkaPartitioner指定為空蒋院,并且制定寫入kafka的key
將flinkKafkaPartitioner與寫入的key都置為空
自定義一個FlinkKafkaPartitioner亏钩,重寫partition方法
最終選擇第三種較為簡單的方案,修改代碼:
val kafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](
sinkTopic, new StringKeyedSerializationSchema,producerConfig,Optional.ofNullable(null), sinkSemantic,5)
同時將StringKeyedSerializationSchema的serializeKey返回值設置為null. 再次運行任務欺旧,查看kafka 數(shù)據(jù)寫入情況姑丑,所有分區(qū)都有數(shù)據(jù)寫入。最終破案辞友。