Apache Storm工作實(shí)例(demo)

Apache Storm工作實(shí)例

我們已經(jīng)經(jīng)歷了Apache Storm的核心技術(shù)細(xì)節(jié)眶根,現(xiàn)在是時(shí)候編寫一些簡單的場景。

場景 - 移動(dòng)呼叫日志分析器

移動(dòng)呼叫及其持續(xù)時(shí)間將作為對Apache Storm的輸入郑象,Storm將處理和分組在相同呼叫者和接收者之間的呼叫及其呼叫總數(shù)。

Spout創(chuàng)建

Spout是用于數(shù)據(jù)生成的組件茬末〕ч唬基本上,一個(gè)spout將實(shí)現(xiàn)一個(gè)IRichSpout接口。 “IRichSpout”接口有以下重要方法 -

  • open -為Spout提供執(zhí)行環(huán)境噪沙。執(zhí)行器將運(yùn)行此方法來初始化噴頭炼彪。
  • nextTuple -通過收集器發(fā)出生成的數(shù)據(jù)。
  • close -當(dāng)spout將要關(guān)閉時(shí)調(diào)用此方法正歼。
  • declareOutputFields -聲明元組的輸出模式辐马。
  • ack -確認(rèn)處理了特定元組。
  • fail -指定不處理和不重新處理特定元組局义。

open

open方法的簽名如下 -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf - 為此spout提供storm配置喜爷。
  • context - 提供有關(guān)拓?fù)渲械膕pout位置,其任務(wù)ID萄唇,輸入和輸出信息的完整信息檩帐。
  • collector - 使我們能夠發(fā)出將由bolts處理的元組。

nextTuple

nextTuple方法的簽名如下 -

nextTuple()

nextTuple()從與ack()和fail()方法相同的循環(huán)中定期調(diào)用另萤。它必須釋放線程的控制湃密,當(dāng)沒有工作要做,以便其他方法有機(jī)會被調(diào)用四敞。因此泛源,nextTuple的第一行檢查處理是否已完成。如果是這樣忿危,它應(yīng)該休眠至少一毫秒达箍,以減少處理器在返回之前的負(fù)載。

close

close方法的簽名如下-

close()

declareOutputFields

declareOutputFields方法的簽名如下-

declareOutputFields(OutputFieldsDeclarer declarer)

declarer -它用于聲明輸出流id铺厨,輸出字段等

此方法用于指定元組的輸出模式缎玫。

ack

ack方法的簽名如下 -

ack(Object msgId)

該方法確認(rèn)已經(jīng)處理了特定元組。

fail

nextTuple方法的簽名如下-

ack(Object msgId)

此方法通知特定元組尚未完全處理解滓。 Storm將重新處理特定的元組赃磨。

FakeCallLogReaderSpout

在我們的場景中,我們需要收集呼叫日志詳細(xì)信息洼裤。呼叫日志的信息包含煞躬。

  • 主叫號碼
  • 接收號碼
  • 持續(xù)時(shí)間

由于我們沒有呼叫日志的實(shí)時(shí)信息,我們將生成假呼叫日志逸邦。假信息將使用Random類創(chuàng)建。完整的程序代碼如下在扰。

編碼 - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
    
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
    
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
    
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
                
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
                
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Bolt創(chuàng)建

Bolt是一個(gè)使用元組作為輸入缕减,處理元組,并產(chǎn)生新的元組作為輸出的組件芒珠。Bolts將實(shí)現(xiàn)IRichBolt接口桥狡。在此程序中,使用兩個(gè)Bolts
CallLogCreatorBoltCallLogCounterBolt來執(zhí)行操作。

IRichBolt接口有以下方法 -

  • prepare -為bolt提供要執(zhí)行的環(huán)境裹芝。執(zhí)行器將運(yùn)行此方法來初始化spout部逮。
  • execute -處理單個(gè)元組的輸入
  • cleanup -當(dāng)spout要關(guān)閉時(shí)調(diào)用。
  • declareOutputFields -聲明元組的輸出模式嫂易。

Prepare

prepare方法的簽名如下 -

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf -為此bolt提供Storm配置兄朋。
  • context -提供有關(guān)拓?fù)渲械腷olt位置,其任務(wù)ID怜械,輸入和輸出信息等的完整信息颅和。
  • collector -使我們能夠發(fā)出處理的元組。

execute

execute方法的簽名如下-

execute(Tuple tuple)

這里的元組是要處理的輸入元組缕允。

execute方法一次處理單個(gè)元組峡扩。元組數(shù)據(jù)可以通過Tuple類的getValue方法訪問。不必立即處理輸入元組障本。多元組可以被處理和輸出為單個(gè)輸出元組教届。處理的元組可以通過使用OutputCollector類發(fā)出。

cleanup

cleanup方法的簽名如下 -

cleanup()

declareOutputFields

declareOutputFields方法的簽名如下-

declareOutputFields(OutputFieldsDeclarer declarer)

這里的參數(shù)declarer用于聲明輸出流id驾霜,輸出字段等案训。

此方法用于指定元組的輸出模式

呼叫日志創(chuàng)建者bolt

呼叫日志創(chuàng)建者bolt接收呼叫日志元組寄悯。呼叫日志元組具有主叫方號碼萤衰,接收方號碼和呼叫持續(xù)時(shí)間。此bolt通過組合主叫方號碼和接收方號碼簡單地創(chuàng)建一個(gè)新值猜旬。新值的格式為“來電號碼 - 接收方號碼”脆栋,并將其命名為新字段“呼叫”。完整的代碼如下洒擦。

編碼 - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
    
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

呼叫日志計(jì)數(shù)器Bolt

呼叫日志創(chuàng)建者bolt接收呼叫日志元組椿争。呼叫日志元組具有主叫方號碼,接收方號碼和呼叫持續(xù)時(shí)間熟嫩。此bolt通過組合主叫方號碼和接收方號碼簡單地創(chuàng)建一個(gè)新值秦踪。新值的格式為“來電號碼 - 接收方號碼”,并將其命名為新字段“呼叫”掸茅。完整的代碼如下椅邓。

編碼 - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
        
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
        
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
    
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
    
}

創(chuàng)建拓?fù)?/h2>

Storm拓?fù)浠旧鲜且粋€(gè)Thrift結(jié)構(gòu)。 TopologyBuilder類提供了簡單而容易的方法來創(chuàng)建復(fù)雜的拓?fù)涿潦āopologyBuilder類具有設(shè)置spout(setSpout)和設(shè)置bolt(setBolt)的方法景馁。最后,TopologyBuilder有createTopology來創(chuàng)建拓?fù)涠好J褂靡韵麓a片段創(chuàng)建拓?fù)?-

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping方法有助于為spout和bolts設(shè)置流分組合住。

本地集群

為了開發(fā)目的绰精,我們可以使用“LocalCluster”對象創(chuàng)建本地集群,然后使用“LocalCluster”類的“submitTopology”方法提交拓?fù)洹?“submitTopology”的參數(shù)之一是“Config”類的實(shí)例透葛”渴梗“Config”類用于在提交拓?fù)渲霸O(shè)置配置選項(xiàng)。此配置選項(xiàng)將在運(yùn)行時(shí)與集群配置合并僚害,并使用prepare方法發(fā)送到所有任務(wù)(spout和bolt)硫椰。一旦拓?fù)涮峤坏郊海覀儗⒌却?0秒鐘贡珊,集群計(jì)算提交的拓?fù)渥钆溃缓笫褂谩癓ocalCluster”的“shutdown”方法關(guān)閉集群。完整的程序代碼如下 -

編碼 - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
        
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
            
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
        
      //Stop the topology
        
      cluster.shutdown();
   }
}

構(gòu)建和運(yùn)行應(yīng)用程序

完整的應(yīng)用程序有四個(gè)Java代碼门岔。它們是 -

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

應(yīng)用程序可以使用以下命令構(gòu)建 -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

應(yīng)用程序可以使用以下命令運(yùn)行 -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

輸出

一旦應(yīng)用程序啟動(dòng)爱致,它將輸出有關(guān)集群啟動(dòng)過程,spout和螺栓處理的完整詳細(xì)信息寒随,最后是集群關(guān)閉過程糠悯。在“CallLogCounterBolt”中,我們打印了呼叫及其計(jì)數(shù)詳細(xì)信息妻往。此信息將顯示在控制臺上如下 -

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非JVM語言

Storm拓?fù)渫ㄟ^Thrift接口實(shí)現(xiàn)互艾,這使得輕松地提交任何語言的拓?fù)洹torm支持Ruby讯泣,Python和許多其他語言纫普。讓我們來看看python綁定。

Python綁定

Python是一種通用的解釋好渠,交互昨稼,面向?qū)ο蠛透呒壘幊陶Z言。Storm支持Python實(shí)現(xiàn)其拓?fù)淙ython支持發(fā)射假栓,錨定,acking和日志操作霍掺。

如你所知匾荆,bolt可以用任何語言定義。用另一種語言編寫的bolt作為子進(jìn)程執(zhí)行杆烁,Storm通過stdin / stdout與JSON消息進(jìn)行通信牙丽。首先拿一個(gè)支持python綁定的樣例bolt WordCount。

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
    
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

這里的類WordCount實(shí)現(xiàn)IRichBolt接口和運(yùn)行與python實(shí)現(xiàn)指定超級方法參數(shù)“splitword.py”⊥没辏現(xiàn)在創(chuàng)建一個(gè)名為“splitword.py”的python實(shí)現(xiàn)剩岳。

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

這是Python的示例實(shí)現(xiàn),它計(jì)算給定句子中的單詞入热。同樣拍棕,您也可以與其他支持語言綁定。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末勺良,一起剝皮案震驚了整個(gè)濱河市绰播,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌尚困,老刑警劉巖蠢箩,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異事甜,居然都是意外死亡谬泌,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進(jìn)店門逻谦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來掌实,“玉大人,你說我怎么就攤上這事邦马〖牵” “怎么了?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵滋将,是天一觀的道長邻悬。 經(jīng)常有香客問我,道長随闽,這世上最難降的妖魔是什么父丰? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮掘宪,結(jié)果婚禮上蛾扇,老公的妹妹穿的比我還像新娘。我一直安慰自己添诉,他們只是感情好屁桑,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著栏赴,像睡著了一般蘑斧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上须眷,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天竖瘾,我揣著相機(jī)與錄音,去河邊找鬼花颗。 笑死捕传,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的扩劝。 我是一名探鬼主播庸论,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼职辅,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了聂示?” 一聲冷哼從身側(cè)響起域携,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鱼喉,沒想到半個(gè)月后秀鞭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡扛禽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年锋边,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片编曼。...
    茶點(diǎn)故事閱讀 40,144評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡豆巨,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出灵巧,到底是詐尸還是另有隱情搀矫,我是刑警寧澤,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布刻肄,位于F島的核電站瓤球,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏敏弃。R本人自食惡果不足惜卦羡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望麦到。 院中可真熱鬧绿饵,春花似錦、人聲如沸瓶颠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春荐绝,著一層夾襖步出監(jiān)牢的瞬間社痛,已是汗流浹背胀蛮。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工蹂析, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人借杰。 一個(gè)月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓过吻,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蔗衡。 傳聞我的和親對象是個(gè)殘疾皇子纤虽,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內(nèi)容