Storm介紹之安裝部署及API

安裝:
1.下載并解壓縮Zookeeper

           官網(wǎng)地址:http://hadoop.apache.org/zookeeper/releases.html
           解壓縮步驟此處省略....

2.修改zookeeper的配置文件

          把zookeeper對(duì)應(yīng)的conf目錄下的zoo-sample.cfg重命名為zoo.cfg,配置工作目錄和端口號(hào)
         # The number of milliseconds of each 
              ticktickTime=2000
         # The number of ticks that the initial
         # synchronization phase can take
            initLimit=10
         # The number of ticks that can pass between# sending a request and getting an acknowledgement
          syncLimit=5
         # the directory where the snapshot is stored.
         # do not use /tmp for storage, /tmp here is just
         # example sakes.
            dataDir=/usr/local/dev/zookeeper-3.4.9/data
         # the port at which the clients will connect
            clientPort=2181
            server.id_num1=hostname1:2888:3888
            server.id_num2=hostname2:2888:3888
     注意:集群模式下分別在datadir目錄下創(chuàng)建文件myid壁榕,其中的內(nèi)容為id_num,例如:
            echo id_num1 > myid

3.下載并壓縮storm

          官網(wǎng)地址:http://storm.apache.org/downloads.html
          解壓縮步驟此處省略.....

4.修改storm配置文件

          修改conf/storm.yaml,conf/storm.yaml中的配置選項(xiàng)將覆蓋conf/defaults.yaml
        1):storm.zookeeper.servers:storm集群中使用的zookeeper集群的地址
            storm.zookeeper.servers:
                -"host_ip"    (此處填寫(xiě)zookeeper集群的主機(jī)名或Ip,多個(gè)用逗號(hào)分隔)
            nimbus.host:"host_ip"  (此處填下nimbus進(jìn)程的主機(jī),即主節(jié)點(diǎn)ip)
            storm.local.dir:"/dest/to/path"  (此處填寫(xiě)storm存儲(chǔ)目錄)
            supervisor.slots.ports:
                -  "host_ip:port"    (此處填寫(xiě)從節(jié)點(diǎn)的端口號(hào),可隨意矛紫,當(dāng)采用單機(jī)模式的時(shí)候,需要寫(xiě)不同的端口號(hào),具體的個(gè)數(shù)根據(jù)從節(jié)點(diǎn)的個(gè)數(shù)來(lái)定)

5.啟動(dòng)zookeeper

注意:集群中的每臺(tái)機(jī)器都要啟動(dòng),且啟動(dòng)命令一致
  進(jìn)入zookeeper的安裝目錄
  #bin/zkServer.sh start

6.啟動(dòng)storm nimbus

進(jìn)入storm安裝目錄
 #bin/storm nimbus

7.啟動(dòng)storm supervisor

  注意:如果是單機(jī)模式,即啟動(dòng)一次即可牌里,如果是集群模式颊咬,需要每臺(tái)都要啟動(dòng),命令一致    
    進(jìn)入storm安裝目錄
     #bin/storm supervisor

8.啟動(dòng)storm ui

進(jìn)入storm安裝目錄
#bin/storm ui

然后訪問(wèn)localhost:8080(或者主節(jié)點(diǎn)主機(jī)名:8080)就會(huì)看到storm的基本信息,到此,storm的安裝部署已經(jīng)成功

接下來(lái)進(jìn)入storm的API牡辽,首先先要了解storm中的頂層接口IComponent
storm中Spout和Bolt都是其Component(部件的意思),所以storm定義了一個(gè)名叫IComponent的接口喳篇,全家普如下:

Paste_Image.png

注意:

綠色部分是我們常用的類(lèi),紅色部分是與事務(wù)有關(guān)的

BaseComponent是Storm提供的"偷懶"的類(lèi),它及其子類(lèi)态辛,或多或少實(shí)現(xiàn)了接口的部分方法麸澜,這樣我們?cè)谑褂玫臅r(shí)候,不用自己每次都寫(xiě)所有的方法,值得一提的是:像BaseXXX的類(lèi),它所實(shí)現(xiàn)的方法奏黑,都是空的炊邦,直接返回null,如果繼承這樣的類(lèi),需要自己重寫(xiě)方法熟史。下面介紹Spout和Bolt組件相關(guān)的Api

Spout
首先看一下總體圖:

Paste_Image.png

從圖中很明顯的看出Spout最頂層抽象的是ISpout接口,簡(jiǎn)單介紹一樣接口中的方法:

Paste_Image.png

open():初始化動(dòng)作,可以在該Spout初始化的時(shí)候做一些動(dòng)作,傳遞上下文等

close():該Spout關(guān)閉之前執(zhí)行,但不能得到保證一定可以執(zhí)行.Spout是作為T(mén)ask運(yùn)行在Worker中的,在Cluster模式下馁害,supervisor會(huì)直接kill -9 worker的進(jìn)程,這樣它就無(wú)法執(zhí)行了.而在本地模式下,如果是發(fā)送停止命令,是可以保證close方法的執(zhí)行的.

activate()和deactivate():一個(gè)Spout可以被暫時(shí)激活和關(guān)閉以故,這兩個(gè)方法可以在對(duì)應(yīng)的時(shí)刻調(diào)用執(zhí)行

nextTuple():用來(lái)發(fā)射數(shù)據(jù),Spout中最核心的部分,一些具體的需求可以在該方法中實(shí)現(xiàn)

ack():一個(gè)Tuple會(huì)有唯一一個(gè)id,當(dāng)該Tuple被成功處理蜗细,會(huì)執(zhí)行該方法

fail():與ack()方法同理,當(dāng)Tuple處理失敗會(huì)調(diào)用該方法

總結(jié):

通常情況下(shell和事務(wù)除外),實(shí)現(xiàn)一個(gè)Spout怒详,可以直接實(shí)現(xiàn)IRichSpout,如果不想寫(xiě)多余的代碼,可以繼承BaseRichSpout

Bolt
同樣炉媒,首先看下總體圖:

Paste_Image.png

可以看出為什么IBasicBolt沒(méi)有繼承IBolt?
我們先看下IBolt的方法:

Paste_Image.png

我們需要知道的是IBolt繼承了java.io.Serializable,我們?cè)趎imbus上提交了Topology后,創(chuàng)建出來(lái)的Bolt會(huì)序列化發(fā)送到具體執(zhí)行的Worker上,Worker在執(zhí)行該Bolt時(shí)昆烁,會(huì)首先調(diào)用prepare方法傳入當(dāng)前執(zhí)行的上下文

execute(Tuple):接收一個(gè)Tuple進(jìn)行處理,并用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail方法(表示失敗)來(lái)反饋結(jié)果

cleanup():同Ispout的close方法吊骤,不能保證其一定被執(zhí)行

好了,現(xiàn)在可以回答為什么IBasicBolt沒(méi)有繼承IBolt這個(gè)問(wèn)題了静尼,Storm提供了IBasicBolt接口白粉,其目的就是實(shí)現(xiàn)該接口的Bolt不用在代碼中反饋結(jié)果了,storm內(nèi)部會(huì)自動(dòng)反饋結(jié)果

總結(jié):

通常情況下實(shí)現(xiàn)一個(gè)Bolt,可以實(shí)現(xiàn)IRichBolt接口或繼承BaseRichBolt,如果不想自己處理反饋結(jié)果,可以實(shí)現(xiàn)IBasicBolt接口或繼承BaseBasicBolt,它實(shí)際上是自己做掉了prepare方法和collector.emit.ack(inputTuple)方法.

OK,介紹完了簡(jiǎn)單的方法,下面寫(xiě)一個(gè)簡(jiǎn)單的Demo,加深一下對(duì)Spout和Bolt的理解

簡(jiǎn)單需求:對(duì)名稱(chēng)加后綴并轉(zhuǎn)換成大寫(xiě)

RandomWordSpout.java

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomWordSpout extends BaseRichSpout{

          private SpoutOutputCollector  collector;
          
          //模擬一些數(shù)據(jù)
          String[] str = {"hello","word","you","how","are"};
          

        //初始化方法鼠渺,在spout組件實(shí)例化時(shí)調(diào)用一次
          @Override
          public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                  this.collector = collector;
          }

          @Override
          public void nextTuple{
                 //隨機(jī)挑選出一個(gè)名稱(chēng)
                  Random random = new Random();
                  int index = random.nextInt(str.length);
                
                  //獲取名稱(chēng)
                  String name = str[index];

                  //將名稱(chēng)進(jìn)行封裝成tuple鸭巴,發(fā)送消息給下一個(gè)組件
                  collector.emit(new Vaules(name));
            }

        //聲明本spout組件發(fā)送出去的tuple中的數(shù)據(jù)的字段名
           @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
                  declarer.declare(new Fields("orignname"));
              }
}

UpperBolt.java

      import backtype.storm.topology.BasicOutputCollector;
      import backtype.storm.topology.OutputFieldsDeclarer;
      import backtype.storm.topology.base.BaseBasicBolt;
      import backtype.storm.tuple.Fields;
      import backtype.storm.tuple.Tuple;
      import backtype.storm.tuple.Values;
    
      public class UpperBolt extends BaseBasicBolt{

              //業(yè)務(wù)處理邏輯
               @Override
              public void execute(Tuple tuple, BasicOutputCollector collector) {
                      //先獲取到上一個(gè)組件傳遞過(guò)來(lái)的數(shù)據(jù),數(shù)據(jù)在tuple里面
                      String godName = tuple.getString(0);
    
                       //將名稱(chēng)轉(zhuǎn)換成大寫(xiě)
                      String godName_upper = godName.toUpperCase();
    
                      //將轉(zhuǎn)換完成的商品名發(fā)送出去
                      collector.emit(new Values(godName_upper));
            }



             //聲明該bolt組件要發(fā)出去的tuple的字段
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                    declarer.declare(new Fields("uppername"));
              }
    }

SuffixBolt.java

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class SuffixBolt extends BaseBasicBolt{
          FileWriter fileWriter = null;
        
          //在bolt組件運(yùn)行過(guò)程中只會(huì)被調(diào)用一次
            @Override
          public void prepare(Map stormConf, TopologyContext context) {  
                  try {
                      fileWriter = new   FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
                      } catch (IOException e) {
                              throw new RuntimeException(e);
                      }
}

        //該bolt組件的核心處理邏輯
        //每收到一個(gè)tuple消息,就會(huì)被調(diào)用一次
            @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            //先拿到上一個(gè)組件發(fā)送過(guò)來(lái)的名稱(chēng)
              String upper_name = tuple.getString(0);
              String suffix_name = upper_name + "_itisok";

          //為上一個(gè)組件發(fā)送過(guò)來(lái)的商品名稱(chēng)添加后綴
              try {
                  fileWriter.write(suffix_name);
                  fileWriter.write("\n");
                  fileWriter.flush();
              } catch (IOException e) {
                  throw new RuntimeException(e);
              }
        }
}

TopoMain.java

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;

  /**
   * 組織各個(gè)處理組件形成一個(gè)完整的處理流程拦盹,就是所謂的topology(類(lèi)似于mapreduce程序中的job)
 * 并且將該topology提交給storm集群去運(yùn)行鹃祖,topology提交到集群后就將永無(wú)休止地運(yùn)行,除非人為或者異常退出
   */
    public class TopoMain {
        public static void main(String[] args) throws Exception {   
              TopologyBuilder builder = new TopologyBuilder();

            //將我們的spout組件設(shè)置到topology中去 
            //parallelism_hint :4  表示用4個(gè)excutor來(lái)執(zhí)行這個(gè)組件
            //setNumTasks(8) 設(shè)置的是該組件執(zhí)行時(shí)的并發(fā)task數(shù)量普舆,也就意味著1個(gè)excutor會(huì)運(yùn)行2個(gè)task

              builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
    
          //將大寫(xiě)轉(zhuǎn)換bolt組件設(shè)置到topology恬口,并且指定它接收randomspout組件的消息
          //.shuffleGrouping("randomspout")包含兩層含義:
          //1校读、upperbolt組件接收的tuple消息一定來(lái)自于randomspout組件
          //2、randomspout組件和upperbolt組件的大量并發(fā)task實(shí)例之間收發(fā)消息時(shí)采用的分組策略是隨機(jī)分組shuffleGrouping

              builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
    
          //將添加后綴的bolt組件設(shè)置到topology祖能,并且指定它接收upperbolt組件的消息

              builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
    
          //用builder來(lái)創(chuàng)建一個(gè)topology
               StormTopology demotop = builder.createTopology();

          //配置一些topology在集群中運(yùn)行時(shí)的參數(shù)
               Config conf = new Config();
         //這里設(shè)置的是整個(gè)demotop所占用的槽位數(shù)歉秫,也就是worker的數(shù)量
               conf.setNumWorkers(4);
              conf.setDebug(true);
              conf.setNumAckers(0);
    
        //將這個(gè)topology提交給storm集群運(yùn)行
             StormSubmitter.submitTopology("demotopo", conf, demotop);  
}

}

最后將工程打包,在集群上運(yùn)行
#storm jar jar_name.jar class_name args0 ....

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末养铸,一起剝皮案震驚了整個(gè)濱河市雁芙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌揭厚,老刑警劉巖却特,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異筛圆,居然都是意外死亡裂明,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)太援,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)闽晦,“玉大人,你說(shuō)我怎么就攤上這事提岔∠沈龋” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵碱蒙,是天一觀的道長(zhǎng)荠瘪。 經(jīng)常有香客問(wèn)我,道長(zhǎng)赛惩,這世上最難降的妖魔是什么哀墓? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮喷兼,結(jié)果婚禮上篮绰,老公的妹妹穿的比我還像新娘。我一直安慰自己季惯,他們只是感情好吠各,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著勉抓,像睡著了一般贾漏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上藕筋,一...
    開(kāi)封第一講書(shū)人閱讀 51,624評(píng)論 1 305
  • 那天纵散,我揣著相機(jī)與錄音,去河邊找鬼。 笑死困食,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的翎承。 我是一名探鬼主播硕盹,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼叨咖!你這毒婦竟也來(lái)了瘩例?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤甸各,失蹤者是張志新(化名)和其女友劉穎垛贤,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體趣倾,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡聘惦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了儒恋。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片善绎。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖诫尽,靈堂內(nèi)的尸體忽然破棺而出禀酱,到底是詐尸還是另有隱情,我是刑警寧澤牧嫉,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布剂跟,位于F島的核電站,受9級(jí)特大地震影響酣藻,放射性物質(zhì)發(fā)生泄漏曹洽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一臊恋、第九天 我趴在偏房一處隱蔽的房頂上張望衣洁。 院中可真熱鬧,春花似錦抖仅、人聲如沸坊夫。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)环凿。三九已至,卻和暖如春放吩,著一層夾襖步出監(jiān)牢的瞬間智听,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人雏逾。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓月趟,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親颜骤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Date: Nov 17-24, 2017 1. 目的 積累Storm為主的流式大數(shù)據(jù)處理平臺(tái)對(duì)實(shí)時(shí)數(shù)據(jù)處理的相關(guān)...
    一只很努力爬樹(shù)的貓閱讀 2,174評(píng)論 0 4
  • 目錄 場(chǎng)景假設(shè) 調(diào)優(yōu)步驟和方法 Storm 的部分特性 Storm 并行度 Storm 消息機(jī)制 Storm UI...
    mtide閱讀 17,112評(píng)論 30 60
  • 一捣卤、Storm是什么 Storm是一個(gè)免費(fèi)并開(kāi)源的分布式實(shí)時(shí)計(jì)算系統(tǒng)忍抽。利用Storm可以很容易做到可靠地處理無(wú)限的...
    Graceleeman閱讀 3,024評(píng)論 0 6
  • 原文鏈接Storm Tutorial 本人原創(chuàng)翻譯,轉(zhuǎn)載請(qǐng)注明出處 這個(gè)教程內(nèi)容包含如何創(chuàng)建topologies及...
    quiterr閱讀 1,626評(píng)論 0 6
  • 推酷誠(chéng)意滿(mǎn)滿(mǎn)的設(shè)計(jì)周刊《設(shè)計(jì)匠藝》董朝, 下面是內(nèi)容列表鸠项,干貨多多,也可以移步到官網(wǎng)進(jìn)一步閱讀子姜。 產(chǎn)品之道 譯文|如何...
    推酷閱讀 217評(píng)論 0 1