Storm原理深入淺出

Storm架構(gòu)

Storm是一個(gè)分布式料扰、可靠的實(shí)時(shí)計(jì)算系統(tǒng)。與Hadoop不同的是,它采用流式的消息處理方法悴能,對(duì)于每條消息輸入到系統(tǒng)中后就能被立即處理。適用于一些對(duì)實(shí)時(shí)性要求高的場(chǎng)景雳灾,比如廣告點(diǎn)擊在線統(tǒng)計(jì)漠酿、交易額實(shí)時(shí)統(tǒng)計(jì)等寻狂。

一些名詞解釋

  • Stream:Storm中被處理的數(shù)據(jù)流眼刃,一條消息稱為一個(gè)元組锁孟。

  • Spout:Storm連接外部數(shù)據(jù)源的組件芭届,可以認(rèn)為Storm的數(shù)據(jù)源谋旦。

  • Bolt:數(shù)據(jù)處理組件褐桌,Bolt里面封裝了處理數(shù)據(jù)的邏輯照雁。Spout和Bolt是Storm中的兩類組件秋泄,類似MapReduce中的Map和Reduce阱持。比如可以在Bolt上定義過濾夭拌、聚合、join衷咽、寫數(shù)據(jù)庫等鸽扁。

  • Stream Group:消息分組策略,定義了Bolt組件以何種方式接收數(shù)據(jù)镶骗。Storm內(nèi)置了八種消息分組策略桶现,我們也可以通過實(shí)現(xiàn)CustomStreamGrouping定義自己的消息分組策略。

    1. Shuffle grouping:隨機(jī)分配消息給Bolt task鼎姊,能夠保證每個(gè)Bolt task都能分配到相同數(shù)據(jù)量的元組骡和。
    2. Fileds grouping:根據(jù)字段進(jìn)行劃分相赁,比如按“user-id”字段進(jìn)行劃分,那么相同“user-id”的值會(huì)被分配到一個(gè)Bolt task中慰于。
    3. Partial grouping:類似Filed grouping噪生,但是能夠保證下游Bolt任務(wù)負(fù)載均衡。
    4. All grouping:將每條消息都廣播給所有Bolt任務(wù)东囚,也就是說每個(gè)Bolt處理的數(shù)據(jù)完全相同跺嗽。需要小心使用。
    5. Global grouping:所有消息流數(shù)據(jù)全部發(fā)送到一個(gè)Bolt任務(wù)中页藻。
    6. None grouping:不關(guān)心分組策略桨嫁,相當(dāng)于Shuffle grouping。
    7. Direct grouping:直接分組份帐,上游指定哪個(gè)Bolt任務(wù)接受數(shù)據(jù)璃吧。
    8. Local or shuffle grouping:資源本地化的一種實(shí)現(xiàn)方式,如果任務(wù)都在同一個(gè)進(jìn)程中废境,則會(huì)發(fā)送到該Bolt任務(wù)中畜挨。入果沒有,相當(dāng)于shuffle grouping噩凹。
  • Topology:由消息分組將Spout和Bolt連接起來的任務(wù)拓?fù)浒驮O喈?dāng)于MapReduce中Map和Reduce組成的任務(wù)。

  • Worker:工作進(jìn)程驮宴,運(yùn)行在Supervisor節(jié)點(diǎn)上逮刨,一個(gè)Woker進(jìn)程可以包含一個(gè)或多個(gè)Executor線程。每個(gè)Woker進(jìn)程上會(huì)執(zhí)行一組Task堵泽,比如Storm集群總共有50個(gè)Woker修己,如果Task總數(shù)為300,那么每個(gè)Worker上面需要執(zhí)行6個(gè)Task迎罗。Worker執(zhí)行topology的一個(gè)子集(不會(huì)出現(xiàn)一個(gè)Worker進(jìn)程為多個(gè)Topology提供服務(wù))睬愤,一個(gè)Topology任務(wù)可能由多個(gè)Woker進(jìn)程負(fù)責(zé)執(zhí)行。

  • Executor:執(zhí)行Spout或Bolt任務(wù)的線程纹安,由Worker進(jìn)程創(chuàng)建尤辱。每個(gè)Executor線程只會(huì)執(zhí)行一個(gè)Topology中的一個(gè)Component的task實(shí)例(但不一定只執(zhí)行一個(gè)task,可能執(zhí)行多個(gè)task)钻蔑。

  • Task:Storm中最小的處理單元啥刻,一個(gè)Executor中可以運(yùn)行一個(gè)或多個(gè)Task。Topology中的Spout和Bolt可以設(shè)置并行度咪笑,一個(gè)并行度對(duì)應(yīng)一個(gè)Task可帽。

一個(gè)Topology任務(wù)啟動(dòng)后,組件(Spout或Bolt)的task數(shù)量就已經(jīng)確定了(就是組件的并行度)窗怒。但是我們可以為該組件添加執(zhí)行線程映跟,也就是Executor(因?yàn)橛锌赡芤粋€(gè)Executor執(zhí)行了多個(gè)task蓄拣,為了提高執(zhí)行效率,可以增加Executor線程努隙。但是需要注意球恤,一個(gè)Executor只會(huì)為同一個(gè)Component的task服務(wù))。默認(rèn)情況下荸镊,task數(shù)是等于Executor數(shù)的咽斧,即一個(gè)Executor執(zhí)行一個(gè)task。

Storm架構(gòu)

Storm集群有兩類節(jié)點(diǎn):運(yùn)行Nimbus守護(hù)進(jìn)程的主節(jié)點(diǎn)和運(yùn)行Supervisor守護(hù)進(jìn)程的工作節(jié)點(diǎn)躬存。Nimbus節(jié)點(diǎn)用于分配代碼张惹、分配計(jì)算任務(wù)(分配給哪些Supervisor上的哪些Woker)和監(jiān)控狀態(tài)(用于故障檢測(cè)、恢復(fù))岭洲。Supervisor節(jié)點(diǎn)負(fù)責(zé)監(jiān)聽工作(監(jiān)聽Nimbus分配的任務(wù))宛逗、啟動(dòng)并停止Woker進(jìn)程。

Worker是運(yùn)行在Supervisor上的進(jìn)程盾剩,Supervisor收到Nimbus分配的任務(wù)后雷激,負(fù)責(zé)啟動(dòng)Nimbus指定的Woker進(jìn)程。Woker進(jìn)程執(zhí)行Topology的子集告私,一個(gè)Topology任務(wù)可能由運(yùn)行在多臺(tái)機(jī)器上的Worker進(jìn)程組成屎暇。

Woker進(jìn)程啟動(dòng)一個(gè)或多個(gè)Executor線程,Executor線程中可以有一個(gè)或多個(gè)Task德挣。每個(gè)Executor都會(huì)啟動(dòng)一個(gè)消息循環(huán)線程恭垦,用于接受快毛、處理和發(fā)送消息格嗅。

Nimbus和Supervisor之間協(xié)調(diào)工作也是由Zookeeper來完成的。

image.png

Nimbus和Supervisor都能快速失敗恢復(fù)唠帝,而且它們都是無狀態(tài)的屯掖,狀態(tài)信息存儲(chǔ)在Zookeeper(元數(shù)據(jù))和本地中。當(dāng)Nimbus或Supervisor掛掉后襟衰,可以重新啟動(dòng)并讀取狀態(tài)信息到集群中來正常運(yùn)行贴铜,所以Storm系統(tǒng)具有很高的容錯(cuò)性。

在邏輯上將Storm中消息來源節(jié)點(diǎn)稱為Spout瀑晒,消息處理節(jié)點(diǎn)稱為Bolt绍坝,它們通過流分組組成Topology。


image.png

Storm中元數(shù)據(jù)

為了更好的理解Storm的設(shè)計(jì)苔悦,我們可以通過Zookeeper中存儲(chǔ)的元數(shù)據(jù)來理解Storm架構(gòu)中各個(gè)節(jié)點(diǎn)之間的關(guān)系轩褐。

image.png

Storm在Zookeeper存儲(chǔ)的消息都經(jīng)都是以/storm開始,所有數(shù)據(jù)都存儲(chǔ)在葉子節(jié)點(diǎn)上玖详。下面說一個(gè)每個(gè)節(jié)點(diǎn)數(shù)據(jù)存儲(chǔ)的具體含義:

  • /storm/workerbeats/{topology-id}/node-port:拓?fù)淙蝿?wù)所在的Woker進(jìn)程信息把介,node和port是Woker所在的主機(jī)和端口勤讽。里面主要存儲(chǔ)了Woker的運(yùn)行狀態(tài)和統(tǒng)計(jì)信息。Woker進(jìn)程會(huì)定時(shí)上報(bào)心跳到該節(jié)點(diǎn)拗踢,Nimbus通過心跳信息來確認(rèn)Woker進(jìn)程的存活脚牍,對(duì)于死掉的Woker,Nimbus會(huì)重新調(diào)度巢墅。統(tǒng)計(jì)信息包括該Woker上所有Executor的統(tǒng)計(jì)信息诸狭,比如發(fā)送消息數(shù),接受消息數(shù)等君纫,這些信息會(huì)顯示在UI中作谚。一個(gè)Topology任務(wù)可能劃分到多個(gè)Woker(node-port)上。
  • /storm/storms/{topolog-id}:存儲(chǔ)了拓?fù)淙蝿?wù)的本身信息庵芭,比如名字妹懒、啟動(dòng)時(shí)間、運(yùn)行狀態(tài)双吆、使用Woker數(shù)眨唬、每個(gè)組件的并行度等。這個(gè)信息在任務(wù)注冊(cè)后好乐,就不會(huì)在發(fā)生改變匾竿。該節(jié)點(diǎn)信息,可以幫助Nimbus進(jìn)行資源分配蔚万,因?yàn)槟軌蛑滥男㏒upervisor上面有哪些任務(wù)岭妖。
  • /storm/assignments/{topology-id}:Nimbus為每個(gè)Topology任務(wù)的分配信息,比如Topology在該Nimbus中的存儲(chǔ)目錄反璃、被分配給哪些Supervisor昵慌、Woker、Executor上淮蜈。
  • /storm/supervisors/{supervisor-id>}:Supervisor節(jié)點(diǎn)的注冊(cè)信息斋攀,存儲(chǔ)了該節(jié)點(diǎn)自身一些統(tǒng)計(jì)信息,比如使用了哪些端口等梧田。該Znode是臨時(shí)節(jié)點(diǎn)淳蔼,當(dāng)Supervisor下線后,這些信息會(huì)自動(dòng)刪除裁眯,Nimbus感知到該節(jié)點(diǎn)下線后會(huì)重新為該Supervisor下的任務(wù)分配節(jié)點(diǎn)鹉梨。
  • /storm/errors/{topology-id}/{component-id}/{equential-id}:存儲(chǔ)運(yùn)行中每個(gè)組件的錯(cuò)誤信息,每個(gè)組件只會(huì)保留最近十條錯(cuò)誤信息穿稳。

下面一張圖講述了節(jié)點(diǎn)直接如何創(chuàng)建存皂、使用這些元數(shù)據(jù):

image.png

Nimbus、Supervisor司草、Woker兩兩之間都需要維持心跳艰垂。Nimbus通過/storm/superviors/節(jié)點(diǎn)能夠知道哪些Supervisor存活泡仗。Nimbus通過/storm/workerbeats/節(jié)點(diǎn)能夠知道哪些Woker存活。Supervisor和Woker通過本地文件維持心跳猜憎,他們雖然是兩個(gè)進(jìn)程無法直接通信娩怎,為什么通過文件維持心跳呢,應(yīng)該有很多網(wǎng)絡(luò)通信框架可以吧胰柑。

Storm源碼

Storm基于Clojure和Java編寫的截亦。其中Clojure實(shí)現(xiàn)了Nimbus、Supervisor柬讨、Woker崩瓤、Executor以及Task這些基礎(chǔ)服務(wù)。Java實(shí)現(xiàn)了Storm的流處理(組件實(shí)現(xiàn))以及事務(wù)Topology踩官。
Storm中還有一些Trident代碼却桶,Trident是Storm實(shí)時(shí)消息處理的更高層抽象。

Storm集群搭建

環(huán)境準(zhǔn)備

1蔗牡、Java安裝
Storm1.x在Java7和Java8中完成了測(cè)試颖系,所以Java版本最好是1.7+。
2辩越、Python安裝
Storm1.x在Python2.6.6中完成了測(cè)試嘁扼,理論上3.x也能夠運(yùn)行,但是官方并沒有測(cè)試黔攒。
3趁啸、Zookeeper安裝
Storm使用Zookeeper進(jìn)行協(xié)調(diào)服務(wù),所以需要準(zhǔn)備Zookeeper環(huán)境督惰,版本官方上面沒有指定版本不傅。

Storm安裝

下載Storm jar包

http://storm.apache.org/downloads.html

Storm文件配置
Storm配置文件在${STORM_HOME}/conf/storm.yaml中,下面是一些必須的配置項(xiàng):

 #zookeeper集群
 storm.zookeeper.servers:
     - "192.168.0.1"
     - "192.168.0.2"
     - "192.168.0.3"
 #zookeeper端口
 storm.zookeeper.port: 2181
 #可作為nimbus的候選主機(jī)
 nimbus.seeds: ["192.168.0.1"]
 #storm數(shù)據(jù)存儲(chǔ)目錄姑丑,用于存儲(chǔ)少量狀態(tài)信息蛤签,比如jar、conf等
 storm.local.dir: "/opt/yjz/storm/data"
 #suppervisor可以作為woker進(jìn)程啟動(dòng)的端口栅哀,表明該Supervisor最多可以啟動(dòng)四個(gè)Worker進(jìn)程
 supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703

更多配置可以查看https://github.com/apache/storm/blob/v1.2.2/conf/defaults.yaml。隨著深入了解称龙,可以優(yōu)化集群配置留拾。

節(jié)點(diǎn)啟動(dòng)

nohub bin/storm nimbus &
nohub bin/storm ui &
nonub bin/storm supervisor &

查看nimbus節(jié)點(diǎn),發(fā)現(xiàn)nimbus進(jìn)程已經(jīng)啟動(dòng)鲫尊,core為ui進(jìn)程痴柔。

jps
8134 nimbus
8713 core
10123 Jps
19710 QuorumPeerMain

查看supervisor節(jié)點(diǎn),發(fā)現(xiàn)supervisor進(jìn)程已經(jīng)啟動(dòng)疫向。

jps
19237 QuorumPeerMain
7610 Supervisor
8975 Jps

查看UI咳蔚,192.168.0.1:8080豪嚎。

image.png

Storm編程

編程模型

我們上面說過Storm提供了兩類組件:Spout和Bolt。所以我們使用Storm編程的對(duì)象主要也就是針對(duì)Spout和Bolt谈火。
Spout用于定義接受外部數(shù)據(jù)源數(shù)據(jù)侈询,并將其轉(zhuǎn)換成Storm內(nèi)部數(shù)據(jù),然后將這些數(shù)據(jù)發(fā)送給Bolt糯耍。
Bolt組件定義了數(shù)據(jù)處理邏輯扔字,也就是我們的業(yè)務(wù)處理邏輯,比如過濾温技、聚合革为、Join等。Bolt組件也可以用于寫入外部介質(zhì)舵鳞,比如寫入Mysql震檩、Redis等。
組件之間傳輸?shù)臄?shù)據(jù)在Storm中稱為元組(Tuple蜓堕,可以理解為一行數(shù)據(jù))恳蹲,Tuple是Storm數(shù)據(jù)傳輸?shù)幕締卧uple中定義了字段(Field俩滥,可以理解一行數(shù)據(jù)中一個(gè)字段)嘉蕾,這些Field是有Schema的,F(xiàn)iled可以是byte霜旧、short错忱、integer、long挂据、float以清、double、boolean崎逃、string和byte array掷倔,除了這些基礎(chǔ)類型,我們還可以自定義數(shù)據(jù)類型(需要自己實(shí)現(xiàn)針對(duì)自定義類型的序列化)个绍。

image.png

Storm 組件

在編寫Storm作業(yè)時(shí)我們會(huì)針對(duì)Spout和Bolt進(jìn)行編程實(shí)現(xiàn)勒葱,下面我們分別看一下Storm為我們提供的組件接口。

Spout組件

Spout是Storm中Topology的生產(chǎn)者巴柿,負(fù)責(zé)讀取外部數(shù)據(jù)凛虽,并轉(zhuǎn)換成Tuple。Spout可以設(shè)置處理的消息類型為可靠或不可靠广恢。對(duì)可靠的消息凯旋,Spout會(huì)緩存發(fā)出去的消息,當(dāng)該消息在topology中處理失敗時(shí),Spout可以重新發(fā)送該消息至非。對(duì)于不可靠的消息钠署,Spout一旦發(fā)送出該消息后就會(huì)將該消息扔掉,所以如果該消息處理失敗荒椭,那么該消息就會(huì)丟失谐鼎。

Spout可以發(fā)射多條消息流Stream,通過使用OutputFieldsDeclarer.declareStream()方法來定義多個(gè)Stream戳杀,然后使用SpoutOutputCollector.emit()方法在發(fā)射消息時(shí)指定Stream该面。

Spout中最重要的方法nextTuple(),將外部數(shù)據(jù)源數(shù)據(jù)以tuple形式發(fā)送到Topology中的Bolt組件進(jìn)行處理信卡。需要注意nextTuple()不能阻塞隔缀,因?yàn)镾torm是在一個(gè)線程內(nèi)調(diào)用Spout的所有方法。

Spout對(duì)于可靠類型消息傍菇,還有兩個(gè)比較重要的方法就是ack和fail猾瘸。Storm在檢測(cè)一個(gè)tuple被整個(gè)Topology執(zhí)行成功的時(shí)候會(huì)回調(diào)ack,否則調(diào)用fail丢习。

下面是Spout組件在Storm中的實(shí)現(xiàn)牵触,我們分別來看下這些組件都是什么。

image.png

IComponent接口

IComponent接口是所有組件的頂級(jí)接口咐低,IComponent中定義了所有組件可能需要用到的方法(其實(shí)就是定義了Spout和Bolt組件都用得到的方法)揽思。

  • declareOutputFields方法用于聲明該拓?fù)渌辛?Stream)的輸出模式(Schema),比如聲明輸出流ID见擦、輸出字段(Field)以及輸出流是否為直接流钉汗。
  • getComponentConfiguration方法用于聲明該組件的配置,但是它只能覆蓋以"topology.*"配置為開頭的子集鲤屡。我們也可以通過TopologyBuilder構(gòu)建拓?fù)鋾r(shí)损痰,對(duì)組件進(jìn)行進(jìn)一步配置覆蓋。

ISpout接口

ISpout是Spout組件的定義的頂級(jí)接口酒来,它定義了Spout組件支持的方法卢未。Storm會(huì)在相應(yīng)的階段調(diào)用ISpout接口中特定的方法,比如啟動(dòng)拓?fù)鋾r(shí)會(huì)首先調(diào)用open()方法堰汉,正常停止拓?fù)鋾r(shí)會(huì)調(diào)用close()方法(kill -9不會(huì)被調(diào)用)辽社。

  • open方法會(huì)在任務(wù)在集群工作進(jìn)程初始化時(shí)調(diào)用,用于提供Spout執(zhí)行所需要的環(huán)境衡奥。比如讀取外部數(shù)據(jù)源的一些初始化配置可以寫在這里爹袁。Map類型的conf參數(shù)是這個(gè)Spout的配置,它包含了拓?fù)渑c集群配置的合并集矮固。TopologyContext類型的context是該拓?fù)涞纳舷挛模ㄍ負(fù)鋓d、組件id档址、輸入輸出信息等盹兢。SpoutOutputCollector類型的collector參數(shù)是Spout的收集器,用于發(fā)射tuple守伸。
  • close方法當(dāng)一個(gè)ISpout關(guān)閉時(shí)會(huì)被調(diào)用绎秒,但是并不能保證一定被調(diào)用,比如Supervisor被kill -9強(qiáng)制殺死的時(shí)候尼摹。
  • active方法是Spout從失效模式到激活狀態(tài)時(shí)被調(diào)用见芹。Spout可以處于失效狀態(tài)或激活狀態(tài),處于失效狀態(tài)的Spout不會(huì)調(diào)用nextTuple方法蠢涝。從失效狀態(tài)到激活狀態(tài)調(diào)用nextTuple方法前玄呛,會(huì)調(diào)用active方法。
  • deactive方法是當(dāng)Spout失效時(shí)被調(diào)用和二,失效狀態(tài)的Spout不會(huì)調(diào)用nextTuple方法徘铝。
  • nextTuple方法是Spout組件最重要的方法,nextTuple用于讀取外部數(shù)據(jù)源數(shù)據(jù)轉(zhuǎn)換為tuple惯吕,并且通過SpoutOutputCollector收集器發(fā)射tuple惕它。由于nextTuple、ack废登、fail方法都是在一個(gè)線程內(nèi)被調(diào)用淹魄,所以nextTuple方法不應(yīng)該有阻塞代碼。
  • ack方法是當(dāng)Storm確定從該Spout發(fā)射出去標(biāo)識(shí)符為msgId的消息被topology完整處理完成時(shí)堡距,會(huì)調(diào)用ack方法甲锡,我們可以在這里實(shí)現(xiàn)一些邏輯,比如從我們的數(shù)據(jù)源隊(duì)列中移除該消息吏颖。
  • fail方法和ack方法相反搔体,當(dāng)msgId消息在topology中被處理失敗時(shí)會(huì)被調(diào)用。

Spout的可靠性消息類型不需要我們通過fail方法實(shí)現(xiàn)半醉,Storm會(huì)自動(dòng)實(shí)現(xiàn)疚俱。ack和fail方法只是給我們獲取消息被處理成功與否的接口。

IRichSpout接口

IRichSpout接口繼承了ISpout和IComponent接口缩多,它是我們實(shí)現(xiàn)Spout組件的主要接口呆奕。它本身沒有定義任何方法,所以我們實(shí)現(xiàn)Spout組件時(shí)候只需要實(shí)現(xiàn)ISpout和IComponent接口的方法衬吆。

BaseComponent抽象類

BaseComponent抽象類實(shí)現(xiàn)實(shí)現(xiàn)了IComponent組件的getComponentConfiguration方法梁钾,并且返回為空。
它主要的作用就是對(duì)于一些Spout組件并不需要進(jìn)行覆蓋配置逊抡,這時(shí)候通過繼承BaseComponent抽象類就不需要實(shí)現(xiàn)該方法了姆泻。

BaseXxx在Storm組件中零酪,一般都是指一些基礎(chǔ)實(shí)現(xiàn)。目的就是為了避免我們寫代碼時(shí)候去實(shí)現(xiàn)一些我們用不到的方法(我們一般都是置為空)拇勃。

BaseRichSpout抽象類

BaseRichSpout繼承了IRichSpout接口和BaseComponent接口四苇,它實(shí)現(xiàn)了一些我們?cè)趯?shí)際編寫Spout組件時(shí)可能用不到的方法。比如close方咆、active月腋、deactive、ack瓣赂、fail方法(方法體都為空)榆骚。這樣當(dāng)我們通過繼承BaseRichSpout抽象類來實(shí)現(xiàn)Spout組件時(shí)候,這些方法我們都可以不需要實(shí)現(xiàn)了煌集。

public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
    @Override
    public void close() {
    }
    @Override
    public void activate() {
    }
    @Override
    public void deactivate() {
    }
    @Override
    public void ack(Object msgId) {
    }
    @Override
    public void fail(Object msgId) {
    }
}

總結(jié)

當(dāng)我們編寫Spout組件時(shí)妓肢,如果想要實(shí)現(xiàn)Spout給我們提供的所有方法,可以直接實(shí)現(xiàn)接口IRichSpout接口牙勘。如果我們只需要實(shí)現(xiàn)Spout組件所必須的方法职恳,可以直接繼承BaseRichSpout抽象類。

Bolt組件接口

所有的消息處理邏輯被封裝到Bolt中方面,比如可以用來做過濾放钦、聚合、查詢數(shù)據(jù)庫恭金、寫入數(shù)據(jù)等操禀。
Bolt也可以發(fā)送多條消息流Stream,使用OutputFieldsDeclarer.declareStream()方法定義Stream横腿,然后使用OutputCollector.emit()方法在發(fā)射消息時(shí)颓屑,指定Stream。

Bolt最重要的方法時(shí)execute()耿焊,它以接受一個(gè)tuple揪惦,經(jīng)過邏輯處理后,使用OutputCollector.emit()發(fā)射出0個(gè)或多個(gè)tuple罗侯。Bolt還需要為每個(gè)tuple調(diào)用ack方法器腋,來通知Storm這個(gè)消息被該Bolt task執(zhí)行完成,從而通知這個(gè)tuple的發(fā)射者Spout(調(diào)用Spout的ack或fail)钩杰。

Bolt組件定義的邏輯和Spout組件的類似纫塌,下面是Bolt組件的實(shí)現(xiàn)方式。

image.png

IBolt接口

IBolt接口是Bolt組件的頂級(jí)接口讲弄,它定義了Bolt組件所需要的方法措左。IBolt的設(shè)計(jì)原則是以一個(gè)元組作為輸入,通過邏輯處理生成零個(gè)或多個(gè)元組輸出避除。

  • prepare方法在拓?fù)渥鳂I(yè)在工作進(jìn)程初始化時(shí)調(diào)用怎披。和ISpout中的open方法一樣胸嘁,做一些組件初始化的工作。prepare方法同樣提供了三個(gè)stormConf钳枕、context和collector缴渊,用途和ISpout中的open方法一樣赏壹。需要注意Bolt中的collector的類型是OutputCollector鱼炒。
  • execute方法是Bolt組件最重要的方法,接收上游發(fā)送的元組蝌借,執(zhí)行業(yè)務(wù)邏輯昔瞧,然后通過OutputCollector來向下游發(fā)射元組。在執(zhí)行完execute方法后菩佑,我們應(yīng)該調(diào)用ack或fail方法來通知Storm該消息已經(jīng)被處理(否則Storm一直會(huì)等到消息處理超時(shí)自晰,才認(rèn)為該消息處理失敗)。
  • cleanup方法當(dāng)IBolt被即將關(guān)閉時(shí)調(diào)用稍坯,和ISpout的close方法一樣酬荞,不保證一定被執(zhí)行到。

execute中的tuple可以不立即執(zhí)行瞧哟,可以等待其它元組到來一起執(zhí)行混巧。比如做聚合、join等操作勤揩。

IRichBolt接口

IRichBolt接口實(shí)現(xiàn)了IBolt和IComponent接口咧党,它的作用和ISpout接口一樣,這里就不在復(fù)述了陨亡。

BaseRichBolt抽象方法

BaseRichBolt抽象類和BaseRichSpout抽象方法一樣傍衡,對(duì)一些我們可能用不到的方法,提供默認(rèn)實(shí)現(xiàn)负蠕。

public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
    @Override
    public void cleanup() {
    }    
}

IBasicBolt蛙埂、BaseBasicBolt

使用IBasicBolt和BaseBasicBolt類來實(shí)現(xiàn)Bolt組件,我們可以不需要手動(dòng)ack遮糖,IBasicBolt的execute方法會(huì)自動(dòng)執(zhí)行Acking機(jī)制(仔細(xì)看它使用的是BasicOutputCollector在emit后绣的,執(zhí)行ack方法)。如果我們希望該元組失敗止吁,可以顯示拋出一個(gè)FailedException異常被辑。

image.png

IBasicBolt接口方法和IRichBolt具有相同的方法,只是prepare中沒有傳遞OutputCollector收集器了敬惦,而是在execute方法中直接傳遞了BasicOutputCollector盼理。所以如果我們不需要對(duì)該Bolt組件添加配置和獲取拓?fù)渖舷挛膶?duì)象,可以直接實(shí)現(xiàn)BaseBasicBolt抽象類俄删,因?yàn)樵擃愄峁┝藀repare和cleanup的默認(rèn)實(shí)現(xiàn)宏怔,我們只需要實(shí)現(xiàn)execute方法即可奏路。

總結(jié)

如果我們不想要手動(dòng)調(diào)用ack,可以繼承IBasicBolt或BaseBasickBolt來實(shí)現(xiàn)Bolt組件臊诊。當(dāng)然鸽粉,如果想要顯示靈活調(diào)用,可以通過繼承IRichBolt或BaseRichBolt來實(shí)現(xiàn)Bolt組件抓艳。

Stream Groupings

我們上面說過Topology是通過Stream Grouping將Spout和Bolt組合而成的触机。無論Bolt還是Spout我們都可以為其設(shè)置并行度,并行度對(duì)應(yīng)著task玷或,而Stream Grouping定義了該Bolt中的所有task以什么形式來接受數(shù)據(jù)流儡首。Stream Grouping支持的八種流分組方式在上面已經(jīng)說過了,這里就不說了偏友。

Topology配置

當(dāng)我編寫完Spout和Bolt組件后蔬胯,需要提供一個(gè)主類來設(shè)置拓?fù)洹_@個(gè)主類也是Storm執(zhí)行Topology任務(wù)的入口類位他。

并行度

在說Topology配置前氛濒,我們先理解一下Storm中的并行度。
在Storm中運(yùn)行Topology任務(wù)主要依賴下面三個(gè)實(shí)體:

  • 工作進(jìn)程(Worker processes)
  • 執(zhí)行線程(Executor)
  • 任務(wù)(Task)
image.png

一個(gè)工作進(jìn)程運(yùn)行一個(gè)Topology的子集鹅髓,并且每個(gè)工作進(jìn)程只屬于一個(gè)Topology任務(wù)(不會(huì)存在一個(gè)Woker服務(wù)多個(gè)Topology)舞竿,一個(gè)Topology任務(wù)可能由多臺(tái)機(jī)器的多個(gè)Woker組成。
Woker進(jìn)程中可以啟動(dòng)多個(gè)Executor線程迈勋,每個(gè)Executor線程運(yùn)行一個(gè)或多個(gè)Task炬灭,但是這些task必須是同一Component中的。也就是每個(gè)Executor只能服務(wù)于一個(gè)Component靡菇。
Task是具體執(zhí)行數(shù)據(jù)處理的重归,我們實(shí)現(xiàn)的Bolt或Spout組件中每個(gè)組件可以啟動(dòng)一個(gè)或多個(gè)task來執(zhí)行,以達(dá)到提高處理效率的目的厦凤。Task數(shù)在Topology啟動(dòng)后就不能在改變了鼻吮,但是我們可以修改執(zhí)行Task的Executor線程數(shù),來動(dòng)態(tài)調(diào)整為該拓?fù)浞峙涞馁Y源较鼓。默認(rèn)情況下椎木,每個(gè)Executor會(huì)對(duì)應(yīng)一個(gè)task。

配置實(shí)例

我們首先需要使用TopologyBuilder來配置拓?fù)潢P(guān)系博烂,在設(shè)置過程中可以添加配置香椎、設(shè)置組件執(zhí)行Executor數(shù)量、設(shè)置組件task數(shù)以及設(shè)置Stream Grouping禽篱。

public static void main(String[] args) throws Exception{
        //設(shè)置Component(bolt和spout)之間的拓?fù)浣Y(jié)構(gòu)
        TopologyBuilder builder = new TopologyBuilder();
        //添加spout組件
        builder.setSpout("word-reader",new WordReaderSpout()).addConfiguration("topology.debug",true);
        //添加bolt組件畜伐,并設(shè)置組件所需task數(shù),這里沒有設(shè)置Executor數(shù)量躺率,默認(rèn)會(huì)為每個(gè)task啟動(dòng)一個(gè)Executor線程玛界。這里還設(shè)置了以shuffleGrouping分組方式接收上游word-reader組件發(fā)送的消息
        builder.setBolt("word-normalizer",new WordNormalizerBolt()).setNumTasks(2).shuffleGrouping("word-reader");
        //設(shè)置bolt組件万矾,并且設(shè)置了啟動(dòng)Executor數(shù),這里沒有設(shè)置task數(shù)量慎框,默認(rèn)會(huì)為么給Executor分配一個(gè)task良狈。這里還設(shè)置了以shuffleGrouping分組方式接收上游word-normailizer組件發(fā)送的消息
        builder.setBolt("word-counter",new WordCounterBolt()).shuffleGrouping("word-normalizer",3);

        //運(yùn)行時(shí)與集群配置合并,并通過prepare或open方法發(fā)送給所有組件節(jié)點(diǎn)
        Config conf = new Config();
        //設(shè)置運(yùn)行該拓?fù)湫枰獛讉€(gè)Worker進(jìn)程笨枯,如果沒有設(shè)置默認(rèn)為1個(gè)薪丁。
        conf.setNumWorkers(2);
        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS,10);

        conf.put("word-file",args[0]);
        conf.setDebug(false);
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
        //使用集群方式提交拓?fù)鋱?zhí)行
        StormSubmitter.submitTopology("storm-hello-word",conf,builder.createTopology());

    }

如果集群中Woker數(shù)量被用完(storm.yaml中設(shè)置的supervisor.slots.ports),在提交新Topology時(shí)會(huì)失敗猎醇。需要等待運(yùn)行中的Topology釋放資源后才可以執(zhí)行窥突。

更新運(yùn)行中的Topology的Executor

Storm提供了對(duì)運(yùn)行中Topology任務(wù)的Executor熱更新,有兩種方式可以進(jìn)行更新硫嘶。

  • 使用Storm web UI中rebalance更新。
  • 是用Storm命令行工具CLI進(jìn)行更新梧税。

比如將myTopology任務(wù)改為5個(gè)工作進(jìn)程沦疾,組件blue-spout使用3個(gè)Executor,組件yello-bolt使用5個(gè)bolt第队。

storm rebalance myTopology -n 5 -e blue-spout=3 -e yello-bolt=5

Storm配置文件

Storm有許多各種各樣的配置哮塞,有些是系統(tǒng)配置不能通過Topology任務(wù)進(jìn)行修改,而有些配置是支持在Topology任務(wù)中進(jìn)行修改凳谦。
默認(rèn)Storm中的所有配置都在Storm代碼庫中的default.yaml中忆畅,我們可以通過在Nimbus節(jié)點(diǎn)或Supervisor節(jié)點(diǎn)中的storm.yaml進(jìn)行覆蓋。除了通過storm.yaml進(jìn)行覆蓋修改配置外尸执,我們還可以通過StormSubmitter構(gòu)建拓?fù)鋾r(shí)來修改配置(傳入的Config對(duì)象)家凯,但是這里只能修改以TOPOLOGY為前綴的配置項(xiàng)。

我們也可以通過Java API修改配置如失,有兩種方式:

  • 內(nèi)部修改:覆蓋Spout或Bolt的getComponentConfiguration方法來修改配置绊诲。
  • 外部修改:在TopologyBuilder中的setSpout或setBolt方法返回的對(duì)象調(diào)用addConfiguration來覆蓋配置。

這些配置的優(yōu)先級(jí)為:
default.yaml < storm.yaml < 配置拓?fù)涮砑优渲庙?xiàng) < 組件內(nèi)部添加配置項(xiàng) < 組件外部添加配置項(xiàng)

Maven開發(fā)配置

使用Maven開發(fā)Storm作業(yè)褪贵,首先就需要配置pom.xml文件掂之,包括Jar包引入、打包配置等脆丁。
Storm目前版本是1.2.2世舰,Jar包引入:

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>1.2.2</version>
  <scope>provided</scope>
</dependency>

這里之所有使用provided模式是因?yàn)椋琒torm會(huì)自動(dòng)從工作節(jié)點(diǎn)下載Storm Jar包槽卫。
當(dāng)我們編寫完Storm作業(yè)后跟压,需要將相關(guān)依賴打到一個(gè)Jar包內(nèi)猫缭,可以使用assembly插件進(jìn)行打包濒蒋。

  <plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
      <descriptorRefs>  
        <descriptorRef>jar-with-dependencies</descriptorRef>
      </descriptorRefs>
      <archive>
        <manifest>
          <mainClass>com.path.to.main.Class</mainClass>
        </manifest>
      </archive>
    </configuration>
  </plugin>

其中mainClass就是我們作業(yè)的啟動(dòng)主類启盛。

本地模式

Storm為了方便開發(fā)測(cè)試凡桥,提供了本地模式運(yùn)行Storm Topology,本地模式模擬了集群模式下作業(yè)的執(zhí)行喷好,所以我們?cè)陂_發(fā)調(diào)試過程中可以使用本地模式測(cè)試開發(fā)的Storm作業(yè)翔横。
本地模式將Spout和Bolt都運(yùn)行在一個(gè)進(jìn)程上的多個(gè)線程執(zhí)行,來模擬真實(shí)的集群運(yùn)行情況梗搅。對(duì)于一些耗時(shí)操作禾唁,會(huì)采用Thread.sleep()方法模擬,所以有時(shí)會(huì)導(dǎo)致運(yùn)行速度緩慢无切。

//只需要?jiǎng)?chuàng)建LocalCluster對(duì)象類就可以使用本地模式
LocalCluster localCluster = new LocalCluster();
//提交作業(yè)
localCluster.submitTopology();
//停止作業(yè)
localCluster.killTopology();
//關(guān)閉本地集群模式
localCluster.shutdown();

Local模式提供了一下配置型:

Config.TOPOLOGY_MAX_TASK_PARALLELISM:設(shè)置Topology的最大并行度荡短。
Config.TOPOLOGY_DEBUG:開啟DEBUG模式,方便作業(yè)調(diào)試哆键。

Storm核心機(jī)制原理

Storm消息可靠性處理

Storm提供了幾種不同級(jí)別的消息可靠性處理:

  • 盡力處理(best effort)
  • 至少一次被處理(at least once)
  • 恰好處理一次(exactly once,需要借助Trident)

盡力處理屬于最低級(jí)別保證機(jī)制掘托,我們可以不添加任何額外操作,Storm就能幫我們達(dá)到籍嘹。

至少一次被處理(at least once)

Spout發(fā)出的消息可能會(huì)產(chǎn)生成千上萬條消息(經(jīng)過各種Bolt task處理后就會(huì)分散出一條消息)闪盔,這些消息會(huì)組成一顆消息樹,其中Spout發(fā)出的消息為消息根辱士,Storm會(huì)跟蹤整棵樹的處理情況泪掀,如果這顆樹中的任一消息處理失敗,或者整棵樹在規(guī)定時(shí)間沒有被處理完(通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置颂碘,默認(rèn)為30s)异赫,那么Storm就認(rèn)為Spout發(fā)出的這些消息處理失敗了,Spout會(huì)重新發(fā)送該條消息头岔。

判斷一個(gè)tuple tree是否被處理完成塔拳,有以下兩個(gè)條件:

  1. tuple tree不在生長(zhǎng)。
  2. tuple tree中的任何消息都被處理切油。

使用Storm API來實(shí)現(xiàn)

如果想要使用Storm提供的可靠性處理蝙斜,我們需要做兩件事:

  1. 無論何時(shí),只要在tuple tree中創(chuàng)建了一個(gè)新節(jié)點(diǎn)澎胡,就需要告知Storm孕荠。
  2. 當(dāng)處理完一個(gè)消息后,需要告知Storm中對(duì)應(yīng)tuple tree攻谁。

通過上面兩個(gè)步驟稚伍,Storm就可以檢測(cè)一個(gè)tuple tree是否被處理完成,并且會(huì)調(diào)用消息產(chǎn)生對(duì)應(yīng)Spout的ack和fail方法戚宦。

當(dāng)為tuple tree中指定的節(jié)點(diǎn)增加一個(gè)新節(jié)點(diǎn)時(shí)个曙,稱為錨定(anchoring)。錨定是在發(fā)送消息的同時(shí)進(jìn)行的,具體錨定方式為:把輸入消息作為emit方法的第一個(gè)參數(shù)垦搬。這樣就告知tuple tree呼寸,該節(jié)點(diǎn)產(chǎn)生了新節(jié)點(diǎn),只有當(dāng)新節(jié)點(diǎn)也完成時(shí)tuple tree才算執(zhí)行完成猴贰。

public class SplitSentence extends BaseRichBolt {
        ...
        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }
        ...

Storm支持一個(gè)輸出消息被錨定在一個(gè)或多個(gè)輸入消息上对雪,比如join、聚合等場(chǎng)景米绕。一個(gè)被多重錨定的消息處理失敗瑟捣,會(huì)導(dǎo)致與之關(guān)聯(lián)的多個(gè)Spout重新發(fā)送消息。

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

多重錨定會(huì)將錨定的消息添加到多棵tuple tree上栅干,并且有可能打破樹結(jié)構(gòu)迈套,從而形成一個(gè)DAG圖。

如果沒有錨定碱鳞,也就是沒有在emit方法的第一個(gè)參數(shù)指定輸入tuple桑李。那么這個(gè)節(jié)點(diǎn)所產(chǎn)生的子樹失敗,spout不會(huì)重新發(fā)送消息劫笙。該節(jié)點(diǎn)ack完成后芙扎,tuple tree就認(rèn)為被處理完成了。有些場(chǎng)景非常適合這種不需要錨定的消息填大。

完成錨定后,我們還需要在消息被處理完成后告知tuple tree俏橘。我們必須在每個(gè)execute方法的后面顯示調(diào)用OutputCollector的ack或fail方法允华,來表明該消息在該bolt是否被處理完成(否則會(huì)一直等到超時(shí))。
顯示調(diào)用ack或fail寥掐,是除了快速告知tuple tree消息是否被處理完成外靴寂,還有一個(gè)原因就是防止內(nèi)存被打滿。因?yàn)镾torm使用內(nèi)存來跟蹤每個(gè)元組是否被處理完成召耘,所以如果不調(diào)用ack或fail百炬,很容易將內(nèi)存打滿。

Storm提供了IBasicBolt和BaseBasicBolt接口來隱式調(diào)用ACK機(jī)制污它,也就是說我們?nèi)绻褂盟鼈儗?shí)現(xiàn)Bolt組件剖踊,就不需要手動(dòng)錨定和調(diào)用ack/fail方法了。

public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

針對(duì)于多重錨定衫贬,IBasicBolt和BaseBasicBolt是無法處理的德澈。需要我們顯示完成,也就是需要實(shí)現(xiàn)IRichBolt或BaseRichBolt定義Bolt組件固惯。

acker框架

Storm使用Acker框架來跟蹤消息是否被成功處理梆造。Acker是Storm中一組特殊的任務(wù),用于跟蹤每個(gè)Spout發(fā)送tuple的DAG葬毫。當(dāng)acker發(fā)現(xiàn)DAG中節(jié)點(diǎn)都完全被處理完成后镇辉,它會(huì)向創(chuàng)建該tuple的Spout發(fā)送一條消息(成功或失敗)屡穗。
我們可以使用Config.TOPOLOGY_ACKERS在拓?fù)渑渲弥性O(shè)置Acker數(shù)量,默認(rèn)情況下每個(gè)Woker進(jìn)程會(huì)啟動(dòng)一個(gè)Acker任務(wù)忽肛。

當(dāng)在Topology中創(chuàng)建一個(gè)新的元組時(shí)村砂,會(huì)為每個(gè)元組分配一個(gè)64bit的隨機(jī)id(無論spout還是bolt組件)。Acker使用這些id來跟蹤tuple tree麻裁。每個(gè)tuple被創(chuàng)建后tuple tree中的根id都會(huì)被復(fù)制到這個(gè)消息中箍镜,當(dāng)這個(gè)消息處理完成后,它會(huì)根據(jù)根id來找到跟蹤這棵樹的Acker煎源,并向該Acker發(fā)送狀態(tài)變更信息色迂,比如:該元組已經(jīng)處理完成,又產(chǎn)生了新元組手销,需要你跟蹤下歇僧。
這里有個(gè)點(diǎn)需要考慮下,就是每個(gè)元組如何知道自己的tuple tree對(duì)應(yīng)著哪個(gè)Acker锋拖。Storm使用一種哈希算法根據(jù)Spout tupel id來確定那個(gè)Acker負(fù)責(zé)該tuple tree诈悍,而每個(gè)消息都知道根id,因此就知道與哪個(gè)Acker通信了兽埃。

我們知道了Acker與Spout tuple對(duì)應(yīng)關(guān)系侥钳,知道了每個(gè)tuple tree 元組如何找到對(duì)應(yīng)的Acker與其通信。接下來還需要考慮一點(diǎn)柄错,Acker如何跟蹤tuple tree舷夺。
我們知道每個(gè)tuple tree都有可能有成千上萬個(gè)節(jié)點(diǎn),如果跟蹤每個(gè)節(jié)點(diǎn)售貌,那么內(nèi)存很容易就被打滿了给猾。Storm采用了一個(gè)不同的跟蹤策略,每個(gè)Spout元組只需要固定數(shù)據(jù)量的空間(大約20字節(jié))颂跨,就可以跟蹤tuple tree敢伸。這種跟蹤算法是Storm能夠正常工作的關(guān)鍵,也是其重大突破之一恒削。
我們看下Storm是如何做的池颈。Acker為每個(gè)Spout元組存儲(chǔ)一個(gè)消息ID(隨機(jī)分配的那個(gè)ID)到一對(duì)值的映射,這對(duì)值的第一個(gè)元素就是Spout任務(wù)ID蔓同,第二個(gè)元素是64bit數(shù)字饶辙,稱為“ack val”,它是tuple tree中所有消息id的異或結(jié)果斑粱。ack val代表了整棵樹的狀態(tài)弃揽,當(dāng)這個(gè)ack val為0時(shí)就代表整棵樹已經(jīng)被處理完成了。

它的異或原理就是,當(dāng)我們無論創(chuàng)建一個(gè)節(jié)點(diǎn)還是完成一個(gè)節(jié)點(diǎn)都使用消息ID來與之異或矿微,這樣同一個(gè)消息ID一來一回異或結(jié)果就為0了痕慢。

image.png

如上圖,ack val最終值為T1T2T3T4T5T1T2T3T4^T5=0涌矢。

選擇合適的可靠性

Acker任務(wù)是輕量級(jí)的掖举,所以拓?fù)渲胁恍枰芏郃cker任務(wù),我們可以通過Storm UI來查看Acker吞吐量娜庇,如果吞吐量很差塔次,可以適當(dāng)添加Acker任務(wù)。
如果我們認(rèn)為消息可靠性不是必要的(處理失敗情況下丟失消息沒有關(guān)系)名秀,我們可以關(guān)閉消息可靠性励负。這樣拓?fù)湫阅芤矔?huì)提升,因?yàn)椴桓櫾M樹匕得,傳輸?shù)南?huì)減半(因?yàn)樵M樹中每個(gè)元組都需要發(fā)送一條確認(rèn)信息)继榆。并且每個(gè)下游元組中保留更少的數(shù)據(jù)(不需要存儲(chǔ)根ID),從而減少帶寬使用汁掠。
關(guān)閉消息可靠性的方式:

  1. 將Config.TOPOLOGY_ACKERS設(shè)置為0略吨,這樣Spout發(fā)送元組后,它的ack方法會(huì)被立即調(diào)用考阱。
  2. 在Spout中使用SpoutOutputCollector.emit發(fā)送消息時(shí)不指定消息ID翠忠。這樣可以對(duì)一些特定消息關(guān)閉消息可靠性。
  3. 如果不在意某個(gè)消息派生出子消息的可靠性乞榨,那么在對(duì)應(yīng)的botl組件中可以不進(jìn)行錨定(不指定輸入tuple)负间。

學(xué)習(xí)資料

  • Storm官網(wǎng)(http://storm.apache.org/releases/1.2.2/index.html),看了許多Storm書籍姜凄,大部分都是直接翻譯的Storm官方文檔。
  • 《從零開始學(xué)Storm》對(duì)Storm講解的非常全面趾访,由淺入深态秧,覆蓋面比較廣。
  • 《Storm源碼分析》對(duì)Storm從源碼級(jí)別進(jìn)行了講解扼鞋,有些地方原理講的還是不錯(cuò)的申鱼,就是該書講解的Storm版本比較低。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末云头,一起剝皮案震驚了整個(gè)濱河市捐友,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌溃槐,老刑警劉巖匣砖,帶你破解...
    沈念sama閱讀 222,000評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡猴鲫,警方通過查閱死者的電腦和手機(jī)对人,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拂共,“玉大人牺弄,你說我怎么就攤上這事∫撕” “怎么了势告?”我有些...
    開封第一講書人閱讀 168,561評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)抚恒。 經(jīng)常有香客問我咱台,道長(zhǎng),這世上最難降的妖魔是什么柑爸? 我笑而不...
    開封第一講書人閱讀 59,782評(píng)論 1 298
  • 正文 為了忘掉前任吵护,我火速辦了婚禮,結(jié)果婚禮上表鳍,老公的妹妹穿的比我還像新娘馅而。我一直安慰自己,他們只是感情好譬圣,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,798評(píng)論 6 397
  • 文/花漫 我一把揭開白布瓮恭。 她就那樣靜靜地躺著,像睡著了一般厘熟。 火紅的嫁衣襯著肌膚如雪屯蹦。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,394評(píng)論 1 310
  • 那天绳姨,我揣著相機(jī)與錄音登澜,去河邊找鬼。 笑死飘庄,一個(gè)胖子當(dāng)著我的面吹牛脑蠕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播跪削,決...
    沈念sama閱讀 40,952評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼谴仙,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了碾盐?” 一聲冷哼從身側(cè)響起晃跺,我...
    開封第一講書人閱讀 39,852評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎毫玖,沒想到半個(gè)月后掀虎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凌盯,經(jīng)...
    沈念sama閱讀 46,409評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,483評(píng)論 3 341
  • 正文 我和宋清朗相戀三年涩盾,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了十气。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,615評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡春霍,死狀恐怖砸西,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情址儒,我是刑警寧澤芹枷,帶...
    沈念sama閱讀 36,303評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站莲趣,受9級(jí)特大地震影響鸳慈,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜喧伞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,979評(píng)論 3 334
  • 文/蒙蒙 一走芋、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧潘鲫,春花似錦翁逞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至浊竟,卻和暖如春怨喘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背振定。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評(píng)論 1 272
  • 我被黑心中介騙來泰國打工必怜, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人后频。 一個(gè)月前我還...
    沈念sama閱讀 49,041評(píng)論 3 377
  • 正文 我出身青樓棚赔,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親徘郭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,630評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • 目錄 場(chǎng)景假設(shè) 調(diào)優(yōu)步驟和方法 Storm 的部分特性 Storm 并行度 Storm 消息機(jī)制 Storm UI...
    mtide閱讀 17,139評(píng)論 30 60
  • Storm 是一個(gè)分布式的丧肴,可靠的残揉,容錯(cuò)的數(shù)據(jù)流處理系統(tǒng)。下面我將分別從storm的整體架構(gòu)以及部分原理進(jìn)行講解芋浮。...
    瘋狂的哈丘閱讀 4,628評(píng)論 0 1
  • 一抱环、Storm簡(jiǎn)介 1.1 Storm是什么 Apache Storm(http://storm.apache.o...
    這一刻_776b閱讀 1,371評(píng)論 0 1
  • Storm是一個(gè)免費(fèi)并開源的分布式實(shí)時(shí)計(jì)算系統(tǒng)壳快。利用Storm可以很容易做到可靠地處理無限的數(shù)據(jù)流,像Hadoop...
    timothyue1閱讀 593評(píng)論 0 0
  • 這是一個(gè)JStorm使用教程镇草,不包含環(huán)境搭建教程眶痰,直接在公司現(xiàn)有集群上跑任務(wù),關(guān)于JStorm集群環(huán)境搭建梯啤,后續(xù)研...
    Coselding閱讀 6,348評(píng)論 1 9