Kafka Streams 運(yùn)算操作詳解

同為流處理技術(shù)伏钠,Kafka Streams的API和更為人所熟知的Spark Streaming在很多方面有不少相似之處,比如大量類(lèi)似的算子。因此隧哮,對(duì)于一個(gè)有Spark經(jīng)驗(yàn)的工程師來(lái)說(shuō),編寫(xiě)一個(gè)Kafka Streams應(yīng)用應(yīng)該上手很快座舍。


流處理概念

數(shù)據(jù)流(stream)

Stream是KafkaStream中最重要的概念沮翔,代表大小沒(méi)有限制且不斷更新的數(shù)據(jù)集(unbounded, continuously updating data set),一個(gè)Stream是一個(gè)有序的曲秉,允許重復(fù)的不可變的數(shù)據(jù)集采蚀,被定義為一個(gè)容錯(cuò)的鍵值對(duì)疲牵。

處理拓?fù)洌╬rocessor topology)

處理拓?fù)涫钦麄€(gè)流處理的運(yùn)算邏輯,可以理解為一個(gè)圖(graph)結(jié)構(gòu)榆鼠,其中的頂點(diǎn)是各個(gè)流處理器(stream processor)纲爸,數(shù)據(jù)流(stream)則構(gòu)成了邊。

構(gòu)建方法:

StreamsBuilder builder = new StreamsBuilder();

在實(shí)例化StreamBuilder來(lái)構(gòu)建了處理拓?fù)浜箬得撸涂梢詮腒afka抽取topic中的數(shù)據(jù):

KStream<String, String> textLines = builder.stream("stream-in");

并調(diào)用算子對(duì)數(shù)據(jù)流進(jìn)行變換(map/filter/...)

流處理器(processor)

流處理器代表了處理拓?fù)渲械牟煌襟E缩焦,并完成相應(yīng)的數(shù)據(jù)轉(zhuǎn)換。
這里责静,我們著重介紹使用DSL API的由不同算子(map/filter/selectKey...)來(lái)定義流處理器的方法袁滥。

無(wú)狀態(tài) & 有狀態(tài)

無(wú)狀態(tài)(stateless)

無(wú)狀態(tài)(stateless)意味著數(shù)據(jù)轉(zhuǎn)換的結(jié)果僅僅取決于你目前正在處理的數(shù)據(jù)。

  • map
  • filter

有狀態(tài)(stateful)

有狀態(tài)(stateful)意味著數(shù)據(jù)轉(zhuǎn)換的結(jié)果依賴(lài)于一個(gè)外部的狀態(tài)(state)灾螃,比如一個(gè)外部的表格题翻。

  • join
  • 各類(lèi)聚合(aggregation)操作
  • count (需要之前數(shù)據(jù)記錄的信息)

窗口操作(windowing operation)

窗口操作(windowing operation)不同于Kafka Streams基礎(chǔ)的per-record處理方法,它支持將一段時(shí)間內(nèi)的數(shù)據(jù)集合起來(lái)腰鬼,再一起處理嵌赠。
比如某些場(chǎng)合希望分析所有產(chǎn)生于前一天/前一個(gè)小時(shí)的數(shù)據(jù)。

KStream與KTable

這部分內(nèi)容可以參考另一篇博文什么是KStream和KTable熄赡。


流處理算子

mapValues & map

輸入一條記錄姜挺,輸出一條經(jīng)過(guò)變換的記錄。

mapValues map
只影響value 可能對(duì)key和value都有影響
不會(huì)導(dǎo)致re-partition 可能導(dǎo)致re-partition
KStream和KTable都可以調(diào)用 只有KStream可以調(diào)用
uppercased = stream.mapValues(value -> value.toUpperCase())

flatMapValues & flatMap

輸入一條記錄彼硫,輸出0條炊豪,1條或更多經(jīng)過(guò)變換的記錄。

flatMapValues flatMap
只影響value 可能對(duì)key和value都有影響
不會(huì)導(dǎo)致re-partition 可能導(dǎo)致re-partition
只有KStream可以調(diào)用 只有KStream可以調(diào)用
words = sentences.flatMapValues(value -> Arrays.asList(value.split(" ")))

/*
                                          (1, "hello")
    (1, "hello world")   -->  
                                          (1, "world")
*/

filter

輸入一條記錄拧篮,輸出0條或1條記錄词渤。

并不會(huì)改變記錄本身,也不會(huì)觸發(fā)re-partitioning串绩。
KStream與KTable都可以調(diào)用缺虐。

KStream<String, Long> positives = stream.filter((key, value) -> value > 0)

selectKey

為數(shù)據(jù)記錄賦一個(gè)新的key(從舊的key和value轉(zhuǎn)換而來(lái))

可能觸發(fā)re-partitioning

rekeyed = stream.selectKey((key, vlaue) -> key.substring(0, 1))

groupByKey

在aggregation操作之前

僅對(duì)KStream適用。

  • KStream → KGroupedStream
stream.groupByKey().count()

groupBy

相當(dāng)于selectKey() + groupByKey()

  • KStream → KGroupedStream
  • KTable → KGroupedTable

可能觸發(fā)re-partitioning

stream.groupBy(
    (key, value) -> value,
    Serialized.with(
      Serdes.String(), /* key (note: type was modified) */
      Serdes.String())  /* value */
  );
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末礁凡,一起剝皮案震驚了整個(gè)濱河市高氮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌把篓,老刑警劉巖纫溃,帶你破解...
    沈念sama閱讀 216,651評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異韧掩,居然都是意外死亡紊浩,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)坊谁,“玉大人费彼,你說(shuō)我怎么就攤上這事】谏郑” “怎么了箍铲?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,931評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)鬓椭。 經(jīng)常有香客問(wèn)我颠猴,道長(zhǎng),這世上最難降的妖魔是什么小染? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,218評(píng)論 1 292
  • 正文 為了忘掉前任翘瓮,我火速辦了婚禮,結(jié)果婚禮上裤翩,老公的妹妹穿的比我還像新娘资盅。我一直安慰自己,他們只是感情好踊赠,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布呵扛。 她就那樣靜靜地躺著,像睡著了一般筐带。 火紅的嫁衣襯著肌膚如雪今穿。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,198評(píng)論 1 299
  • 那天伦籍,我揣著相機(jī)與錄音荣赶,去河邊找鬼。 笑死鸽斟,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的利诺。 我是一名探鬼主播富蓄,決...
    沈念sama閱讀 40,084評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼慢逾!你這毒婦竟也來(lái)了立倍?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,926評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤侣滩,失蹤者是張志新(化名)和其女友劉穎口注,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體君珠,經(jīng)...
    沈念sama閱讀 45,341評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡寝志,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片材部。...
    茶點(diǎn)故事閱讀 39,731評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡毫缆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出乐导,到底是詐尸還是另有隱情苦丁,我是刑警寧澤,帶...
    沈念sama閱讀 35,430評(píng)論 5 343
  • 正文 年R本政府宣布物臂,位于F島的核電站旺拉,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏棵磷。R本人自食惡果不足惜蛾狗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望泽本。 院中可真熱鬧淘太,春花似錦、人聲如沸规丽。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,676評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)赌莺。三九已至冰抢,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間艘狭,已是汗流浹背挎扰。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,829評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留巢音,地道東北人遵倦。 一個(gè)月前我還...
    沈念sama閱讀 47,743評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像官撼,于是被迫代替她去往敵國(guó)和親梧躺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容