僅僅是一段代碼
Kafka Streams作為集成在Kafka系統(tǒng)中的一個(gè)API,在配合Kafka來進(jìn)行流處理時(shí)有著得天獨(dú)厚的優(yōu)勢被啼。
不同于Spark Streaming和Flink等,使用Kafka Streams不需要單獨(dú)的集群,只要在代碼中調(diào)用Kafka Streams的API即可,并且天生可以享受Kafka本身帶來的優(yōu)勢:高可擴(kuò)展性罕容,高容錯(cuò)等。
簡單來說稿饰,Kafka Streams應(yīng)用就是一段Java/Scala代碼锦秒,僅此而已。
這使得Kafka Streams應(yīng)用像其它Java程序一樣湘纵,可以通過命令行運(yùn)行脂崔,也可以通過Puppet滤淳,Chef等進(jìn)行部署梧喷,又或者通過Docker容器作為微服務(wù)運(yùn)行。
這里,我們展示如何在IntelliJ中編寫Kafka Streams程序铺敌,與已經(jīng)存在的Kafka集群連接汇歹,并方便地進(jìn)行擴(kuò)展。
如何配置Kafka Streams
以WordCount為例偿凭,具體代碼可以參考Kafka Streams 入門實(shí)例1 WordCount产弹。
Kafka Streams應(yīng)用需要調(diào)用Kafka Streams API,并與Kafka集群進(jìn)行交互弯囊。實(shí)際的交互操作在內(nèi)部是通過Consumer API以及Producer API來實(shí)現(xiàn)的痰哨。
需要配置的兩個(gè)最重要的參數(shù)為:
application.id
這個(gè)參數(shù)定義了Kafka Streams應(yīng)用的ID。同時(shí)匾嘱,應(yīng)用在調(diào)用Consumer API來和集群交互時(shí)斤斧,使用的consumer group id也將與這個(gè)值相同。因此霎烙,如果中途修改了這個(gè)參數(shù)的值撬讽,consumer group id也將隨之變化,并將失去之前從Kafka消費(fèi)的數(shù)據(jù)悬垃。bootstrap.servers
這個(gè)參數(shù)即是Kafka集群的位置游昼。
如何擴(kuò)展Kafka Streams
Kafka Streams的另一個(gè)特點(diǎn)就是,作為一段Java/Scala代碼尝蠕,在進(jìn)行擴(kuò)展時(shí)烘豌,代碼本身不需要作任何變動(dòng)。
當(dāng)我們運(yùn)行編寫好的應(yīng)用時(shí)看彼,可以在運(yùn)行日志中查看當(dāng)前的任務(wù)扇谣,這里以將擁有2個(gè)partition的topic作為源topic的WordCount為例:
可以看到當(dāng)前的任務(wù)有0_0和0_1。
此時(shí)只有一個(gè)WordCount實(shí)例在運(yùn)行闲昭。為了增加一個(gè)實(shí)例罐寨,只需要再次點(diǎn)擊綠色運(yùn)行按鈕。之后會出現(xiàn)另一個(gè)終端窗口序矩。此時(shí)再檢查兩邊的日志鸯绿,會發(fā)現(xiàn)第一個(gè)中顯示的當(dāng)前任務(wù)變?yōu)椋?/p>
而第二個(gè)則是:
顯然兩個(gè)任務(wù)被重新分配到了這兩個(gè)不同的實(shí)例上。