Storm 性能優(yōu)化

目錄


  1. 場(chǎng)景假設(shè)
  2. 調(diào)優(yōu)步驟和方法
  3. Storm 的部分特性
  4. Storm 并行度
  5. Storm 消息機(jī)制
  6. Storm UI 解析
  7. 性能優(yōu)化

場(chǎng)景假設(shè)


在介紹 Storm 的性能調(diào)優(yōu)方法之前京痢,假設(shè)一個(gè)場(chǎng)景:
項(xiàng)目組部署了3臺(tái)機(jī)器斩披,計(jì)劃運(yùn)行且僅運(yùn)行 Storm(1.0.1) + Kafka(0.9.0.1) + Redis(3.2.1) 的小規(guī)模實(shí)驗(yàn)集群堆巧,集群的配置情況如下表:

主機(jī)名 硬件配置 角色描述
hd01 2CPUs, 4G RAM, 2TB 機(jī)械硬盤 nimbus, supervisor, ui, kafka, zk
hd02 2CPUs, 4G RAM, 2TB 機(jī)械硬盤 supervisor, kafka, zk
hd03 2CPUs, 4G RAM, 2TB 機(jī)械硬盤 supervisor, kafka, zk

現(xiàn)有一個(gè)任務(wù),需要實(shí)時(shí)計(jì)算訂單的各項(xiàng)匯總統(tǒng)計(jì)信息闯割。訂單數(shù)據(jù)通過(guò) kafka 傳輸。在 Storm 中創(chuàng)建了一個(gè) topology 來(lái)執(zhí)行此項(xiàng)任務(wù),并采用 Storm kafkaSpout 讀取該 topic 的數(shù)據(jù)官扣。kafka 和 Storm topology 的基本信息如下:

  • kafka topic partitions = 3
  • topology 的配置情況:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new kafkaSpout(), 3);
builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout");
builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order"));

Config conf = new Config();
conf.setNumWorkers(2);
StormSubmitter.submitTopologyWithProgressBar("topology-name", conf, builder.createTopology());

那么墩莫,在此假設(shè)下芙委,Storm topology 的數(shù)據(jù)怎么分發(fā)?性能如何調(diào)優(yōu)狂秦?這就是下文要討論的內(nèi)容灌侣,其中性能調(diào)優(yōu)是最終目的侧啼,數(shù)據(jù)分發(fā)即 Storm 的消息機(jī)制堪簿,則是進(jìn)行調(diào)優(yōu)前的知識(shí)儲(chǔ)備慨菱。

調(diào)優(yōu)步驟和方法


Storm topology 的性能優(yōu)化方法符喝,整體來(lái)說(shuō),可依次劃分為以下幾個(gè)步驟:

  1. 硬件配置的優(yōu)化
  2. 代碼層面的優(yōu)化
  3. topology 并行度的優(yōu)化
  4. Storm 集群配置參數(shù)和 topology 運(yùn)行參數(shù)的優(yōu)化

其中第一點(diǎn)不是討論的重點(diǎn)甜孤,無(wú)外乎增加機(jī)器的硬件資源,提高機(jī)器的硬件配置等茉稠,但是這一步卻也不能忽略而线,因?yàn)闄C(jī)器配置太低恋日,很可能后面的步驟怎么調(diào)優(yōu)都無(wú)濟(jì)于事岂膳。

Storm 的一些特性和原理谈截,是進(jìn)行調(diào)優(yōu)的必要知識(shí)儲(chǔ)備

Storm 的部分特性

目前 Storm 的最新版本為 2.0.0-SNAPSHOT涧偷。該版本太新燎潮,未經(jīng)過(guò)大量驗(yàn)證和測(cè)試跟啤,因此本文的討論都基于 2.0 以前的版本唉锌。Storm 有如下幾個(gè)重要的特性:

  • DAG
  • 常駐內(nèi)存袄简,topology 啟動(dòng)后除非 kill 掉否則一直運(yùn)行
  • 提供 DRPC 服務(wù)
  • Pacemaker(1.0以后的新特性)心跳守護(hù)進(jìn)程绿语,常駐內(nèi)存,比ZooKeeper性能更好
  • 采用了 ZeroMQ 和 Netty 作為底層框架
  • 采用了 ACK/fail 方式的 tuple 追蹤機(jī)制
    并且 ack/fail 只能由創(chuàng)建該tuple的task所承載的spout觸發(fā)

了解這些機(jī)制對(duì)優(yōu)化 Storm 的運(yùn)行性能有一定幫助

Storm 并行度


Storm 是一個(gè)分布式的實(shí)時(shí)計(jì)算軟件种柑,各節(jié)點(diǎn)聚请,各組件間的通信依賴于 zookeeper驶赏。從組件的角度看既鞠,Storm 運(yùn)作機(jī)制構(gòu)建在 nimbus, supervisor, woker, executor, task, spout/bolt 之上嘱蛋,如果再加上 topology洒敏,有時(shí)也可以稱這些組件為 Concepts(概念)。詳見(jiàn)官網(wǎng)介紹文章 http://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html
http://storm.apache.org/releases/current/Tutorial.html

在介紹 Storm 并行度之前篙挽,先概括地了解下 Storm 的幾個(gè)概念铣卡,此處假定讀者有一定的 Storm 背景知識(shí)偏竟,至少曾經(jīng)跑過(guò)一個(gè) topology 實(shí)例。

nimbus 和 supervisor

nimbus 是 Storm 集群的管理和調(diào)度進(jìn)程蝉仇,一個(gè)集群?jiǎn)?dòng)一個(gè) nimbus轿衔,主要用于管理 topology害驹,執(zhí)行rebalance宛官,管理 supervisor瓦糕,分發(fā) task咕娄,監(jiān)控集群的健康狀況等圣勒。nimbus 依賴 zookeeper 來(lái)實(shí)現(xiàn)上述職責(zé),nimbus 與 supervisor 等其他組件并沒(méi)有直接的溝通胡控。運(yùn)行 nimbus 的節(jié)點(diǎn)成為主節(jié)點(diǎn)昼激,運(yùn)行 supervisor 的節(jié)點(diǎn)成為工作節(jié)點(diǎn)橙困,nimbus 向 supervisor 分派任務(wù)耕餐,因此 Storm 集群也是一個(gè) master/slave 集群肠缔。簡(jiǎn)單來(lái)說(shuō),nimbus 就是工頭壹蔓,supervisor 就是工人猫态,nimbus 通過(guò) zookeeper 來(lái)管理 supervisor亲雪。

supervisor 是一個(gè)工作進(jìn)程,負(fù)責(zé)監(jiān)聽(tīng) nimbus 分派的任務(wù)套像。當(dāng)它接到任務(wù)后夺巩,會(huì)啟動(dòng)一個(gè) worker 進(jìn)程柳譬,由 worker 運(yùn)行 topology 的一個(gè)子集美澳。為什么說(shuō)是子集呢制跟?因?yàn)楫?dāng)一個(gè) topology 提交到集群后雨膨,nimbus 便會(huì)根據(jù)該 topology 的配置(此處假定 numWorker=3)聊记,將 topology 分配給3個(gè) worker 并行執(zhí)行(正常情況下是這樣排监,也有不是均勻分配的舆床,比如有一個(gè) supervisor 節(jié)點(diǎn)內(nèi)存不足了)挨队。如果剛好集群有3個(gè) supervisor瞒瘸,則每個(gè) supervisor 會(huì)啟動(dòng)1個(gè) worker情臭,即一個(gè)節(jié)點(diǎn)啟動(dòng)一個(gè) worker(一個(gè)節(jié)點(diǎn)只能有一個(gè) supervisor 有效運(yùn)行)俯在。因此跷乐,worker 進(jìn)程運(yùn)行的是 topology 的一個(gè)子集愕提。supervisor 同樣通過(guò) zookeeper 與 nimbus 進(jìn)行交流浅侨,因此 nimbus 和 supervisor 都可以快速失敗/停止如输,因?yàn)樗械臓顟B(tài)信息都保存在本地文件系統(tǒng)的 zookeeper 中不见, 當(dāng)失敗停止運(yùn)行后稳吮,只需要重新啟動(dòng) nimbus 或 supervisor 進(jìn)程以快速恢復(fù)盖高。當(dāng)然喻奥,如果集群中正在工作的 supervisor 停止了撞蚕,其上運(yùn)行著的 topology 子集也會(huì)跟著停止甥厦,不過(guò)一旦 supervisor 啟動(dòng)起來(lái)刀疙,topology 子集又立刻恢復(fù)正常了谦秧。

nimbus 和 supervisor 的協(xié)作關(guān)系
worker

worker 是一個(gè)JVM進(jìn)程疚鲤,由 supervisor 啟動(dòng)和關(guān)閉集歇。當(dāng) supervisor 接到任務(wù)后诲宇,會(huì)根據(jù) topology 的配置啟動(dòng)若干 worker姑蓝,實(shí)際的任務(wù)執(zhí)行便由 worker 進(jìn)行它掂。worker 進(jìn)程會(huì)占用固定的可由配置進(jìn)行修改的內(nèi)存空間(默認(rèn)768M)虐秋。通常使用 conf.setNumWorkers() 函數(shù)來(lái)指定一個(gè) topolgoy 的 worker 數(shù)量用押。

executor

executor 是一個(gè)線程蜻拨,由 worker 進(jìn)程派生(spawned)缎讼。executor 線程負(fù)責(zé)根據(jù)配置派生 task 線程血崭,默認(rèn)一個(gè) executor 創(chuàng)建一個(gè) task夹纫,可通過(guò) setNumTask() 函數(shù)指定每個(gè) executor 的 task 數(shù)量舰讹。executor 將實(shí)例化后的 spout/bolt 傳遞給 task月匣。

task

task 可以說(shuō)是 topology 最終的實(shí)際的任務(wù)執(zhí)行者桶错,每個(gè) task 承載一個(gè) spout 或 bolt 的實(shí)例院刁,并調(diào)用其中的 spout.nexTuple()退腥,bolt.execute() 等方法,而 spout.nexTuple() 是數(shù)據(jù)的發(fā)射器嗅蔬,bolt.execute() 則是數(shù)據(jù)的接收方澜术,業(yè)務(wù)邏輯的代碼基本上都在這兩個(gè)函數(shù)里面處理了鸟废,因此可以說(shuō) task 是最終搬磚的苦逼盒延。

topology

topology 中文翻譯為拓?fù)涮硭拢愃朴?hdfs 上的一個(gè) mapreduce 任務(wù)计露。一個(gè) topology 定義了運(yùn)行一個(gè) Storm 任務(wù)的所有必要元件薄坏,主要包括 spout 和 bolt君账,以及 spout 和 bolt 之間的流向關(guān)系乡数。

topology 結(jié)構(gòu)
并行度

什么是并行度净赴?在 Storm 的設(shè)定里玖翅,并行度大體分為3個(gè)方面:

  1. 一個(gè) topology 指定多少個(gè) worker 進(jìn)程并行運(yùn)行金度;
  2. 一個(gè) worker 進(jìn)程指定多少個(gè) executor 線程并行運(yùn)行;
  3. 一個(gè) executor 線程指定多少個(gè) task 并行運(yùn)行跟伏。

一般來(lái)說(shuō)受扳,并行度設(shè)置越高辞色,topology 運(yùn)行的效率就越高相满,但是也不能一股腦地給出一個(gè)很高的值立美,還得考慮給每個(gè) worker 分配的內(nèi)存的大小碌更,還得平衡系統(tǒng)的硬件資源痛单,以避免浪費(fèi)旭绒。
Storm 集群可以運(yùn)行一個(gè)或多個(gè) topology挥吵,而每個(gè) topology 包含一個(gè)或多個(gè) worker 進(jìn)程忽匈,每個(gè) worer 進(jìn)程可派生一個(gè)或多個(gè) executor 線程,而每個(gè) executor 線程則派生一個(gè)或多個(gè) task嫌松,task 是實(shí)際的數(shù)據(jù)處理單元,也是 Storm 概念里最小的工作單元, spout 或 bolt 的實(shí)例便是由 task 承載碳默。

worker executor task 的關(guān)系

為了更好地解釋 worker、executor 和 task 之間的工作機(jī)制该抒,我們用官網(wǎng)的一個(gè)簡(jiǎn)單 topology 示例來(lái)介紹凑保。先看此 topology 的配置:

Config conf = new Config();
conf.setNumWorkers(2); // 為此 topology 配置兩個(gè) worker 進(jìn)程

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // blue-spout 并行度=2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) // green-bolt 并行度=2
               .setNumTasks(4)  // 為此 green-bolt 配置 4 個(gè) task
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)  // yellow-bolt 并行度=6
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
    "mytopology",
    conf,
    topologyBuilder.createTopology()
);

從上面的代碼可以知道:

  • 這個(gè) topology 裝備了2個(gè) worker 進(jìn)程,也就是同樣的工作會(huì)有 2 個(gè)進(jìn)程并行進(jìn)行芝此,可以肯定地說(shuō)婚苹,2個(gè) worker 肯定比1個(gè) worker 執(zhí)行效率要高很多膊升,但是并沒(méi)有2倍的差距结胀;
  • 配置了一個(gè) blue-spout糟港,并且為其指定了 2 個(gè) executor秸抚,即并行度為2剥汤;
  • 配置了一個(gè) green-bolt吭敢,并且為其指定了 2 個(gè) executor,即并行度為2畜晰;
  • 配置了一個(gè) yellow-bolt凄鼻,并且為其指定了 6 個(gè) executor块蚌,即并行度為6;

大家看官方給出的下圖:

一個(gè) topology 的結(jié)構(gòu)圖示

可以看出虎敦,這個(gè)圖片完整無(wú)缺地還原了代碼里設(shè)定的 topology 結(jié)構(gòu):

  • 圖左最大的灰色方框其徙,表示這個(gè) topology访锻;
  • topology 里面剛好有兩個(gè)白色方框期犬,表示2個(gè) worker 進(jìn)程;
  • 每個(gè) worker 里面的灰色方框表示 executor 線程鲤妥,可以看到2個(gè) worker 方框里各有5個(gè) executor棉安,為什么呢贡耽?因?yàn)榇a里面指定的 spout 并行度=2,green-bolt并行度=2,yellow-bolt并行度=6职祷,加起來(lái)剛好是10有梆,而配置的 worker 數(shù)量為2,那么自然地痰催,這10個(gè) executor 會(huì)均勻地分配到2個(gè) worker 里面夸溶;
  • 每個(gè) executor 里面的黃藍(lán)綠(寫著Task)的方框扫皱,就是最小的處理單元 task 了韩脑。大家仔細(xì)看綠色的 Task 方框,與其他 Task 不同的是衩匣,兩個(gè)綠色方框同時(shí)出現(xiàn)在一個(gè) executor 方框內(nèi)琅捏。為什么會(huì)這樣呢?大家回到上文看 topology 的定義代碼搜吧,topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4)滤奈,這里面的 setNumTasks(4) 表示為該 green-bolt 指定了4個(gè) task,且 executor 的并行度為2昭躺,那么自然地领炫,這4個(gè) task 會(huì)均勻地分配到2個(gè) executor 里面似舵;
  • 圖右的三個(gè)圓圈,依次是藍(lán)色的 blue-spout碟狞,綠色的 green-bolt 和黃色的 yellow-bolt啄枕,并且用箭頭指示了三個(gè)組件之間的關(guān)系。spout 是數(shù)據(jù)的產(chǎn)生元件族沃,而 green-bolt 則是數(shù)據(jù)的中間接收節(jié)點(diǎn),yellow-bolt 則是數(shù)據(jù)的最后接收節(jié)點(diǎn)脆淹。這也是 DAG 的體現(xiàn)常空,有向的(箭頭不能往回走)無(wú)環(huán)圖。

參考
http://storm.apache.org/releases/1.0.1/Understanding-the-parallelism-of-a-Storm-topology.html
http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/

一個(gè) topology 的代碼較完整例子
TopologyBuilder builder = new TopologyBuilder();

BrokerHosts hosts       = new ZkHosts(zkConns);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, clintId);
spoutConfig.scheme      = new SchemeAsMultiScheme(new StringScheme());

/** 指示 kafkaSpout 從 kafka topic 最后記錄的 ofsset 開始讀取數(shù)據(jù) */
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("kafkaSpout", kafkaSpout, 3); // spout 并行度=3
builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout"); // FilterBolt 并行度=3
builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order")); // AlertBolt 并行度=3

Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(3); // 為此 topology 配置3個(gè) worker 進(jìn)程
conf.setMaxSpoutPending(10000);

try {
    StormSubmitter.submitTopologyWithProgressBar(topology, conf, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}

Storm 消息機(jī)制


Storm 主要提供了兩種消息保證機(jī)制(Message Processing Guarantee)

  • 至少一次 At least once
  • 僅且一次 exactly once

其中 exactly once 是通過(guò) Trident 方式實(shí)現(xiàn)的(exactly once through Trident)盖溺。兩種模式的選擇要視業(yè)務(wù)情況而定漓糙,有些場(chǎng)景要求精確的僅且一次消費(fèi),比如訂單處理烘嘱,決不能允許重復(fù)的處理訂單昆禽,因?yàn)楹芸赡軙?huì)導(dǎo)致訂單金額、交易手?jǐn)?shù)等計(jì)算錯(cuò)誤蝇庭;有些場(chǎng)景允許一定的重復(fù)醉鳖,比如頁(yè)面點(diǎn)擊統(tǒng)計(jì),訪客統(tǒng)計(jì)等哮内〉量茫總之,不管何種模式北发,Storm 都能保證數(shù)據(jù)不會(huì)丟失纹因,開發(fā)者需要關(guān)心的是,如何保證數(shù)據(jù)不會(huì)重復(fù)消費(fèi)琳拨。

At least once 的消息處理機(jī)制瞭恰,在運(yùn)用時(shí)需要格外小心,Storm 采用 ack/fail 機(jī)制來(lái)追蹤消息的流向从绘,當(dāng)一個(gè)消息(tuple)發(fā)送到下游時(shí)寄疏,如果超時(shí)未通知 spout,或者發(fā)送失敗僵井,Storm 默認(rèn)會(huì)根據(jù)配置策略進(jìn)行重發(fā),可通過(guò)調(diào)節(jié)重發(fā)策略來(lái)盡量減少消息的重復(fù)發(fā)送驳棱。一個(gè)常見(jiàn)情況是批什,Storm 集群經(jīng)常會(huì)超負(fù)載運(yùn)行,導(dǎo)致下游的 bolt 未能及時(shí) ack社搅,從而導(dǎo)致 spout 不斷的重發(fā)一個(gè) tuple驻债,進(jìn)而導(dǎo)致消息大量的重復(fù)消費(fèi)乳规。

在與 Kafka 集成時(shí),常用 Storm 提供的 kafkaSpout 作為 spout 消費(fèi) kafka 中的消息合呐。Storm 提供的 kafkaSpout 默認(rèn)有兩種實(shí)現(xiàn)方式:至少一次消費(fèi)的 core Storm spouts 和僅且一次消費(fèi)的 Trident spouts :(We support both Trident and core Storm spouts)暮的。

在 Storm 里面,消息的處理淌实,通過(guò)兩個(gè)組件進(jìn)行:spout 和 bolt冻辩。其中 spout 負(fù)責(zé)產(chǎn)生數(shù)據(jù),bolt 負(fù)責(zé)接收并處理數(shù)據(jù)拆祈,業(yè)務(wù)邏輯代碼一般都寫入 bolt 中恨闪。可以定義多個(gè) bolt 放坏,bolt 與 bolt 之間可以指定單向鏈接關(guān)系咙咽。通常的作法是钧敞,在 spout 里面讀取諸如 kafka麸粮,mysql溉苛,redis炊昆,elasticsearch 等數(shù)據(jù)源的數(shù)據(jù),并發(fā)射(emit)給下游的 bolt威根,定義多個(gè) bolt洛搀,分別進(jìn)行多個(gè)不同階段的數(shù)據(jù)處理留美,比如第一個(gè) bolt 負(fù)責(zé)過(guò)濾清洗數(shù)據(jù)谎砾,第二個(gè) bolt 負(fù)責(zé)邏輯計(jì)算景图,并產(chǎn)生最終運(yùn)算結(jié)果,寫入 redis扣典,mysql,hdfs 等目標(biāo)源湿硝。

Storm 將消息封裝在一個(gè) Tuple 對(duì)象里图柏,Tuple 對(duì)象經(jīng)由 spout 產(chǎn)生后通過(guò) emit() 方法發(fā)送給下游 bolt蚤吹,下游的所有 bolt 也同樣通過(guò) emit() 方法將 tuple 傳遞下去裁着。一個(gè) tuple 可能是一行 mysql 記錄二驰,也可能是一行文件內(nèi)容,具體視 spout 如何讀入數(shù)據(jù)源矗积,并如何發(fā)射給下游棘捣。

如下圖乍恐,是一個(gè) spout/bolt 的執(zhí)行過(guò)程:


spout/bolt 的執(zhí)行過(guò)程

spout -> open(pending狀態(tài)) -> nextTuple -> emit -> bolt -> execute -> ack(spout) / fail(spout) -> message-provider 將該消息移除隊(duì)列(complete) / 將消息重新壓回隊(duì)列

ACK/Fail

上文說(shuō)到,Storm 保證了數(shù)據(jù)不會(huì)丟失瞧毙,ack/fail 機(jī)制便是實(shí)現(xiàn)此機(jī)制的法寶。Storm 在內(nèi)部構(gòu)建了一個(gè) tuple tree 來(lái)表示每一個(gè) tuple 的流向释漆,當(dāng)一個(gè) tuple 被 spout 發(fā)射給下游 bolt 時(shí)男图,默認(rèn)會(huì)帶上一個(gè) messageId逊笆,可以由代碼指定但默認(rèn)是自動(dòng)生成的,當(dāng)下游的 bolt 成功處理 tuple 后乃戈,會(huì)通過(guò) acker 進(jìn)程通知 spout 調(diào)用 ack 方法症虑,當(dāng)處理超時(shí)或處理失敗谍憔,則會(huì)調(diào)用 fail 方法。當(dāng) fail 方法被調(diào)用沈条,消息可能被重發(fā)蜡歹,具體取決于重發(fā)策略的配置,和所使用的 spout父款。

對(duì)于一個(gè)消息憨攒,Storm 提出了『完全處理』的概念肝集。即一個(gè)消息是否被完全處理所刀,取決于這個(gè)消息是否被 tuple tree 里的每一個(gè) bolt 完全處理浮创,當(dāng) tuple tree 中的所有 bolt 都完全處理了這條消息后,才會(huì)通知 acker 進(jìn)程并調(diào)用該消息的原始發(fā)射 spout 的 ack 方法雏掠,否則會(huì)調(diào)用 fail 方法乡话。

ack/fail 只能由創(chuàng)建該 tuple 的 task 所承載的 spout 觸發(fā)

默認(rèn)情況下,Storm 會(huì)在每個(gè) worker 進(jìn)程里面啟動(dòng)1個(gè) acker 線程闸婴,以為 spout/bolt 提供 ack/fail 服務(wù)邪乍,該線程通常不太耗費(fèi)資源,因此也無(wú)須配置過(guò)多吕晌,大多數(shù)情況下1個(gè)就足夠了睛驳。


ack/fail 示意
Worker 間通信

上文所說(shuō)是在一個(gè) worker 內(nèi)的情況淫茵,但是 Storm 是一個(gè)分布式的并行計(jì)算框架,而實(shí)現(xiàn)并行的一個(gè)關(guān)鍵方式,便是一個(gè) topology 可以由多個(gè) worker 進(jìn)程分布在多個(gè) supervisor 節(jié)點(diǎn)并行地執(zhí)行扼劈。那么荐吵,多個(gè) worker 之間必然是會(huì)有通信機(jī)制的。nimbus 和 supervsor 之間僅靠 zookeeper 進(jìn)行溝通薯蝎,那么為何 worker 之間不通過(guò) zookeeper 之類的中間件進(jìn)行溝通呢?其中的一個(gè)原因我想消略,應(yīng)該是組件隔離的原則艺演。worker 是 supervisor 管理下的一個(gè)進(jìn)程,那么 worker 如果也采用 zookeeper 進(jìn)行溝通哩照,那么就有一種越級(jí)操作的嫌疑了识藤。

Worker 間通信

大家看上圖,一個(gè) worker 進(jìn)程裝配了如下幾個(gè)元件:

  • 一個(gè) receive 線程赶撰,該線程維護(hù)了一個(gè) ArrayList豪娜,負(fù)責(zé)接收其他 worker 的 sent 線程發(fā)送過(guò)來(lái)的數(shù)據(jù),并將數(shù)據(jù)存儲(chǔ)到 ArrayList 中鸣奔。數(shù)據(jù)首先存入 receive 線程的一個(gè)緩沖區(qū),可通過(guò) topology.receiver.buffer.size (此項(xiàng)配置在 Storm 1.0 版本以后被刪除了锨匆,該參數(shù)指示 receive 線程批量讀取并轉(zhuǎn)發(fā)消息的最大數(shù)量)來(lái)配置該緩沖區(qū)存儲(chǔ)消息的最大數(shù)量,默認(rèn)為8(個(gè)數(shù)侥蒙,并且得是2的倍數(shù))鞭衩,然后被推送到 ArrayList 中。receive 線程接收數(shù)據(jù)坯台,是通過(guò)監(jiān)聽(tīng) TCP的端口稠炬,該端口有 storm 配置文件中 supervisor.slots.prots 來(lái)配置首启,比如 6700;
  • 一個(gè) sent 線程钥飞,該線程維護(hù)了一個(gè)消息隊(duì)列,負(fù)責(zé)將隊(duì)里中的消息發(fā)送給其他 worker 的 receive 線程。同樣具有緩沖區(qū),可通過(guò) topology.transfer.buffer.size 來(lái)配置緩沖區(qū)存儲(chǔ)消息的最大數(shù)量耻讽,默認(rèn)為1024(個(gè)數(shù),并且得是2的倍數(shù))慰枕。該參數(shù)指示 sent 線程批量讀取并轉(zhuǎn)發(fā)消息的最大數(shù)量具帮。sent 線程發(fā)送數(shù)據(jù),是通過(guò)一個(gè)隨機(jī)分配的TCP端口來(lái)進(jìn)行的唇跨。
  • 一個(gè)或多個(gè) executor 線程。executor 內(nèi)部同樣擁有一個(gè) receive buffer 和一個(gè) sent buffer,其中 receive buffer 接收來(lái)自 receive 線程的的數(shù)據(jù)兼砖,sent buffer 向 sent 線程發(fā)送數(shù)據(jù);而 task 線程則介于 receive buffer 和 sent buffer 之間耽梅。receive buffer 的大小可通過(guò) Conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE 參數(shù)配置,sent buffer 的大小可通過(guò) Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE 配置众旗,兩個(gè)參數(shù)默認(rèn)都是 1024(個(gè)數(shù),并且得是2的倍數(shù))利朵。
Config conf = new Config();
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             16); // 默認(rèn)8
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,      16384);

參考
http://storm.apache.org/releases/1.0.1/Guaranteeing-message-processing.html
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

Storm UI 解析


首頁(yè)
  • Cluster Summary


    Cluster Summary
參數(shù)名 說(shuō)明
Supervisors 集群中配置的 supervisor 數(shù)量
Used slots 集群中已用掉的 workers 數(shù)量
Free slots 集群中空閑的 workers 數(shù)量
Total slots 集群中總的的 workers 數(shù)量
Executors 當(dāng)前集群中總的 Executor 線程數(shù)量益眉,該值等于集群中所有 topology 的所有 spout/bolt 配置的 executor 數(shù)量之和年碘,其中默認(rèn)情況下每個(gè) worker 進(jìn)程還會(huì)派生一個(gè) acker executor 線程,也一并計(jì)算在內(nèi)了
Tasks 當(dāng)前集群中總的 task 數(shù)量涤久,也是所有 executor 派生的 task 數(shù)量之和
  • Nimbus Summary
    比較簡(jiǎn)單考抄,就略過(guò)了

  • Topology Summary


    Topology Summary

這部分也比較簡(jiǎn)單,值得注意的是 Assigned Mem (MB)贫途,這里值得是分配給該 topolgoy 下所有 worker 工作內(nèi)存之和,單個(gè) worker 的內(nèi)存配置可由 Config.WORKER_HEAP_MEMORY_MB 和 Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB 指定怨酝,默認(rèn)為 768M,另外再加上默認(rèn) 64M 的 logwritter 進(jìn)程內(nèi)存空間瞄摊,則有 832M楔壤。
此處 fast-pay 的值為 2496M = 3*832

  • Supervisor Summary


    Supervisor Summary

此處也比較簡(jiǎn)單,值得注意的是 slot 和 used slot 分別表示當(dāng)前節(jié)點(diǎn)總的可用 worker 數(shù)隙畜,及已用掉的 worker 數(shù)。

  • Nimbus Configuration


    Nimbus Configuration

可搜索和查看當(dāng)前 topology 的各項(xiàng)配置參數(shù)

topology 頁(yè)面
  • Topology summary


    Topology summary

此處的大部分配置與上文中出現(xiàn)的意義一樣,值得注意的是
Num executors 和 Num tasks 的值夫啊。其中 Num executors 的數(shù)量等于當(dāng)前 topology 下所有 spout/bolt 的并行度總和,再加上所有 worker 下的 acker executor 線程總數(shù)(默認(rèn)情況下一個(gè) worker 派生一個(gè) acker executor)。

  • Topology actions


    Topology actions
按鈕 說(shuō)明
Activate 激活此 topology
Deactivate 暫停此 topology 運(yùn)行
Rebalance 調(diào)整并行度并重新平衡資源
Kill 關(guān)閉并刪除此 topology
Debug 調(diào)試此 topology 運(yùn)行,需要設(shè)置 topology.eventlogger.executors 數(shù)量 > 0
Stop Debug 停止調(diào)試
Change Log Level 調(diào)整日志級(jí)別
  • Topology stats


    Topology stats
參數(shù) 說(shuō)明
Window 時(shí)間窗口,比如"10m 0s"表示在topology啟動(dòng)后10m 0s之內(nèi)
Emitted 此時(shí)間窗口內(nèi)發(fā)射的總tuple數(shù)
Transferred 此時(shí)間窗口內(nèi)成功轉(zhuǎn)移到下一個(gè)bolt的tuple數(shù)
Complete latency (ms) 此時(shí)間窗口內(nèi)每個(gè)tuple在tuple tree中完全處理所花費(fèi)的平均時(shí)間
Acked 此時(shí)間窗口內(nèi)成功處理的tuple數(shù)
Failed 此時(shí)間窗口內(nèi)處理失敗或超時(shí)的tuple數(shù)
  • Spouts (All time)


    Spouts (All time)
參數(shù) 說(shuō)明
Id topologoy 中 spout 的名稱,一般是在代碼里設(shè)置的
Executors 當(dāng)前分配給此 spout 的 executor 線程總數(shù)
Tasks 當(dāng)前分配給此 spout 的 task 線程總數(shù)
Emitted 截止當(dāng)前發(fā)射的總tuple數(shù)
Transferred 截止當(dāng)前成功轉(zhuǎn)移到下一個(gè)bolt的tuple數(shù)
Complete latency (ms) 截止當(dāng)前每個(gè)tuple在tuple tree中完全處理所花費(fèi)的平均時(shí)間
Acked 截止當(dāng)前成功處理的tuple數(shù)
Failed 截止當(dāng)前處理失敗或超時(shí)的tuple數(shù)
  • Bolts (All time)


    Bolts (All time)
參數(shù) 說(shuō)明
Id topologoy 中 bolt 的名稱日矫,一般是在代碼里設(shè)置的
Executors 當(dāng)前分配給此 bolt 的 executor 線程總數(shù)
Tasks 當(dāng)前分配給此 bolt 的 task 線程總數(shù)
Emitted 截止當(dāng)前發(fā)射的總tuple數(shù)
Transferred 截止當(dāng)前成功轉(zhuǎn)移到下一個(gè)bolt的tuple數(shù)
Complete latency (ms) 截止當(dāng)前每個(gè)tuple在tuple tree中完全處理所花費(fèi)的平均時(shí)間
Capacity (last 10m) 性能指標(biāo)盈魁,取值越小越好,當(dāng)接近1的時(shí)候,說(shuō)明負(fù)載很嚴(yán)重辅搬,需要增加并行度,正常是在 0.0x 到 0.1 0.2 左右堪遂。該值計(jì)算方式為 (number executed * average execute latency) / measurement time
Execute latency (ms) 截止當(dāng)前成功處理的tuple數(shù)
Executed 截止當(dāng)前處理過(guò)的tuple數(shù)
Process latency (ms) 截止當(dāng)前單個(gè) tuple 的平均處理時(shí)間介蛉,越小越好,正常也是 0.0x 級(jí)別溶褪;如果很大币旧,可以考慮增加并行度猿妈,但主要以 Capacity 為準(zhǔn)
Acked 截止當(dāng)前成功處理的tuple數(shù)
Failed 截止當(dāng)前處理失敗或超時(shí)的tuple數(shù)
spout 頁(yè)面

這個(gè)頁(yè)面吹菱,大部分都比較簡(jiǎn)單,就不一一說(shuō)明了彭则,值得注意的是下面這個(gè) Tab:

  • Executors (All time)


    Executors (All time)

這個(gè)Tab的參數(shù)鳍刷,應(yīng)該不用解釋了,但是要注意看俯抖,Emitted输瓜,Transferred 和 Acked 這幾個(gè)參數(shù),看看是否所有的 executor 都均勻地分擔(dān)了 tuple 的處理工作芬萍。

bolt 頁(yè)面

這個(gè)頁(yè)面與 spout 頁(yè)面類似尤揣,也不贅述了。

參考:這個(gè)頁(yè)面通過(guò) API 的方式柬祠,對(duì) UI 界面的參數(shù)做了一些解釋
http://storm.apache.org/releases/1.0.1/STORM-UI-REST-API.html

Storm debug


Storm 提供了良好的 debug 措施北戏,許多操作可以再 UI 上完成,也可以在命令行完成漫蛔。比如 Change log level 在不重啟 topology 的情況下動(dòng)態(tài)修改日志記錄的級(jí)別嗜愈,在 UI 界面上查看某個(gè) bolt 的日志等,當(dāng)然也可以在命令行上操作莽龟。

# 幾個(gè)與 debug 相關(guān)的命令
bin/storm set_log_level [topology name]-l [logger name]=[LEVEL]:[TIMEOUT]
bin/storm logviewer &

下面的參考文章寫的很詳細(xì)芝硬,大家有興趣可以去閱讀一下,本文就不再討論了轧房。

參考
https://community.hortonworks.com/articles/36151/debugging-an-apache-storm-topology.html

性能調(diào)優(yōu)


上文說(shuō)了這么多,這才進(jìn)入主題绍绘。

1奶镶、合理地配置硬件資源

此處暫不討論

2、優(yōu)化代碼的執(zhí)行性能

要優(yōu)化代碼的性能陪拘,如果嚴(yán)謹(jǐn)一點(diǎn)厂镇,首先要有一個(gè)衡量代碼執(zhí)行效率的方式。在數(shù)學(xué)上左刽,通常使用大O函數(shù)來(lái)衡量一個(gè)算法的時(shí)間復(fù)雜度捺信。我們可以考慮使用大O函數(shù)來(lái)近似地估計(jì)一個(gè)代碼片段的執(zhí)行時(shí)間:假定一行代碼花費(fèi)1個(gè)單位時(shí)間,那么代碼片段的時(shí)間復(fù)雜度可以近似地用大O表示為 O(n),其中n表示代碼的行數(shù)或執(zhí)行次數(shù)迄靠。當(dāng)然秒咨,如果代碼里引入了其他的類和函數(shù),或者處于循環(huán)體內(nèi)掌挚,那么其他類雨席、函數(shù)的代碼行數(shù),以及循環(huán)體內(nèi)代碼的重復(fù)執(zhí)行次數(shù)也需要統(tǒng)計(jì)在內(nèi)吠式。這里說(shuō)到大O函數(shù)的概念陡厘,在實(shí)際中也很少用到,我們往往會(huì)用第三方工具來(lái)較為準(zhǔn)確地計(jì)算代碼的實(shí)際執(zhí)行時(shí)間特占,但是理解這個(gè)概念有助于優(yōu)化我們的代碼糙置。有興趣的同學(xué)可以閱讀《算法概論》這本書。

這里順便舉一個(gè)斐波那契數(shù)列的例子:

/** C代碼:用遞歸實(shí)現(xiàn)的斐波那契數(shù)列 */
int fibonacci(unsigned int n)
{
    if (n <= 0) return 0;
    if (n == 1) return 1;
    return fibonacci(n - 1) + fibonacci(n - 2);
}
/** C代碼:用循環(huán)實(shí)現(xiàn)的斐波那契數(shù)列 */
int fibonacci(unsigned int n)
{
    int r, a, b;
    int i;
    int result[2] = {0, 1};

    if (n < 2) return result[n];

    a = 0;
    b = 1;
    r = 0;
    for (i = 2; i <= n; i++)
    {
        r = a + b;
        a = b;
        b = r;
    }
    return r;
}

觀看兩個(gè)不同實(shí)現(xiàn)的例子是目,第一種遞歸的方式谤饭,當(dāng)傳入的n很大時(shí),代碼執(zhí)行的時(shí)間將會(huì)呈指數(shù)級(jí)增長(zhǎng)胖笛,這時(shí)T(n)接近于 O(2^n)网持;第二種循環(huán)的方式,即使傳入很大的n长踊,代碼也可以在較短的時(shí)間內(nèi)執(zhí)行完畢功舀,這時(shí)T(n)接近于O(n),為什么是O(n)呢身弊,比如說(shuō)n=1000辟汰,那么整個(gè)算法的執(zhí)行時(shí)間基本集中在那個(gè) for 循環(huán)里了,相當(dāng)于執(zhí)行了 for 循環(huán)內(nèi)3行代碼1000多次阱佛,所以差不多是n帖汞。這其實(shí)就是一種用空間換時(shí)間的概念,利用循環(huán)代替遞歸的方式凑术,從而大大地優(yōu)化了代碼的執(zhí)行效率翩蘸。

回到我們的 Storm。代碼優(yōu)化淮逊,歸結(jié)起來(lái)催首,應(yīng)該有這幾種:

  • 在算法層面進(jìn)行優(yōu)化
  • 在業(yè)務(wù)邏輯層面進(jìn)行優(yōu)化
  • 在技術(shù)層面進(jìn)行優(yōu)化
  • 特定于 Storm,合理地規(guī)劃 topology泄鹏,即安排多少個(gè) bolt郎任,每個(gè) bolt 做什么,鏈接關(guān)系如何

在技術(shù)層面進(jìn)行優(yōu)化备籽,手法就非常多了舶治,比如連接數(shù)據(jù)庫(kù)時(shí),運(yùn)用連接池,常用的連接池有 alibaba 的 druid霉猛,還有 redis 的連接池尺锚;比如合理地使用多線程,合理地優(yōu)化JVM參數(shù)等等韩脏。這里舉一個(gè)工作中可能會(huì)遇到的例子來(lái)介紹一下:

在配置了多個(gè)并行度的 bolt 中缩麸,存取 redis 數(shù)據(jù)時(shí),如果不使用 redis 線程池赡矢,那么很可能會(huì)遇到 topology 運(yùn)行緩慢杭朱,spout 不斷重發(fā),甚至直接掛掉的情況吹散。首先 redis 的單個(gè)實(shí)例并不是線程安全的弧械,其次在不使用 redis-pool 的情況下,每次讀取 redis 都創(chuàng)建一個(gè) redis 連接空民,同創(chuàng)建一個(gè) mysql 連接一樣刃唐,在連接 redis 時(shí)所耗費(fèi)的時(shí)間相較于 get/set 本身是非常巨大的。

/**
 * redis-cli 操作工具類
 */
package net.mtide.dbtool;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisCli {

    private static JedisPool pool = null;
    
    private final static Logger logger = LoggerFactory.getLogger(RedisCli.class);
    
    /**
     * 同步初始化 JedisPool
     */
    private static synchronized void initPool() {
        if (pool == null) {
            String hosts = "HOST";
            String port  = "PORT";
            String pass  = "PASS";
            pool = new JedisPool(new JedisPoolConfig(), hosts, Integer.parseInt(port), 2000, pass);
        }
    }

    /**
     * 將連接 put back to pool
     * 
     * @param jedis
     */
    private static void returnResource(final Jedis jedis) {
        if (pool != null && jedis != null) {
            pool.returnResource(jedis);
        }
    }
    
    /**
     * 同步獲取 Jedis 實(shí)例
     * 
     * @return Jedis
     */
    public synchronized static Jedis getJedis() {
        if (pool == null) {
            initPool();
        }
        return pool.getResource();
    }
    
    public static void set(final String key, final String value) {
        Jedis jedis = getJedis();
        try {
            jedis.set(key, value);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
    }

    public static void set(final String key, final String value, final int seconds) {
        Jedis jedis = getJedis();
        try {
            jedis.set(key, value);
            jedis.expire(key, seconds);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
    }

    public static String get(final String key) {
        String value = null;
        
        Jedis jedis = getJedis();
        try {
            value = jedis.get(key);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
        
        return value;
    }

    public static List<String> mget(final String... keys) {
        List<String> value = null;
        
        Jedis jedis = getJedis();
        try {
            value = jedis.mget(keys);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
        
        return value;
    }

    public static Long del(final String key) {
        Long value = null;
        
        Jedis jedis = getJedis();
        try {
            value = jedis.del(key);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
        
        return value;
    }

    public static Long expire(final String key, final int seconds) {
        Long value = null;
        
        Jedis jedis = getJedis();
        try {
            value = jedis.expire(key, seconds);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
        
        return value;
    }

    public static Long incr(final String key) {
        Long value = null;
        
        Jedis jedis = getJedis();
        try {
            value = jedis.incr(key);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
        
        return value;
    }
}

當(dāng)一個(gè)配置了多個(gè)并行度的 topology 運(yùn)行在集群上時(shí)界轩,如果 redis 操作不當(dāng)画饥,很可能會(huì)造成運(yùn)行該 redis 的 bolt 長(zhǎng)時(shí)間阻塞,從而造成 tuple 傳遞超時(shí)浊猾,默認(rèn)情況下 spout 在 fail 后會(huì)重發(fā)該 tuple抖甘,然而 redis 阻塞的問(wèn)題沒(méi)有解決,重發(fā)不僅不能解決問(wèn)題葫慎,反而會(huì)加重集群的運(yùn)行負(fù)擔(dān)衔彻,那么 spout 重發(fā)越來(lái)越多,fail 的次數(shù)也越來(lái)越多偷办, 最終導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi)越來(lái)越嚴(yán)重艰额。上面貼出來(lái)的 RedisCli 工具類,可以在多線程的環(huán)境下安全的使用 redis椒涯,從而解決了阻塞的問(wèn)題柄沮。

3、合理的配置并行度

有幾個(gè)手段可以配置 topology 的并行度:

  • conf.setNumWorkers() 配置 worker 的數(shù)量
  • builder.setBolt("NAME", new Bolt(), 并行度) 設(shè)置 executor 數(shù)量
  • spout/bolt.setNumTask() 設(shè)置 spout/bolt 的 task 數(shù)量

現(xiàn)在回到我們的一開始的場(chǎng)景假設(shè):

項(xiàng)目組部署了3臺(tái)機(jī)器废岂,計(jì)劃運(yùn)行一個(gè) Storm(1.0.1) + Kafka(0.9.0.1) + Redis(3.2.1) 的小規(guī)模實(shí)驗(yàn)集群铡溪,每臺(tái)機(jī)器的配置為 2CPUs,4G RAM

/** 初始配置 */
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new kafkaSpout(), 3);
builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout");
builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order"));

Config conf = new Config();
conf.setNumWorkers(2);
StormSubmitter.submitTopologyWithProgressBar("topology-name", conf, builder.createTopology());

那么問(wèn)題是:

  1. setNumWorkers 應(yīng)該取多少泪喊?取決于哪些因素?
  2. kafkaSpout 的并行度應(yīng)該取多少髓涯?取決于哪些因素袒啼?
  3. FilterBolt 的并行度應(yīng)該取多少?取決于哪些因素?
  4. AlertBolt 的并行度應(yīng)該取多少蚓再?取決于哪些因素滑肉?
  5. FilterBolt 用 shuffleGrouping 是最好的嗎?
  6. AlertBolt 用 fieldsGrouping 是最好的嗎摘仅?

回答如下:
第一個(gè)問(wèn)題:關(guān)于 worker 的并行度:worker 可以分配到不同的 supervisor 節(jié)點(diǎn)靶庙,這也是 Storm 實(shí)現(xiàn)多節(jié)點(diǎn)并行計(jì)算的主要配置手段。據(jù)此娃属, workers 的數(shù)量六荒,可以說(shuō)是越多越好,但也不能造成浪費(fèi)矾端,而且也要看硬件資源是否足夠掏击。所以主要考慮集群各節(jié)點(diǎn)的內(nèi)存情況:默認(rèn)情況下,一個(gè) worker 分配 768M 的內(nèi)存秩铆,外加 64M 給 logwriter 進(jìn)程砚亭;因此一個(gè) worker 會(huì)耗費(fèi) 832M 內(nèi)存;題設(shè)的集群有3個(gè)節(jié)點(diǎn)殴玛,每個(gè)節(jié)點(diǎn)4G內(nèi)存捅膘,除去 linux 系統(tǒng)、kafka滚粟、zookeeper 等的消耗寻仗,保守估計(jì)僅有2G內(nèi)存可用來(lái)運(yùn)行 topology,由此可知坦刀,當(dāng)集群只有一個(gè) topology 在運(yùn)行的情況下愧沟,最多可以配置6個(gè) worker。
另外鲤遥,我們還可以調(diào)節(jié) worker 的內(nèi)存空間沐寺。這取決于流過(guò) topology 的數(shù)據(jù)量的大小以及各 bolt 單元的業(yè)務(wù)代碼的執(zhí)行時(shí)間。如果數(shù)據(jù)量特別大盖奈,代碼執(zhí)行時(shí)間較長(zhǎng)混坞,那么可以考慮增加單個(gè) worker 的工作內(nèi)存。有一點(diǎn)需要注意的是钢坦,一個(gè) worker 下的所有 executor 和 task 都是共享這個(gè) worker 的內(nèi)存的究孕,也就是假如一個(gè) worker 分配了 768M 內(nèi)存,3個(gè) executor爹凹,6個(gè) task厨诸,那么這個(gè) 3 executor 和 6 task 其實(shí)是共用這 768M 內(nèi)存的,但是好處是可以充分利用多核 CPU 的運(yùn)算性能禾酱。

總結(jié)起來(lái)微酬,worker 的數(shù)量绘趋,取值因素有:

  • 節(jié)點(diǎn)數(shù)量,及其內(nèi)存容量
  • 數(shù)據(jù)量的大小和代碼執(zhí)行時(shí)間

機(jī)器的CPU颗管、帶寬陷遮、磁盤性能等也會(huì)對(duì) Storm 性能有影響,但是這些外在因素一般不影響 worker 數(shù)量的決策垦江。

需要注意的是帽馋,Storm 在默認(rèn)情況下,每個(gè) supervisor 節(jié)點(diǎn)只允許最多4個(gè) worker(slot)進(jìn)程運(yùn)行比吭;如果所配置的 worker 數(shù)量超過(guò)這個(gè)限制绽族,則需要在 storm 配置文件中修改。

第二個(gè)問(wèn)題:關(guān)于 FilterBolt 的并行度:如果 spout 讀取的是 kafka 的數(shù)據(jù)梗逮,那么正常情況下项秉,設(shè)置為 topic 的分區(qū)數(shù)量即可。計(jì)算 kafkaSpout 的最佳取值慷彤,有一個(gè)最簡(jiǎn)單的辦法娄蔼,就是在 Storm UI里面,點(diǎn)開 topology 的首頁(yè)底哗,在 Spouts (All time) 下岁诉,查看以下幾個(gè)參數(shù)的值:

  • Emitted 已發(fā)射出去的tuple數(shù)
  • Transferred 已轉(zhuǎn)移到下一個(gè)bolt的tuple數(shù)
  • Complete latency (ms) 每個(gè)tuple在tuple tree中完全處理所花費(fèi)的平均時(shí)間
  • Acked 成功處理的tuple數(shù)
  • Failed 處理失敗或超時(shí)的tuple數(shù)
Paste_Image.png

怎么看這幾個(gè)參數(shù)呢?有幾個(gè)技巧:

  • 正常情況下 Failed 值為0跋选,如果不為0涕癣,考慮增加該 spout 的并行度。這是最重要的一個(gè)判斷依據(jù)前标;
  • 正常情況下坠韩,Emitted、Transferred和Acked這三個(gè)值應(yīng)該是相等或大致相等的炼列,如果相差太遠(yuǎn)只搁,要么該 spout 負(fù)載太重,要么下游負(fù)載過(guò)重俭尖,需要調(diào)節(jié)該 spout 的并行度氢惋,或下游 bolt 的并行度;
  • Complete latency (ms) 時(shí)間稽犁,如果很長(zhǎng)焰望,十秒以上就已經(jīng)算很長(zhǎng)的了。當(dāng)然具體時(shí)間取決于代碼邏輯已亥,bolt 的結(jié)構(gòu)熊赖,機(jī)器的性能等。

kafka 只能保證同一分區(qū)下消息的順序性虑椎,當(dāng) spout 配置了多個(gè) executor 的時(shí)候震鹉,不同分區(qū)的消息會(huì)均勻的分發(fā)到不同的 executor 上消費(fèi)的妖,那么消息的整體順序性就難以保證了,除非將 spout 并行度設(shè)為 1

第三個(gè)問(wèn)題:關(guān)于 FilterBolt 的并行度:其取值也有一個(gè)簡(jiǎn)單辦法足陨,就是在 Storm UI里面,點(diǎn)開 topology 的首頁(yè)娇未,在 Bolts (All time) 下墨缘,查看以下幾個(gè)參數(shù)的值:

  • Capacity (last 10m) 取值越小越好,當(dāng)接近1的時(shí)候零抬,說(shuō)明負(fù)載很嚴(yán)重镊讼,需要增加并行度,正常是在 0.0x 到 0.1 0.2 左右
  • Process latency (ms) 單個(gè) tuple 的平均處理時(shí)間平夜,越小越好蝶棋,正常也是 0.0x 級(jí)別;如果很大忽妒,可以考慮增加并行度玩裙,但主要以 Capacity 為準(zhǔn)
Paste_Image.png

一般情況下,按照該 bolt 的代碼時(shí)間復(fù)雜度段直,設(shè)置一個(gè) spout 并行度的 1-3倍即可吃溅。

第四個(gè)問(wèn)題:AlertBolt 的并行度同 FilterBolt。

第五個(gè)問(wèn)題:shuffleGrouping 會(huì)將 tuple 均勻地隨機(jī)分發(fā)給下游 bolt鸯檬,一般情況下用它就是最好的了决侈。

總之,要找出并行度的最佳取值喧务,主要結(jié)合 Storm UI 來(lái)做決策赖歌。

4、優(yōu)化配置參數(shù)
/** tuple發(fā)送失敗重試策略功茴,一般情況下不需要調(diào)整 */
spoutConfig.retryInitialDelayMs  = 0;
spoutConfig.retryDelayMultiplier = 1.0;
spoutConfig.retryDelayMaxMs = 60 * 1000;

/** 此參數(shù)比較重要庐冯,可適當(dāng)調(diào)大一點(diǎn) */
/** 通常情況下 spout 的發(fā)射速度會(huì)快于下游的 bolt 的消費(fèi)速度,當(dāng)下游的 bolt 還有 TOPOLOGY_MAX_SPOUT_PENDING 個(gè) tuple 沒(méi)有消費(fèi)完時(shí)痊土,spout 會(huì)停下來(lái)等待肄扎,該配置作用于 spout 的每個(gè) task。  */
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10000)

/** 調(diào)整分配給每個(gè) worker 的內(nèi)存赁酝,關(guān)于內(nèi)存的調(diào)節(jié)犯祠,上文已有描述 */
conf.put(Config.WORKER_HEAP_MEMORY_MB,             768);
conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB,  768);

/** 調(diào)整 worker 間通信相關(guān)的緩沖參數(shù),以下是一種推薦的配置 */
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE,             8); // 1.0 以上已移除
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,            32);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,    16384);

可以在 Storm UI 上查看當(dāng)前集群的 Topology Configuration

5酌呆、rebalance

可以直接采用 rebalance 命令(也可以在 Storm UI上操作)重新配置 topology 的并行度:

storm rebalance TOPOLOGY-NAME -n 5 -e SPOUT/BOLT1-NAME=3 -e SPOUT/BOLT2-NAME=10
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末衡载,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子隙袁,更是在濱河造成了極大的恐慌痰娱,老刑警劉巖弃榨,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異梨睁,居然都是意外死亡鲸睛,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門坡贺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)官辈,“玉大人,你說(shuō)我怎么就攤上這事遍坟∪冢” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵愿伴,是天一觀的道長(zhǎng)肺魁。 經(jīng)常有香客問(wèn)我,道長(zhǎng)隔节,這世上最難降的妖魔是什么鹅经? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮官帘,結(jié)果婚禮上瞬雹,老公的妹妹穿的比我還像新娘。我一直安慰自己刽虹,他們只是感情好酗捌,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著涌哲,像睡著了一般胖缤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上阀圾,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天哪廓,我揣著相機(jī)與錄音,去河邊找鬼初烘。 笑死涡真,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的肾筐。 我是一名探鬼主播哆料,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼吗铐!你這毒婦竟也來(lái)了东亦?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤唬渗,失蹤者是張志新(化名)和其女友劉穎典阵,沒(méi)想到半個(gè)月后奋渔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡壮啊,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年嫉鲸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片歹啼。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡充坑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出染突,到底是詐尸還是另有隱情,我是刑警寧澤辈灼,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布份企,位于F島的核電站,受9級(jí)特大地震影響巡莹,放射性物質(zhì)發(fā)生泄漏司志。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一降宅、第九天 我趴在偏房一處隱蔽的房頂上張望骂远。 院中可真熱鬧,春花似錦腰根、人聲如沸激才。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)瘸恼。三九已至,卻和暖如春册养,著一層夾襖步出監(jiān)牢的瞬間东帅,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工球拦, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留靠闭,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓坎炼,卻偏偏與公主長(zhǎng)得像愧膀,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子点弯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 性能優(yōu)化1:kryo序列化 定制序列化 自定義的bolt之間emit數(shù)據(jù)是實(shí)體類的時(shí)候扇调,注冊(cè)kryo Storm ...
    尼小摩閱讀 1,165評(píng)論 0 3
  • Storm架構(gòu) Storm是一個(gè)分布式、可靠的實(shí)時(shí)計(jì)算系統(tǒng)抢肛。與Hadoop不同的是狼钮,它采用流式的消息處理方法碳柱,對(duì)于...
    零度沸騰_yjz閱讀 2,775評(píng)論 0 6
  • 本文借鑒官文莲镣,添加了一些解釋和看法,其中有些理解涎拉,寫的比較粗糙瑞侮,有問(wèn)題的地方希望大家指出。寫這篇文章鼓拧,是想把一些官...
    達(dá)微閱讀 936評(píng)論 0 0
  • 一半火、為什么用Storm storm是一個(gè)分布式開源的實(shí)時(shí)計(jì)算系統(tǒng)〖玖可以用來(lái)做實(shí)時(shí)分析钮糖、在線機(jī)器學(xué)習(xí)、etl等酌住。 計(jì)...
    青芒v5閱讀 771評(píng)論 2 5
  • storm的集群提交方式 StormSubmitter.subnitTopology()方法 問(wèn)題一店归、如何把sto...
    夙夜M閱讀 514評(píng)論 0 0