同為流處理技術(shù)伏钠,Kafka Streams的API和更為人所熟知的Spark Streaming在很多方面有不少相似之處,比如大量類(lèi)似的算子。因此隧哮,對(duì)于一個(gè)有Spark經(jīng)驗(yàn)的工程師來(lái)說(shuō),編寫(xiě)一個(gè)Kafka Streams應(yīng)用應(yīng)該上手很快座舍。
流處理概念
數(shù)據(jù)流(stream)
Stream是KafkaStream中最重要的概念沮翔,代表大小沒(méi)有限制且不斷更新的數(shù)據(jù)集(unbounded, continuously updating data set),一個(gè)Stream是一個(gè)有序的曲秉,允許重復(fù)的不可變的數(shù)據(jù)集采蚀,被定義為一個(gè)容錯(cuò)的鍵值對(duì)疲牵。
處理拓?fù)洌╬rocessor topology)
處理拓?fù)涫钦麄€(gè)流處理的運(yùn)算邏輯,可以理解為一個(gè)圖(graph)結(jié)構(gòu)榆鼠,其中的頂點(diǎn)是各個(gè)流處理器(stream processor)纲爸,數(shù)據(jù)流(stream)則構(gòu)成了邊。
構(gòu)建方法:
StreamsBuilder builder = new StreamsBuilder();
在實(shí)例化StreamBuilder
來(lái)構(gòu)建了處理拓?fù)浜箬得撸涂梢詮腒afka抽取topic中的數(shù)據(jù):
KStream<String, String> textLines = builder.stream("stream-in");
并調(diào)用算子對(duì)數(shù)據(jù)流進(jìn)行變換(map/filter/...)
流處理器(processor)
流處理器代表了處理拓?fù)渲械牟煌襟E缩焦,并完成相應(yīng)的數(shù)據(jù)轉(zhuǎn)換。
這里责静,我們著重介紹使用DSL API的由不同算子(map/filter/selectKey...)來(lái)定義流處理器的方法袁滥。
無(wú)狀態(tài) & 有狀態(tài)
無(wú)狀態(tài)(stateless)
無(wú)狀態(tài)(stateless)意味著數(shù)據(jù)轉(zhuǎn)換的結(jié)果僅僅取決于你目前正在處理的數(shù)據(jù)。
- map
- filter
有狀態(tài)(stateful)
有狀態(tài)(stateful)意味著數(shù)據(jù)轉(zhuǎn)換的結(jié)果依賴(lài)于一個(gè)外部的狀態(tài)(state)灾螃,比如一個(gè)外部的表格题翻。
- join
- 各類(lèi)聚合(aggregation)操作
- count (需要之前數(shù)據(jù)記錄的信息)
窗口操作(windowing operation)
窗口操作(windowing operation)不同于Kafka Streams基礎(chǔ)的per-record處理方法,它支持將一段時(shí)間內(nèi)的數(shù)據(jù)集合起來(lái)腰鬼,再一起處理嵌赠。
比如某些場(chǎng)合希望分析所有產(chǎn)生于前一天/前一個(gè)小時(shí)的數(shù)據(jù)。
KStream與KTable
這部分內(nèi)容可以參考另一篇博文什么是KStream和KTable熄赡。
流處理算子
mapValues & map
輸入一條記錄姜挺,輸出一條經(jīng)過(guò)變換的記錄。
mapValues | map |
---|---|
只影響value | 可能對(duì)key和value都有影響 |
不會(huì)導(dǎo)致re-partition | 可能導(dǎo)致re-partition |
KStream和KTable都可以調(diào)用 | 只有KStream可以調(diào)用 |
uppercased = stream.mapValues(value -> value.toUpperCase())
flatMapValues & flatMap
輸入一條記錄彼硫,輸出0條炊豪,1條或更多經(jīng)過(guò)變換的記錄。
flatMapValues | flatMap |
---|---|
只影響value | 可能對(duì)key和value都有影響 |
不會(huì)導(dǎo)致re-partition | 可能導(dǎo)致re-partition |
只有KStream可以調(diào)用 | 只有KStream可以調(diào)用 |
words = sentences.flatMapValues(value -> Arrays.asList(value.split(" ")))
/*
(1, "hello")
(1, "hello world") -->
(1, "world")
*/
filter
輸入一條記錄拧篮,輸出0條或1條記錄词渤。
并不會(huì)改變記錄本身,也不會(huì)觸發(fā)re-partitioning串绩。
KStream與KTable都可以調(diào)用缺虐。
KStream<String, Long> positives = stream.filter((key, value) -> value > 0)
selectKey
為數(shù)據(jù)記錄賦一個(gè)新的key(從舊的key和value轉(zhuǎn)換而來(lái))
可能觸發(fā)re-partitioning
rekeyed = stream.selectKey((key, vlaue) -> key.substring(0, 1))
groupByKey
在aggregation操作之前
僅對(duì)KStream適用。
- KStream → KGroupedStream
stream.groupByKey().count()
groupBy
相當(dāng)于selectKey() + groupByKey()
- KStream → KGroupedStream
- KTable → KGroupedTable
可能觸發(fā)re-partitioning
stream.groupBy(
(key, value) -> value,
Serialized.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String()) /* value */
);