storm是什么
他的官方文檔是這樣介紹的
Storm is a distributed realtime computation system.
關(guān)鍵詞:分布式塘装、實時硬耍、計算
你什么時候需要storm
當(dāng)你有海量數(shù)據(jù)需要進行實時處理的時候,在這種場景下你往往需要利用到多臺機器赠制,而且讓你關(guān)注的某一類數(shù)據(jù)按一定的規(guī)則路由到確切的節(jié)點赂摆,從而實現(xiàn)對信息流(往往需是有狀態(tài)的)的連續(xù)計算。
實際上分布式計算就是一大堆節(jié)點(一般是在多臺機器上)之間的互相通信钟些,而storm管理了這些節(jié)點烟号,定義了一個計算的模型(topology)讓開發(fā)者可以忽略很多細(xì)節(jié)(比如集群管理、消息隊列)政恍,從而把實現(xiàn)實時分布式計算任務(wù)簡單化汪拥。
storm的哲學(xué)
storm的組件
- Nimbus: 分發(fā)代碼,分發(fā)任務(wù)篙耗,監(jiān)控錯誤
- Zookeeper: 管理各個組件迫筑,保持系統(tǒng)穩(wěn)定
- Supervisor: 執(zhí)行任務(wù),往往多個組成一個拓?fù)洌═opology)
storm的計算模型
- topology: 拓?fù)渥谕洌瑢嶋H上是一副圖脯燃,代表了對某個計算過程的描述,他的組成部分有 Spout, Bolt, stream
- Spout: 產(chǎn)生數(shù)據(jù)流蒙保,數(shù)據(jù)流的起點
- Bolt: 接收數(shù)據(jù)流辕棚,執(zhí)行計算或者重新轉(zhuǎn)發(fā)出數(shù)據(jù)流
- Stream: 數(shù)據(jù)流,即上圖的箭頭
- Tuple: 數(shù)據(jù)流在計算模型中是由無數(shù)個tuple組成的
所有的節(jié)點在這個拓?fù)渲卸际遣l(fā)執(zhí)行的邓厕。
storm的幾種路由方式
路由(grouping)定義了stream如何在各個節(jié)點之中流動逝嚎,下面只介紹幾種常見方式,如下:
Shuffle grouping: 洗牌模式详恼。隨機平均地發(fā)配到下游節(jié)點上补君。
Fields grouping: 按照某一個字段來分配,擁有相同值的字段會分配到同一個節(jié)點上(即可連續(xù)跟蹤某個固定特征的數(shù)據(jù)流)
Global grouping: 強制到某唯一的節(jié)點昧互,實際上如果有多個節(jié)點去到任務(wù)號最低的節(jié)點赚哗。
all grouping: 強制到所有節(jié)點她紫,需小心使用。
Partial Key grouping: 最新支持的屿储,帶負(fù)載均衡的Fields grouping。
Direct grouping: 手動指定要流動到的節(jié)點渐逃。
[關(guān)于storm的組成部分與計算哲學(xué)的更詳細(xì)文檔]
(http://storm.apache.org/documentation/Concepts.html)
hand on the code
假設(shè)你已經(jīng)安裝好了storm(請參照官方文檔够掠,或者其他一切所能參照的文檔,而且請裝0.9.2版本茄菊,下面會有說明)疯潭。
這時候一般的入門會讓你開始你的第一個java程序來提交topology,但如標(biāo)題所預(yù)示面殖, 我們這里會使用python(對竖哩,只需要python)來進行示例。
首先我們需要一個開源項目的支持脊僚,它叫pyleus(yelp公司出品)相叁,這里有一個不幸的消息,它對storm的支持僅到0.9.2(最新版本0.9.4的支持正在開發(fā)中)辽幌。
第一次的提交
$ pip install pyleus
$ git clone https://github.com/Yelp/pyleus.git
$ pyleus build pyleus/examples/exclamation_topology/pyleus_topology.yaml
$ pyleus local exclamation_topology.jar
只要以上簡單幾個操作增淹,即可把這個topology提交到本地,如果沒有任何錯誤我們就可以繼續(xù)接下來的實際例子乌企。
更有意義的例子-數(shù)單詞
這個例子是pyleus項目自帶的虑润,examples目錄下還有其他詳細(xì)的例子可以參考。
這個Topology的目錄樹
word_count/
|-- word_count/
| |-- __init__.py
| |-- count_words.py
| |-- line_spout.py
| |-- log_results.py
| |--split_words.py
|-- pyleus_topology.yaml
pyleus_topology.yaml
此文件定義了這個拓?fù)浠窘M成與數(shù)據(jù)流動
# An ultra-simple topology which shows off Storm and the pyleus.storm library
name: word_count # 自定義拓?fù)涞拿?
topology:
- spout:
name: line-spout # 自定義spout組件的名字
module: word_count.line_spout # 代碼是word_count文件夾下的line_spout.py
- bolt:
name: split-words # 自定義bolt組件的名字
module: word_count.split_words # 代碼是word_count文件夾下的split_words.py
parallelism_hint: 3 # 并發(fā)的節(jié)點數(shù)
groupings:
- shuffle_grouping: line-spout # 以洗牌模式接收來自line-spout組件的數(shù)據(jù)流
- bolt:
name: count-words # 自定義bolt的名字
module: word_count.count_words # 代碼是word_count文件夾下的count_words.py
parallelism_hint: 3 # 并發(fā)的節(jié)點數(shù)
groupings:
- fields_grouping:
component: split-words
fields:
- word # 以filed grouping模式接收來自split-words組件的數(shù)據(jù)流加酵,field字段為word拳喻。
- bolt:
name: log-results # 自定義bolt的名字
module: word_count.log_results # 代碼是word_count文件夾下的log_results.py文件
groupings:
- global_grouping: count-words # 以global grouping方式接收來自count-words組件的數(shù)據(jù)流
這里數(shù)據(jù)的流動可以描述為
line-spout > split-words > count-words > log-results
line_spout.py
import logging
import random
from pyleus.storm import Spout
log = logging.getLogger('counter')
LINES = """
Lorem ipsum dolor sit amet, consectetur
adipiscing elit. Curabitur pharetra ante eget
nunc blandit vestibulum. Curabitur tempus mi
a risus lacinia egestas. Nulla faucibus
elit vitae dignissim euismod. Fusce ac
elementum leo, ut elementum dui. Ut
consequat est magna, eu posuere mi
pulvinar eget. Integer adipiscing, quam vitae
pretium facilisis, mi ligula viverra sapien,
nec elementum lacus metus ac mi.
Morbi sodales diam non velit accumsan
mollis. Donec eleifend quam in metus
faucibus auctor. Cras auctor sapien non
mauris vehicula, vel aliquam libero luctus.
Sed eu lobortis sapien. Maecenas eu
fringilla enim. Ut in velit nec
lectus tincidunt varius. Sed vel dictum
nunc. Morbi mollis nunc augue, eget
sagittis libero laoreet id. Suspendisse lobortis
nibh mauris, non bibendum magna iaculis
sed. Mauris interdum massa ut sagittis
vestibulum. In ipsum lacus, faucibus eu
hendrerit at, egestas non nisi. Duis
erat mauris, aliquam in hendrerit eget,
aliquam vel nibh. Proin molestie porta
imperdiet. Interdum et malesuada fames ac
ante ipsum primis in faucibus. Praesent
vitae cursus leo, a congue justo.
Ut interdum tellus non odio adipiscing
malesuada. Mauris in ante nec erat
lobortis eleifend. Morbi condimentum interdum elit,
quis iaculis ante pharetra id. In
""".strip().split('\n')
class LineSpout(Spout):
OUTPUT_FIELDS = ["line"] # 定義要輸出的字段名與數(shù)量
def next_tuple(self):
line = random.choice(LINES)
log.debug(line)
# 這里tup_id是可選的
# 如果你要讓storm跟蹤你的tuple流動的話需要加上
# storm的可靠性保證需要這個
# (line,) 這個tuple (剛好python的屬于與storm的屬于對上了)
# 對應(yīng)之前設(shè)置的OUTPUT_FIELDS
self.emit((line,), tup_id=random.randrange(999999999))
if __name__ == '__main__':
# 這里是無法通過print的方式從終端輸出調(diào)試結(jié)果的
# 所以這里采取的方式是寫臨時文件
# 實際上如果多個節(jié)點同時寫一個文件會存在競爭的情況
# 不過這里僅供調(diào)試,所以暫時忽略這個問題
logging.basicConfig(
level=logging.DEBUG,
filename='/tmp/word_count_lines.log',
format="%(message)s",
filemode='a',
)
LineSpout().run()
這個spout的作用是把一個文本分拆成行猪腕,每行作為一個tuple輸出給下游的bolt冗澈。
看下word_count_lines.log的內(nèi)容:
$ head word_count_lines.log
lobortis eleifend. Morbi condimentum interdum elit,
erat mauris, aliquam in hendrerit eget,
vestibulum. In ipsum lacus, faucibus eu
quis iaculis ante pharetra id. In
Ut interdum tellus non odio adipiscing
nunc. Morbi mollis nunc augue, eget
elit vitae dignissim euismod. Fusce ac
nunc. Morbi mollis nunc augue, eget
lectus tincidunt varius. Sed vel dictum
lobortis eleifend. Morbi condimentum interdum elit,
split_words.py
import logging
from pyleus.storm import SimpleBolt
log = logging.getLogger('splitter')
class SplitWordsBolt(SimpleBolt):
OUTPUT_FIELDS = ["word"] # 定義輸出的字段只有一個福压,名為word
def process_tuple(self, tup):
line, = tup.values # 接收到上游的tuple
log.debug(line)
for word in line.split():
log.debug(word)
# 這里bolt用于跟蹤tuple的參數(shù)是anchors
# 并且需要把上游的tuple傳入
# 把word傳給下游
self.emit((word,), anchors=[tup])
if __name__ == '__main__':
logging.basicConfig(
level=logging.DEBUG,
filename='/tmp/word_count_split_words.log',
format="%(message)s",
filemode='a',
)
SplitWordsBolt().run()
這個bolt的作用是郑藏,對收到的line拆解成word單詞,并傳給下游
他的輸出是 word_count_split_words.log
$head word_count_split_words.log
erat mauris, aliquam in hendrerit eget,
erat
lobortis eleifend. Morbi condimentum interdum elit,
lobortis
mauris,
vestibulum. In ipsum lacus, faucibus eu
aliquam
vestibulum.
in
eleifend.
count_words.py
count-words這個組件使用了field grouping
以field grouping模式接收來自split-words組件的數(shù)據(jù)流矾芙,field字段為word
所以脖岛,相同的word單詞朵栖,會流動到同一個節(jié)點。
from collections import defaultdict
from collections import namedtuple
import logging
from pyleus.storm import SimpleBolt
log = logging.getLogger('counter')
Counter = namedtuple("Counter", "word count") # 輸出是兩個字段
class CountWordsBolt(SimpleBolt):
OUTPUT_FIELDS = Counter # 輸出是兩個字段
def initialize(self):
# 在bolt啟動的時候初始化
# bolt是作為單例一直運行的
self.words = defaultdict(int)
def process_tuple(self, tup):
word, = tup.values # 獲得上游的word
self.words[word] += 1 # 計數(shù)
log.debug("{0} {1}".format(word, self.words[word]))
# 注意這里輸出到下游的是兩個字段 word 與 word的計數(shù)
self.emit((word, self.words[word]), anchors=[tup])
if __name__ == '__main__':
logging.basicConfig(
level=logging.DEBUG,
filename='/tmp/word_count_count_words.log',
format="%(message)s",
filemode='a',
)
CountWordsBolt().run()
這個bolt的作用是對通用的單詞進行計數(shù)柴梆,并且把兩個字段:單詞本身跟單詞計數(shù)傳遞給下游
word_count_count_words.log的內(nèi)容:
$ tail word_count_count_words.log
Ut 35894
laoreet 9472
Maecenas 8294
id. 19047
sapien, 11816
Suspendisse 9472
blandit 9599
Mauris 24100
erat 19047
a 16677
log_results.py
log-results組件用的是global grouping
在這種情況下所有來自上游的數(shù)據(jù)流都會到同一個節(jié)點陨溅,這樣log寫文件的話就不會有競爭問題了
import logging
from pyleus.storm import SimpleBolt
log = logging.getLogger('log_results')
class LogResultsBolt(SimpleBolt):
def process_tuple(self, tup):
word, count = tup.values # 從上游接收兩個字段
log.debug("%s: %d", word, count)
if __name__ == '__main__':
logging.basicConfig(
level=logging.DEBUG,
filename='/tmp/word_count_results.log',
format="%(message)s",
filemode='a',
)
LogResultsBolt().run()
這個bolt的作用只是落地,這里落地的方式是寫log文件(一般生產(chǎn)場景會落地到數(shù)據(jù)庫)绍在,落地文件內(nèi)容:
tail word_count_results.log
faucibus: 28713
adipiscing,: 8526
molestie: 11949
at,: 9742
Maecenas: 8392
diam: 11934
eget.: 9714
quam: 17203
mauris,: 24189
tincidunt: 9662
提交此topology到遠(yuǎn)程
pyleus build examples/word_count/pyleus_topology.yaml
pyleus submit -n NIMBUS_HOST exclamation_topology.jar
這里NIMBUS_HOST為遠(yuǎn)程的nimbus地址
pyleus的詳細(xì)文檔
關(guān)于spout
可以看到上面數(shù)單詞的例子實際上數(shù)據(jù)流的來源是一個測試程序隨機產(chǎn)生的门扇,在實際生產(chǎn)環(huán)境中雹有,我們一般會采用kafka來作為數(shù)據(jù)產(chǎn)生的源頭,
一個kafka的spout定義如下
(https://github.com/Yelp/pyleus/tree/develop/examples/kafka_spout)
# 這里的定義是沒有任何操作的臼寄,可以通過為它增加bolt來實現(xiàn)功能
name: kafka_spout_example # 自定義topology名字
topology:
- spout:
name: kafka-my_topic # 自定義spout名字
type: kafka # 制定類型為kafka
options:
# 配置kafka的topic
topic: my_topic
# 配置zookeeper地址霸奕,多個用逗號隔開
zk_hosts: zookeeper1:2181,zookeeper2:2181
# 配置給kafka存儲consumer offsets 的ZooKeeper Root path
# 默認(rèn)為: /pyleus-kafka-offsets/<topology name>
zk_root: /pyleus-kafka-offsets/kafka_spout_example
# Kafka consumer ID.
# 默認(rèn)為: pyleus-<topology name>
consumer_id: pyleus-kafka_spout_example
# 需要從某個offset開始嗎
# 默認(rèn)是false.
from_start: false
# 如果需要從某個offset開始則定義該offset
start_offset_time: 1398971060