通信機制 Storm vs. Flink

Storm通信機制

Storm和Flink都是流計算系統(tǒng)中比較成熟的開源實現(xiàn)芍秆,在一些流計算平臺選型的文章中介衔,兩者也常常被拿來重點比較,但是兩個平臺都在持續(xù)更新渐尿,一些選型比較和壓測結果文章都可能已經過時徙融。而一個系統(tǒng)的核心實現(xiàn)往往比較少改動窄瘟,因此本文嘗試從底層實現(xiàn)的層面來比較兩者的異同。

在上一篇文章中锌历,我們自頂向下地了解了Flink的內部通信機制贮庞,本文按照同樣的結構來說明Storm的通信機制。代碼基于tag v1.2.2究西。

當我們編寫一個Storm topology時窗慎,也是在描繪一個有向圖。圖中的邊相當于由一系列數(shù)據記錄組成的數(shù)據流卤材,圖中的頂點相當于我們對數(shù)據流的處理遮斥。

先簡單介紹Storm的術語使用,并和Flink作對比,方便熟悉其中任意一個系統(tǒng)的同學更快地理解這些術語扇丛。術語介紹部分主要參考來源為Storm官方文檔Concepts
Understanding the Parallelism of a Storm Topology

  • Spout: Spout是有向運行圖中沒有上游的節(jié)點,是數(shù)據流的起點术吗。相當于Flink中的SourceOperator。
  • Bolt: 對數(shù)據流進行處理的節(jié)點帆精,相當于Flink中的TransformationOperator
  • Component:Bolt和Spout的統(tǒng)稱 较屿,相當于Flink中的Operator
  • Executor: 代表了Component的并行度隧魄。每個executor都會有一個工作線程,負責處理用戶定義的業(yè)務邏輯隘蝎,一個發(fā)送線程购啄,負責把數(shù)據發(fā)送到下游隊列。一個Executor中可以運行同一個component的多個實例末贾。(相當于Flink中的SubTask)
  • Task: task代表一個component實例, 同一個Execotor中的task會被串行執(zhí)行(要區(qū)別于并行度)
  • Worker: 代表一個進程闸溃,一個Work中可以運行多個Executor,相當于Flink中的TaskManager

術語對比:

Storm Flink
Worker TaskManager
Executor Task
Spout SourceOperator
Bolt TransformationOperator
Component Operator
Stream Stream
Task SubTask
? Chain

Storm的Tasks和Flink的Chain并不能等價,在storm中多個Task可以運行在一個Executor上拱撵,但這些Task指的是屬于同一個component的不同實例辉川,兩個Task之間是是等價關系而不是上下游關系。Flink中一個Task中可以有多個Subtask , 在Flink中的Chain是指有上下游關系的且滿足一定條件的多個Subtask可以成鏈的方式被當成一個Task處理拴测。

Component間的通信實現(xiàn)

以Storm的WordCount為例乓旗,此例用Storm的底層API編寫(相對Trident來說的底層),源碼可以從Storm官方的Storm-starter模塊獲取集索。

storm編程模型

Storm的編程模型由Spout,Bolt和Stream組成屿愚。在圖中split和count為Bolt, 灰色箭頭為stream。圖中總共有三種不同的component, 其中spout的實現(xiàn)類為RandomSentenceSpout务荆,負責隨機地從一個字符串數(shù)組中選擇句子妆距。split的功能是對句子進行分詞。count功能是計算單詞的出現(xiàn)次數(shù)函匕。功能上與Flink的WordCount例子大同小異娱据。

為了圖例簡潔和簡化模型,在圖中這三種executor的并行度分別為2盅惜、2中剩、1,和代碼中并行度不一致抒寂。這里沒有開啟acker和metric功能, 因此本文沒有畫出__acker和__metric兩種系統(tǒng)實現(xiàn)的bolt结啼,這acker部分會在分布式事務的對比中分析。

Storm沒有實現(xiàn)Flink那樣的Chain功能屈芜,上下游component不會位于同一個線程中郊愧,因此Storm的上下游component通信只有兩種方式,本地線程通信或遠程線程通信井佑。

Storm內部每個executor都會有一個接收隊列和一個發(fā)送隊列糕珊,一個工作線程和一個發(fā)送線程。每個worker內部都會有一個發(fā)送隊列毅糟,一個接收線程和一個發(fā)送線程。Storm中的隊列按職能分了三類:分別為executor接收隊列澜公,executor發(fā)送隊列和worker發(fā)送隊列姆另。這三類隊列的消費者以sleep()的方式不斷輪詢來接收消息喇肋,接收消息后的處理結果publish到下游隊列。

storm內部消息傳遞

有很多Storm的技術文章中畫出了worker的接收線程和topology.receiver.buffer.size, 事實上在Storm1.0.x中 worker的接收線程已被移除迹辐,改為push的方式蝶防,在Storm server接收到消息后直接反序列化然后寫到各個executor的receive-queue中。

Buffer的讀寫

Storm實現(xiàn)的生產者消費者模式使用到的緩存隊列為LAMX Disruptor中的RingBuffer明吩。LAMX Disruptor號稱最快的無鎖并發(fā)框架间学。在Storm的使用場景中,flush到RingBuffer時使用的等待策略為TimeoutBlockingWaitStrategy是通過ReentrantLock加鎖阻塞的, 且flush到RingBuffer前也會通過鎖來避免并發(fā)調用publishDirect(ArrayList<Object> objs, boolean block)方法印荔。

Buffer寫入

以Bolt->Bolt數(shù)據傳輸為例低葫,Bolt中的tuple發(fā)送主要通過OutputCollector實現(xiàn), 當一個bolt在execute()方法中調用了OutputCollector.emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) 后仍律,它的調用棧如下圖所示:
粉色部分為clojure實現(xiàn)嘿悬,黃色部分是java實現(xiàn)。調用棧的最后調用了java實現(xiàn)的DisruptorQueue.publish(Object obj) 方法水泉。DisruptorQueue是Storm對LAMX disruptor的封裝善涨,主要增加了批量發(fā)布和超時發(fā)布功能。由上一章節(jié)可知草则,tuple會被發(fā)布到一個名為executor$id-send-queue的DisruptorQueue中钢拧。

Buffer寫入

DisruptorQueue的發(fā)布的代碼邏輯比較復雜,主要通過ThreadLocalInserter和Flusher分別實現(xiàn)Tuple的批量發(fā)布和超時發(fā)布炕横。
批量發(fā)布部分主要實現(xiàn)如下:DisruptorQueue的公共方法publish(Object obj)中源内,先后調用 ThreadLocalInserter的add和flush方法

/**
變量解釋:
batcher即為ThreadLocalInserter對象實例
**/
public void publish(Object obj) {
    ... 
    batcher.add(obj);
    batcher.flush(false);
}
  • batcher.add(obj)方法的功能是把tuple放進當前批次的緩存中,如果當前批次_currentBatch滿了看锉,且當前沒有發(fā)送失改批次(為了保證順序性姿锭,未發(fā)送成功的批次需要先發(fā)送),會觸發(fā)flush到disruptor的ringbuffer伯铣。但不保證flush成功呻此,如果因為ringbuffer空間不足flush失敗,會把失敗的批次放進無界隊列_overflow中緩存腔寡。注意:add方法不阻塞焚鲜。

  • batcher.flush(boolean block)的功能是觸發(fā)發(fā)送失敗的批次flush到ringbuffer中。該方法還可能在定時調度的Flusher線程中被調用放前。注意:block==false時忿磅,flush方法不阻塞。

綜上凭语,RingBuffer中的發(fā)布單元為一個批次大小的tuple(而不是單個tuple)葱她,publish方法不會阻塞,_overflow是個無界非阻塞隊列似扔。因此吨些,如果下游處理不及時且上游持續(xù)生產數(shù)據時搓谆,可能因為_overflow中緩存的對象過多而發(fā)生OOM。Storm提供了兩種方式來避免這種情況豪墅,留在后續(xù)Storm和flink實現(xiàn)對比再討論泉手。

/**
變量解釋:
_currentBatch為ArrayList<Object>對象實例, 用于緩存當前批次的tuple
_overflow為ConcurrentLinkedList<ArrayList<Object>>,無界隊列,用于緩存發(fā)送失敗的tuple batch
_inputBatchSize為當前批次的最大緩存tuple數(shù)

**/
    public synchronized void add(Object obj) {
    ...
        //如果當前批次已滿
        if (_currentBatch.size() >= _inputBatchSize) {
            boolean flushed = false;
            //如果當前批次已滿且緩存中沒有發(fā)送失敗的批次
            if (_overflow.isEmpty()) {
                try {
                  //發(fā)布到disruptor的ringbuffer中,非阻塞偶器,當ringbuffer空間不足時拋出InsufficientCapacityException
                    publishDirect(_currentBatch, false);
                    _overflowCount.addAndGet(0 - _currentBatch.size());
                    _currentBatch.clear();
                    flushed = true;
                } catch (InsufficientCapacityException e) {
                    //Ignored we will flush later
                }
            }
            //如果當前批次已滿 且≌睹取(緩存中有發(fā)送失敗的批次 或 當前批次發(fā)送失敗)
            if (!flushed) {
          //把當前批次加入到未發(fā)送失敗的緩存隊列中
                _overflow.add(_currentBatch);
                _currentBatch = new ArrayList<Object>(_inputBatchSize);
            }
        }
    }

    //May be called by a background thread
    public void flush(boolean block) {
        if (block) {
            _flushLock.lock();
        } else if (!_flushLock.tryLock()) {
            //Someone else if flushing so don't do anything
            return;
        }
        try {
            while (!_overflow.isEmpty()) {
                publishDirect(_overflow.peek(), block);
                _overflowCount.addAndGet(0 - _overflow.poll().size());
            }
        } catch (InsufficientCapacityException e) {
        //Ignored we should not block
        } finally {
            _flushLock.unlock();
        }
    }

到這里屏轰,看過Flink通信機制的同學應該明白“Flink的反壓機制實現(xiàn)得更天然”的說法了颊郎。

DisruptorQueue是底層實現(xiàn),直接暴露給用戶的發(fā)送數(shù)據到下游的接口是output collector亭枷。Storm output collector的實現(xiàn)相較Flink混亂袭艺,存在兩個問題:

  1. Collector命名比較混亂,例如有的實現(xiàn)類叫XXCollectorImpl,有的又不帶Impl后綴 , ISpoutOutputCollector和IOutputCollector是兩個完全不同的接口叨粘,兩者不在同一繼承樹中,分別實現(xiàn)Spout的數(shù)據發(fā)送接口和Bolt的數(shù)據發(fā)送接口猾编。不通過關鍵字搜索比較難找出全部實現(xiàn)了“tuple發(fā)送”功能的代碼。
  2. Storm的collector實現(xiàn)耦合了tuple的發(fā)送邏輯和tuple的ack fail邏輯升敲,因為ack/fail邏輯不同而劃分了兩種主要的OutputCollector , 分別是負責發(fā)送Spout tuple的ISpoutOutputCollector答倡、負責發(fā)送IRichBolt tuple的IOutputCollector、其它Collector基本上是通過委托模式基于這兩個Collector實現(xiàn)的驴党。例如帶有自動自動ack/fail tuple功能的IBasicOutputCollector,這個類把tuple發(fā)送邏輯委托給OutputCollector,而java實現(xiàn)的OutputCollector最后會委托給由clojure代碼executor.clj中實現(xiàn)的IOutputCollector匿名類瘪撇。

Storm有兩個批量處理框架,相關框架的實現(xiàn)類分別以Transactional和Trident開頭港庄,Transactional開頭的批處理實現(xiàn)已經被標記為廢棄倔既,現(xiàn)主要維護Trident的實現(xiàn)。這兩個API中提供給用戶編程使用的ITridentSpout和ITransactionalSpout 最后都會在Bolt所在的executor中調用鹏氧,所以批處理編程API中的Spout使用的Collector實際父類或委托類為IOutputCollector渤涌。

output collector實現(xiàn)

Buffer讀取

RingBuffer的讀取和處理邏輯通過com.lmax.disruptor.EventHandler接口實現(xiàn),executor中的工作線程和發(fā)送線程以及worker中的發(fā)送線程都分別實現(xiàn)了該接口把还。以executor工作線程為例实蓬,executor工作線程讀取event后轉換為Tuple, 并調用IBolt.execute(Tuple tuple)接口觸發(fā)用戶實現(xiàn)的業(yè)務邏輯。

buffer讀取

上圖的邏輯在一個輪詢間隔為0的無限循環(huán)中: 當隊列空閑時吊履,cpu空轉安皱。

(defnk consume-loop*
  [^DisruptorQueue queue handler
   :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
  (async-loop    ;;定義一個循環(huán)
          (fn [] (consume-batch-when-available queue handler) 0) ;;此處返回0, 代表sleep-time
          :kill-fn kill-fn  ;;接收到kill信號時執(zhí)行的清理邏輯
          :thread-name (.getName queue)))  ;;線程名稱

;; afn returns amount of time to sleep
(defnk async-loop [afn
                   :daemon false
                   :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))
                   :priority Thread/NORM_PRIORITY
                   :factory? false
                   :start true
                   :thread-name nil]
           ... 
                         (let [sleep-time (afn)]
                           (when-not (nil? sleep-time)
                             (sleep-secs sleep-time)
                             (recur))     ;;循環(huán)調用
                           )
           ...

Storm和Flink對比

- Storm Flink
隊列 Disruptor ArrayDeque+synchronized
隊列有無鎖 有鎖艇炎,使用ReentrantLock 有鎖酌伊,使用synchronized
隊列有無等待 等待,默認Condiction.await(timeout) 等待缀踪,使用wait/notify
緩存 有緩存居砖,用ArrayList和ConcurrentLinkedList 有緩存燕锥,用自定義的MemorySegment和ArrayDeque
緩存大小 可配置,默認100條,和message大小無關 可配置悯蝉,默認最小32768 byte,和條數(shù)無關,是消息序列化后的大型锌(消息可以跨多個buffer)
生產方式 多生產者 多生產者
消費方式 單消費者 單消費者
序列化 默認kryo 自定義
序列化的時機 遠程通信時 寫入緩存時(因此本地線程通信也會序列化)
隊列數(shù) 每個工作線程一個接收隊列(both spout and bolt),每 每個工作線程一個消費隊列(source除外)
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末鼻由,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子厚棵,更是在濱河造成了極大的恐慌蕉世,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件婆硬,死亡現(xiàn)場離奇詭異狠轻,居然都是意外死亡,警方通過查閱死者的電腦和手機彬犯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門向楼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人谐区,你說我怎么就攤上這事湖蜕。” “怎么了宋列?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵昭抒,是天一觀的道長。 經常有香客問我炼杖,道長灭返,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任坤邪,我火速辦了婚禮熙含,結果婚禮上,老公的妹妹穿的比我還像新娘罩扇。我一直安慰自己婆芦,他們只是感情好绒极,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布倍宾。 她就那樣靜靜地躺著,像睡著了一般氢伟。 火紅的嫁衣襯著肌膚如雪员帮。 梳的紋絲不亂的頭發(fā)上或粮,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天,我揣著相機與錄音捞高,去河邊找鬼氯材。 笑死渣锦,一個胖子當著我的面吹牛,可吹牛的內容都是我干的氢哮。 我是一名探鬼主播袋毙,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼冗尤!你這毒婦竟也來了听盖?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤裂七,失蹤者是張志新(化名)和其女友劉穎皆看,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體背零,經...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡腰吟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了徙瓶。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片毛雇。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖倍啥,靈堂內的尸體忽然破棺而出禾乘,到底是詐尸還是另有隱情,我是刑警寧澤虽缕,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布始藕,位于F島的核電站,受9級特大地震影響氮趋,放射性物質發(fā)生泄漏伍派。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一剩胁、第九天 我趴在偏房一處隱蔽的房頂上張望诉植。 院中可真熱鬧,春花似錦昵观、人聲如沸晾腔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽灼擂。三九已至,卻和暖如春觉至,著一層夾襖步出監(jiān)牢的瞬間剔应,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留峻贮,地道東北人席怪。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像纤控,于是被迫代替她去往敵國和親挂捻。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內容