當(dāng)storm遇上python

storm是什么

他的官方文檔是這樣介紹的

Storm is a distributed realtime computation system.

關(guān)鍵詞:分布式塘装、實時硬耍、計算

你什么時候需要storm

當(dāng)你有海量數(shù)據(jù)需要進行實時處理的時候,在這種場景下你往往需要利用到多臺機器赠制,而且讓你關(guān)注的某一類數(shù)據(jù)按一定的規(guī)則路由到確切的節(jié)點赂摆,從而實現(xiàn)對信息流(往往需是有狀態(tài)的)的連續(xù)計算。
實際上分布式計算就是一大堆節(jié)點(一般是在多臺機器上)之間的互相通信钟些,而storm管理了這些節(jié)點烟号,定義了一個計算的模型(topology)讓開發(fā)者可以忽略很多細(xì)節(jié)(比如集群管理、消息隊列)政恍,從而把實現(xiàn)實時分布式計算任務(wù)簡單化汪拥。

storm的哲學(xué)

storm的組件

Paste_Image.png
  • Nimbus: 分發(fā)代碼,分發(fā)任務(wù)篙耗,監(jiān)控錯誤
  • Zookeeper: 管理各個組件迫筑,保持系統(tǒng)穩(wěn)定
  • Supervisor: 執(zhí)行任務(wù),往往多個組成一個拓?fù)洌═opology)

storm的計算模型

Paste_Image.png
  • topology: 拓?fù)渥谕洌瑢嶋H上是一副圖脯燃,代表了對某個計算過程的描述,他的組成部分有 Spout, Bolt, stream
  • Spout: 產(chǎn)生數(shù)據(jù)流蒙保,數(shù)據(jù)流的起點
  • Bolt: 接收數(shù)據(jù)流辕棚,執(zhí)行計算或者重新轉(zhuǎn)發(fā)出數(shù)據(jù)流
  • Stream: 數(shù)據(jù)流,即上圖的箭頭
  • Tuple: 數(shù)據(jù)流在計算模型中是由無數(shù)個tuple組成的
    所有的節(jié)點在這個拓?fù)渲卸际遣l(fā)執(zhí)行的邓厕。

storm的幾種路由方式

路由(grouping)定義了stream如何在各個節(jié)點之中流動逝嚎,下面只介紹幾種常見方式,如下:
Shuffle grouping: 洗牌模式详恼。隨機平均地發(fā)配到下游節(jié)點上补君。
Fields grouping: 按照某一個字段來分配,擁有相同值的字段會分配到同一個節(jié)點上(即可連續(xù)跟蹤某個固定特征的數(shù)據(jù)流)
Global grouping: 強制到某唯一的節(jié)點昧互,實際上如果有多個節(jié)點去到任務(wù)號最低的節(jié)點赚哗。
all grouping: 強制到所有節(jié)點她紫,需小心使用。
Partial Key grouping: 最新支持的屿储,帶負(fù)載均衡的Fields grouping。
Direct grouping: 手動指定要流動到的節(jié)點渐逃。

[關(guān)于storm的組成部分與計算哲學(xué)的更詳細(xì)文檔]

(http://storm.apache.org/documentation/Concepts.html)

hand on the code

假設(shè)你已經(jīng)安裝好了storm(請參照官方文檔够掠,或者其他一切所能參照的文檔,而且請裝0.9.2版本茄菊,下面會有說明)疯潭。
這時候一般的入門會讓你開始你的第一個java程序來提交topology,但如標(biāo)題所預(yù)示面殖, 我們這里會使用python(對竖哩,只需要python)來進行示例。
首先我們需要一個開源項目的支持脊僚,它叫pyleus(yelp公司出品)相叁,這里有一個不幸的消息,它對storm的支持僅到0.9.2(最新版本0.9.4的支持正在開發(fā)中)辽幌。

第一次的提交

$ pip install pyleus
$ git clone https://github.com/Yelp/pyleus.git
$ pyleus build  pyleus/examples/exclamation_topology/pyleus_topology.yaml
$ pyleus local exclamation_topology.jar

只要以上簡單幾個操作增淹,即可把這個topology提交到本地,如果沒有任何錯誤我們就可以繼續(xù)接下來的實際例子乌企。

更有意義的例子-數(shù)單詞

這個例子是pyleus項目自帶的虑润,examples目錄下還有其他詳細(xì)的例子可以參考。

這個Topology的目錄樹
word_count/
|-- word_count/
|   |-- __init__.py
|   |-- count_words.py
|   |-- line_spout.py
|   |-- log_results.py
|   |--split_words.py
|-- pyleus_topology.yaml
pyleus_topology.yaml

此文件定義了這個拓?fù)浠窘M成與數(shù)據(jù)流動

# An ultra-simple topology which shows off Storm and the pyleus.storm library

name: word_count # 自定義拓?fù)涞拿?
topology:

    - spout:
        name: line-spout # 自定義spout組件的名字
        module: word_count.line_spout # 代碼是word_count文件夾下的line_spout.py

    - bolt:
        name: split-words # 自定義bolt組件的名字
        module: word_count.split_words # 代碼是word_count文件夾下的split_words.py
        parallelism_hint: 3 # 并發(fā)的節(jié)點數(shù)
        groupings: 
            - shuffle_grouping: line-spout # 以洗牌模式接收來自line-spout組件的數(shù)據(jù)流

    - bolt:
        name: count-words # 自定義bolt的名字
        module: word_count.count_words # 代碼是word_count文件夾下的count_words.py
        parallelism_hint: 3 # 并發(fā)的節(jié)點數(shù)
        groupings:
            - fields_grouping:
                component: split-words
                fields:
                    - word # 以filed grouping模式接收來自split-words組件的數(shù)據(jù)流加酵,field字段為word拳喻。

    - bolt:
        name: log-results # 自定義bolt的名字
        module: word_count.log_results # 代碼是word_count文件夾下的log_results.py文件
        groupings:
            - global_grouping: count-words # 以global grouping方式接收來自count-words組件的數(shù)據(jù)流

這里數(shù)據(jù)的流動可以描述為

line-spout > split-words > count-words > log-results

line_spout.py
import logging
import random

from pyleus.storm import Spout

log = logging.getLogger('counter')

LINES = """
Lorem ipsum dolor sit amet, consectetur
adipiscing elit. Curabitur pharetra ante eget
nunc blandit vestibulum. Curabitur tempus mi
a risus lacinia egestas. Nulla faucibus
elit vitae dignissim euismod. Fusce ac
elementum leo, ut elementum dui. Ut
consequat est magna, eu posuere mi
pulvinar eget. Integer adipiscing, quam vitae
pretium facilisis, mi ligula viverra sapien,
nec elementum lacus metus ac mi.
Morbi sodales diam non velit accumsan
mollis. Donec eleifend quam in metus
faucibus auctor. Cras auctor sapien non
mauris vehicula, vel aliquam libero luctus.
Sed eu lobortis sapien. Maecenas eu
fringilla enim. Ut in velit nec
lectus tincidunt varius. Sed vel dictum
nunc. Morbi mollis nunc augue, eget
sagittis libero laoreet id. Suspendisse lobortis
nibh mauris, non bibendum magna iaculis
sed. Mauris interdum massa ut sagittis
vestibulum. In ipsum lacus, faucibus eu
hendrerit at, egestas non nisi. Duis
erat mauris, aliquam in hendrerit eget,
aliquam vel nibh. Proin molestie porta
imperdiet. Interdum et malesuada fames ac
ante ipsum primis in faucibus. Praesent
vitae cursus leo, a congue justo.
Ut interdum tellus non odio adipiscing
malesuada. Mauris in ante nec erat
lobortis eleifend. Morbi condimentum interdum elit,
quis iaculis ante pharetra id. In
""".strip().split('\n')


class LineSpout(Spout):

    OUTPUT_FIELDS = ["line"] # 定義要輸出的字段名與數(shù)量

    def next_tuple(self):
        line = random.choice(LINES)
        log.debug(line)
        # 這里tup_id是可選的
        # 如果你要讓storm跟蹤你的tuple流動的話需要加上
        # storm的可靠性保證需要這個
        # (line,) 這個tuple (剛好python的屬于與storm的屬于對上了)
        # 對應(yīng)之前設(shè)置的OUTPUT_FIELDS
        self.emit((line,), tup_id=random.randrange(999999999))


if __name__ == '__main__':
    # 這里是無法通過print的方式從終端輸出調(diào)試結(jié)果的
    # 所以這里采取的方式是寫臨時文件
    # 實際上如果多個節(jié)點同時寫一個文件會存在競爭的情況
    # 不過這里僅供調(diào)試,所以暫時忽略這個問題
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_lines.log',
        format="%(message)s",
        filemode='a',
    )

    LineSpout().run()

這個spout的作用是把一個文本分拆成行猪腕,每行作為一個tuple輸出給下游的bolt冗澈。
看下word_count_lines.log的內(nèi)容:

$ head word_count_lines.log
lobortis eleifend. Morbi condimentum interdum elit,
erat mauris, aliquam in hendrerit eget,
vestibulum. In ipsum lacus, faucibus eu
quis iaculis ante pharetra id. In
Ut interdum tellus non odio adipiscing
nunc. Morbi mollis nunc augue, eget
elit vitae dignissim euismod. Fusce ac
nunc. Morbi mollis nunc augue, eget
lectus tincidunt varius. Sed vel dictum
lobortis eleifend. Morbi condimentum interdum elit,
split_words.py

import logging

from pyleus.storm import SimpleBolt

log = logging.getLogger('splitter')


class SplitWordsBolt(SimpleBolt):

    OUTPUT_FIELDS = ["word"] # 定義輸出的字段只有一個福压,名為word

    def process_tuple(self, tup):
        line, = tup.values # 接收到上游的tuple
        log.debug(line)
        for word in line.split():
            log.debug(word)
            # 這里bolt用于跟蹤tuple的參數(shù)是anchors
            # 并且需要把上游的tuple傳入
            # 把word傳給下游
            self.emit((word,), anchors=[tup]) 


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_split_words.log',
        format="%(message)s",
        filemode='a',
    )
    SplitWordsBolt().run()

這個bolt的作用是郑藏,對收到的line拆解成word單詞,并傳給下游
他的輸出是 word_count_split_words.log

$head word_count_split_words.log
erat mauris, aliquam in hendrerit eget,
erat
lobortis eleifend. Morbi condimentum interdum elit,
lobortis
mauris,
vestibulum. In ipsum lacus, faucibus eu
aliquam
vestibulum.
in
eleifend.
count_words.py

count-words這個組件使用了field grouping
以field grouping模式接收來自split-words組件的數(shù)據(jù)流矾芙,field字段為word
所以脖岛,相同的word單詞朵栖,會流動到同一個節(jié)點。

from collections import defaultdict
from collections import namedtuple
import logging

from pyleus.storm import SimpleBolt

log = logging.getLogger('counter')

Counter = namedtuple("Counter", "word count") # 輸出是兩個字段


class CountWordsBolt(SimpleBolt):

    OUTPUT_FIELDS = Counter # 輸出是兩個字段

    def initialize(self):
        # 在bolt啟動的時候初始化
        # bolt是作為單例一直運行的
        self.words = defaultdict(int) 


    def process_tuple(self, tup):
        word, = tup.values # 獲得上游的word
        self.words[word] += 1 # 計數(shù)
        log.debug("{0} {1}".format(word, self.words[word]))
        # 注意這里輸出到下游的是兩個字段 word 與 word的計數(shù)
        self.emit((word, self.words[word]), anchors=[tup])


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_count_words.log',
        format="%(message)s",
        filemode='a',
    )

    CountWordsBolt().run()

這個bolt的作用是對通用的單詞進行計數(shù)柴梆,并且把兩個字段:單詞本身跟單詞計數(shù)傳遞給下游
word_count_count_words.log的內(nèi)容:

$ tail word_count_count_words.log
Ut 35894
laoreet 9472
Maecenas 8294
id. 19047
sapien, 11816
Suspendisse 9472
blandit 9599
Mauris 24100
erat 19047
a 16677
log_results.py

log-results組件用的是global grouping
在這種情況下所有來自上游的數(shù)據(jù)流都會到同一個節(jié)點陨溅,這樣log寫文件的話就不會有競爭問題了

import logging

from pyleus.storm import SimpleBolt

log = logging.getLogger('log_results')


class LogResultsBolt(SimpleBolt):

    def process_tuple(self, tup):
        word, count = tup.values # 從上游接收兩個字段
        log.debug("%s: %d", word, count)


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        filename='/tmp/word_count_results.log',
        format="%(message)s",
        filemode='a',
    )

    LogResultsBolt().run()

這個bolt的作用只是落地,這里落地的方式是寫log文件(一般生產(chǎn)場景會落地到數(shù)據(jù)庫)绍在,落地文件內(nèi)容:

tail word_count_results.log
faucibus: 28713
adipiscing,: 8526
molestie: 11949
at,: 9742
Maecenas: 8392
diam: 11934
eget.: 9714
quam: 17203
mauris,: 24189
tincidunt: 9662

提交此topology到遠(yuǎn)程

pyleus build examples/word_count/pyleus_topology.yaml
pyleus submit -n NIMBUS_HOST exclamation_topology.jar

這里NIMBUS_HOST為遠(yuǎn)程的nimbus地址

pyleus的詳細(xì)文檔

關(guān)于spout

可以看到上面數(shù)單詞的例子實際上數(shù)據(jù)流的來源是一個測試程序隨機產(chǎn)生的门扇,在實際生產(chǎn)環(huán)境中雹有,我們一般會采用kafka來作為數(shù)據(jù)產(chǎn)生的源頭,
一個kafka的spout定義如下
(https://github.com/Yelp/pyleus/tree/develop/examples/kafka_spout)

# 這里的定義是沒有任何操作的臼寄,可以通過為它增加bolt來實現(xiàn)功能
name: kafka_spout_example # 自定義topology名字

topology:

    - spout:
        name: kafka-my_topic # 自定義spout名字
        type: kafka # 制定類型為kafka
        options:
            # 配置kafka的topic
            topic: my_topic

            # 配置zookeeper地址霸奕,多個用逗號隔開
            zk_hosts: zookeeper1:2181,zookeeper2:2181

            # 配置給kafka存儲consumer offsets 的ZooKeeper Root path
            # 默認(rèn)為: /pyleus-kafka-offsets/<topology name>
            zk_root: /pyleus-kafka-offsets/kafka_spout_example

            # Kafka consumer ID.
            # 默認(rèn)為: pyleus-<topology name>
            consumer_id: pyleus-kafka_spout_example

            # 需要從某個offset開始嗎
            # 默認(rèn)是false.
            from_start: false

            # 如果需要從某個offset開始則定義該offset
            start_offset_time: 1398971060
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市吉拳,隨后出現(xiàn)的幾起案子质帅,更是在濱河造成了極大的恐慌,老刑警劉巖留攒,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件煤惩,死亡現(xiàn)場離奇詭異,居然都是意外死亡炼邀,警方通過查閱死者的電腦和手機魄揉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拭宁,“玉大人洛退,你說我怎么就攤上這事『斓” “怎么了不狮?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長在旱。 經(jīng)常有香客問我摇零,道長,這世上最難降的妖魔是什么桶蝎? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任驻仅,我火速辦了婚禮,結(jié)果婚禮上登渣,老公的妹妹穿的比我還像新娘噪服。我一直安慰自己,他們只是感情好胜茧,可當(dāng)我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布粘优。 她就那樣靜靜地躺著,像睡著了一般呻顽。 火紅的嫁衣襯著肌膚如雪雹顺。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天廊遍,我揣著相機與錄音嬉愧,去河邊找鬼。 笑死喉前,一個胖子當(dāng)著我的面吹牛没酣,可吹牛的內(nèi)容都是我干的王财。 我是一名探鬼主播,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼裕便,長吁一口氣:“原來是場噩夢啊……” “哼绒净!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起闪金,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤疯溺,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后哎垦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡恃疯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年漏设,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片今妄。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡郑口,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出盾鳞,到底是詐尸還是另有隱情犬性,我是刑警寧澤,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布腾仅,位于F島的核電站乒裆,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏推励。R本人自食惡果不足惜鹤耍,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望验辞。 院中可真熱鬧稿黄,春花似錦、人聲如沸跌造。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽壳贪。三九已至陵珍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間撑碴,已是汗流浹背撑教。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留醉拓,地道東北人伟姐。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓收苏,卻偏偏與公主長得像,于是被迫代替她去往敵國和親愤兵。 傳聞我的和親對象是個殘疾皇子鹿霸,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容

  • Clojure實戰(zhàn)(5):Storm實時計算框架 | Ji ZHANG's Bloghttp://shzhangj...
    葡萄喃喃囈語閱讀 1,272評論 0 2
  • 原文鏈接Storm Tutorial 本人原創(chuàng)翻譯,轉(zhuǎn)載請注明出處 這個教程內(nèi)容包含如何創(chuàng)建topologies及...
    quiterr閱讀 1,615評論 0 6
  • 目錄 場景假設(shè) 調(diào)優(yōu)步驟和方法 Storm 的部分特性 Storm 并行度 Storm 消息機制 Storm UI...
    mtide閱讀 17,074評論 30 60
  • 好多好多年不用文字記憶自己的生活了秆乳! 不記得自己從什么時候開始愛寫寫記記的懦鼠,也許是從初中時吧!但卻記得是從什么時候...
    a放下a閱讀 276評論 1 0
  • 因為孩子的教育方式問題,又和孩子爸爸發(fā)生了分歧扯键。我不禁想起《阿甘正傳》和一句話:只有你能欣賞我睦袖。 很多家長,尤其是...
    嘉寶媽咪閱讀 191評論 0 0