Storm通信機制
Storm和Flink都是流計算系統(tǒng)中比較成熟的開源實現(xiàn)芍秆,在一些流計算平臺選型的文章中介衔,兩者也常常被拿來重點比較,但是兩個平臺都在持續(xù)更新渐尿,一些選型比較和壓測結果文章都可能已經過時徙融。而一個系統(tǒng)的核心實現(xiàn)往往比較少改動窄瘟,因此本文嘗試從底層實現(xiàn)的層面來比較兩者的異同。
在上一篇文章中锌历,我們自頂向下地了解了Flink的內部通信機制贮庞,本文按照同樣的結構來說明Storm的通信機制。代碼基于tag v1.2.2究西。
當我們編寫一個Storm topology時窗慎,也是在描繪一個有向圖。圖中的邊相當于由一系列數(shù)據記錄組成的數(shù)據流卤材,圖中的頂點相當于我們對數(shù)據流的處理遮斥。
先簡單介紹Storm的術語使用,并和Flink作對比,方便熟悉其中任意一個系統(tǒng)的同學更快地理解這些術語扇丛。術語介紹部分主要參考來源為Storm官方文檔Concepts和
Understanding the Parallelism of a Storm Topology
- Spout: Spout是有向運行圖中沒有上游的節(jié)點,是數(shù)據流的起點术吗。相當于Flink中的SourceOperator。
- Bolt: 對數(shù)據流進行處理的節(jié)點帆精,相當于Flink中的TransformationOperator
- Component:Bolt和Spout的統(tǒng)稱 较屿,相當于Flink中的Operator
- Executor: 代表了Component的并行度隧魄。每個executor都會有一個工作線程,負責處理用戶定義的業(yè)務邏輯隘蝎,一個發(fā)送線程购啄,負責把數(shù)據發(fā)送到下游隊列。一個Executor中可以運行同一個component的多個實例末贾。(相當于Flink中的SubTask)
- Task: task代表一個component實例, 同一個Execotor中的task會被串行執(zhí)行(要區(qū)別于并行度)
- Worker: 代表一個進程闸溃,一個Work中可以運行多個Executor,相當于Flink中的TaskManager
術語對比:
Storm | Flink |
---|---|
Worker | TaskManager |
Executor | Task |
Spout | SourceOperator |
Bolt | TransformationOperator |
Component | Operator |
Stream | Stream |
Task | SubTask |
? | Chain |
Storm的Tasks和Flink的Chain并不能等價,在storm中多個Task可以運行在一個Executor上拱撵,但這些Task指的是屬于同一個component的不同實例辉川,兩個Task之間是是等價關系而不是上下游關系。Flink中一個Task中可以有多個Subtask , 在Flink中的Chain是指有上下游關系的且滿足一定條件的多個Subtask可以成鏈的方式被當成一個Task處理拴测。
Component間的通信實現(xiàn)
以Storm的WordCount為例乓旗,此例用Storm的底層API編寫(相對Trident來說的底層),源碼可以從Storm官方的Storm-starter模塊獲取集索。
Storm的編程模型由Spout,Bolt和Stream組成屿愚。在圖中split和count為Bolt, 灰色箭頭為stream。圖中總共有三種不同的component, 其中spout的實現(xiàn)類為RandomSentenceSpout务荆,負責隨機地從一個字符串數(shù)組中選擇句子妆距。split的功能是對句子進行分詞。count功能是計算單詞的出現(xiàn)次數(shù)函匕。功能上與Flink的WordCount例子大同小異娱据。
為了圖例簡潔和簡化模型,在圖中這三種executor的并行度分別為2盅惜、2中剩、1,和代碼中并行度不一致抒寂。這里沒有開啟acker和metric功能, 因此本文沒有畫出__acker和__metric兩種系統(tǒng)實現(xiàn)的bolt结啼,這acker部分會在分布式事務的對比中分析。
Storm沒有實現(xiàn)Flink那樣的Chain功能屈芜,上下游component不會位于同一個線程中郊愧,因此Storm的上下游component通信只有兩種方式,本地線程通信或遠程線程通信井佑。
Storm內部每個executor都會有一個接收隊列和一個發(fā)送隊列糕珊,一個工作線程和一個發(fā)送線程。每個worker內部都會有一個發(fā)送隊列毅糟,一個接收線程和一個發(fā)送線程。Storm中的隊列按職能分了三類:分別為executor接收隊列澜公,executor發(fā)送隊列和worker發(fā)送隊列姆另。這三類隊列的消費者以sleep()的方式不斷輪詢來接收消息喇肋,接收消息后的處理結果publish到下游隊列。
有很多Storm的技術文章中畫出了worker的接收線程和topology.receiver.buffer.size, 事實上在Storm1.0.x中 worker的接收線程已被移除迹辐,改為push的方式蝶防,在Storm server接收到消息后直接反序列化然后寫到各個executor的receive-queue中。
Buffer的讀寫
Storm實現(xiàn)的生產者消費者模式使用到的緩存隊列為LAMX Disruptor中的RingBuffer明吩。LAMX Disruptor號稱最快的無鎖并發(fā)框架间学。在Storm的使用場景中,flush到RingBuffer時使用的等待策略為TimeoutBlockingWaitStrategy是通過ReentrantLock加鎖阻塞的, 且flush到RingBuffer前也會通過鎖來避免并發(fā)調用publishDirect(ArrayList<Object> objs, boolean block)方法印荔。
Buffer寫入
以Bolt->Bolt數(shù)據傳輸為例低葫,Bolt中的tuple發(fā)送主要通過OutputCollector實現(xiàn), 當一個bolt在execute()方法中調用了OutputCollector.emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) 后仍律,它的調用棧如下圖所示:
粉色部分為clojure實現(xiàn)嘿悬,黃色部分是java實現(xiàn)。調用棧的最后調用了java實現(xiàn)的DisruptorQueue.publish(Object obj) 方法水泉。DisruptorQueue是Storm對LAMX disruptor的封裝善涨,主要增加了批量發(fā)布和超時發(fā)布功能。由上一章節(jié)可知草则,tuple會被發(fā)布到一個名為executor$id-send-queue的DisruptorQueue中钢拧。
DisruptorQueue的發(fā)布的代碼邏輯比較復雜,主要通過ThreadLocalInserter和Flusher分別實現(xiàn)Tuple的批量發(fā)布和超時發(fā)布炕横。
批量發(fā)布部分主要實現(xiàn)如下:DisruptorQueue的公共方法publish(Object obj)中源内,先后調用 ThreadLocalInserter的add和flush方法
/**
變量解釋:
batcher即為ThreadLocalInserter對象實例
**/
public void publish(Object obj) {
...
batcher.add(obj);
batcher.flush(false);
}
batcher.add(obj)方法的功能是把tuple放進當前批次的緩存中,如果當前批次_currentBatch滿了看锉,且當前沒有發(fā)送失改批次(為了保證順序性姿锭,未發(fā)送成功的批次需要先發(fā)送),會觸發(fā)flush到disruptor的ringbuffer伯铣。但不保證flush成功呻此,如果因為ringbuffer空間不足flush失敗,會把失敗的批次放進無界隊列_overflow中緩存腔寡。注意:add方法不阻塞焚鲜。
batcher.flush(boolean block)的功能是觸發(fā)發(fā)送失敗的批次flush到ringbuffer中。該方法還可能在定時調度的Flusher線程中被調用放前。注意:block==false時忿磅,flush方法不阻塞。
綜上凭语,RingBuffer中的發(fā)布單元為一個批次大小的tuple(而不是單個tuple)葱她,publish方法不會阻塞,_overflow是個無界非阻塞隊列似扔。因此吨些,如果下游處理不及時且上游持續(xù)生產數(shù)據時搓谆,可能因為_overflow中緩存的對象過多而發(fā)生OOM。Storm提供了兩種方式來避免這種情況豪墅,留在后續(xù)Storm和flink實現(xiàn)對比再討論泉手。
/**
變量解釋:
_currentBatch為ArrayList<Object>對象實例, 用于緩存當前批次的tuple
_overflow為ConcurrentLinkedList<ArrayList<Object>>,無界隊列,用于緩存發(fā)送失敗的tuple batch
_inputBatchSize為當前批次的最大緩存tuple數(shù)
**/
public synchronized void add(Object obj) {
...
//如果當前批次已滿
if (_currentBatch.size() >= _inputBatchSize) {
boolean flushed = false;
//如果當前批次已滿且緩存中沒有發(fā)送失敗的批次
if (_overflow.isEmpty()) {
try {
//發(fā)布到disruptor的ringbuffer中,非阻塞偶器,當ringbuffer空間不足時拋出InsufficientCapacityException
publishDirect(_currentBatch, false);
_overflowCount.addAndGet(0 - _currentBatch.size());
_currentBatch.clear();
flushed = true;
} catch (InsufficientCapacityException e) {
//Ignored we will flush later
}
}
//如果當前批次已滿 且≌睹取(緩存中有發(fā)送失敗的批次 或 當前批次發(fā)送失敗)
if (!flushed) {
//把當前批次加入到未發(fā)送失敗的緩存隊列中
_overflow.add(_currentBatch);
_currentBatch = new ArrayList<Object>(_inputBatchSize);
}
}
}
//May be called by a background thread
public void flush(boolean block) {
if (block) {
_flushLock.lock();
} else if (!_flushLock.tryLock()) {
//Someone else if flushing so don't do anything
return;
}
try {
while (!_overflow.isEmpty()) {
publishDirect(_overflow.peek(), block);
_overflowCount.addAndGet(0 - _overflow.poll().size());
}
} catch (InsufficientCapacityException e) {
//Ignored we should not block
} finally {
_flushLock.unlock();
}
}
到這里屏轰,看過Flink通信機制的同學應該明白“Flink的反壓機制實現(xiàn)得更天然”的說法了颊郎。
DisruptorQueue是底層實現(xiàn),直接暴露給用戶的發(fā)送數(shù)據到下游的接口是output collector亭枷。Storm output collector的實現(xiàn)相較Flink混亂袭艺,存在兩個問題:
- Collector命名比較混亂,例如有的實現(xiàn)類叫XXCollectorImpl,有的又不帶Impl后綴 , ISpoutOutputCollector和IOutputCollector是兩個完全不同的接口叨粘,兩者不在同一繼承樹中,分別實現(xiàn)Spout的數(shù)據發(fā)送接口和Bolt的數(shù)據發(fā)送接口猾编。不通過關鍵字搜索比較難找出全部實現(xiàn)了“tuple發(fā)送”功能的代碼。
- Storm的collector實現(xiàn)耦合了tuple的發(fā)送邏輯和tuple的ack fail邏輯升敲,因為ack/fail邏輯不同而劃分了兩種主要的OutputCollector , 分別是負責發(fā)送Spout tuple的ISpoutOutputCollector答倡、負責發(fā)送IRichBolt tuple的IOutputCollector、其它Collector基本上是通過委托模式基于這兩個Collector實現(xiàn)的驴党。例如帶有自動自動ack/fail tuple功能的IBasicOutputCollector,這個類把tuple發(fā)送邏輯委托給OutputCollector,而java實現(xiàn)的OutputCollector最后會委托給由clojure代碼executor.clj中實現(xiàn)的IOutputCollector匿名類瘪撇。
Storm有兩個批量處理框架,相關框架的實現(xiàn)類分別以Transactional和Trident開頭港庄,Transactional開頭的批處理實現(xiàn)已經被標記為廢棄倔既,現(xiàn)主要維護Trident的實現(xiàn)。這兩個API中提供給用戶編程使用的ITridentSpout和ITransactionalSpout 最后都會在Bolt所在的executor中調用鹏氧,所以批處理編程API中的Spout使用的Collector實際父類或委托類為IOutputCollector渤涌。
Buffer讀取
RingBuffer的讀取和處理邏輯通過com.lmax.disruptor.EventHandler接口實現(xiàn),executor中的工作線程和發(fā)送線程以及worker中的發(fā)送線程都分別實現(xiàn)了該接口把还。以executor工作線程為例实蓬,executor工作線程讀取event后轉換為Tuple, 并調用IBolt.execute(Tuple tuple)接口觸發(fā)用戶實現(xiàn)的業(yè)務邏輯。
上圖的邏輯在一個輪詢間隔為0的無限循環(huán)中: 當隊列空閑時吊履,cpu空轉安皱。
(defnk consume-loop*
[^DisruptorQueue queue handler
:kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
(async-loop ;;定義一個循環(huán)
(fn [] (consume-batch-when-available queue handler) 0) ;;此處返回0, 代表sleep-time
:kill-fn kill-fn ;;接收到kill信號時執(zhí)行的清理邏輯
:thread-name (.getName queue))) ;;線程名稱
;; afn returns amount of time to sleep
(defnk async-loop [afn
:daemon false
:kill-fn (fn [error] (exit-process! 1 "Async loop died!"))
:priority Thread/NORM_PRIORITY
:factory? false
:start true
:thread-name nil]
...
(let [sleep-time (afn)]
(when-not (nil? sleep-time)
(sleep-secs sleep-time)
(recur)) ;;循環(huán)調用
)
...
Storm和Flink對比
- | Storm | Flink |
---|---|---|
隊列 | Disruptor | ArrayDeque+synchronized |
隊列有無鎖 | 有鎖艇炎,使用ReentrantLock | 有鎖酌伊,使用synchronized |
隊列有無等待 | 等待,默認Condiction.await(timeout) | 等待缀踪,使用wait/notify |
緩存 | 有緩存居砖,用ArrayList和ConcurrentLinkedList | 有緩存燕锥,用自定義的MemorySegment和ArrayDeque |
緩存大小 | 可配置,默認100條,和message大小無關 | 可配置悯蝉,默認最小32768 byte,和條數(shù)無關,是消息序列化后的大型锌(消息可以跨多個buffer) |
生產方式 | 多生產者 | 多生產者 |
消費方式 | 單消費者 | 單消費者 |
序列化 | 默認kryo | 自定義 |
序列化的時機 | 遠程通信時 | 寫入緩存時(因此本地線程通信也會序列化) |
隊列數(shù) | 每個工作線程一個接收隊列(both spout and bolt),每 | 每個工作線程一個消費隊列(source除外) |