storm筆記:Storm+Kafka簡(jiǎn)單應(yīng)用

這幾天工作需要使用storm+kafka档悠,基本場(chǎng)景是應(yīng)用出現(xiàn)錯(cuò)誤廊鸥,發(fā)送日志到kafka的某個(gè)topic,storm訂閱該topic辖所,然后進(jìn)行后續(xù)處理惰说。場(chǎng)景非常簡(jiǎn)單,但是在學(xué)習(xí)過(guò)程中缘回,遇到一個(gè)奇怪的異常情況:使用KafkaSpout讀取topic數(shù)據(jù)時(shí)吆视,沒(méi)有向ZK寫(xiě)offset數(shù)據(jù),致使每次都從頭開(kāi)始讀取酥宴。糾結(jié)了兩天啦吧,終于碰巧找到原因:應(yīng)該使用BaseBasicBolt作為bolt的父類,而不是BaseRichBolt拙寡。

通過(guò)本文記錄一下這種情況授滓,后文中根據(jù)上述場(chǎng)景提供幾個(gè)簡(jiǎn)單的例子。基礎(chǔ)理論查看storm筆記:storm基本概念般堆,或查看Storm 簡(jiǎn)介在孝。

基本訂閱

基本場(chǎng)景:訂閱kafka的某個(gè)topic,然后在讀取的消息前加上自定義的字符串淮摔,然后寫(xiě)回到kafka另外一個(gè)topic私沮。

從Kafka讀取數(shù)據(jù)的Spout使用storm.kafka.KafkaSpout,向Kafka寫(xiě)數(shù)據(jù)的Bolt使用storm.kafka.bolt.KafkaBolt和橙。中間進(jìn)行進(jìn)行數(shù)據(jù)處理的Bolt定義為TopicMsgBolt仔燕。閑言少敘,奉上代碼:

public class TopicMsgTopology {
    public static void main(String[] args) throws Exception {
        // 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // 配置Kafka訂閱的Topic胃碾,以及zookeeper中數(shù)據(jù)節(jié)點(diǎn)目錄和名字
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root", "topicMsgTopology");
        // 配置KafkaBolt中的kafka.broker.properties
        Config conf = new Config();
        Properties props = new Properties();
        // 配置Kafka broker地址
        props.put("metadata.broker.list", "dev2_55.wfj-search:9092");
        // serializer.class為消息的序列化類
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        // 配置KafkaBolt生成的topic
        conf.put("topic", "msgTopic2");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("msgSentenceBolt", new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout");
        builder.setBolt("msgKafkaBolt", new KafkaBolt<String, Integer>()).shuffleGrouping("msgSentenceBolt");
        if (args.length == 0) {
            String topologyName = "kafkaTopicTopology";
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        } else {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }
}

storm.kafka.ZkHosts構(gòu)造方法的參數(shù)是zookeeper標(biāo)準(zhǔn)配置地址的形式(ZooKeeper環(huán)境搭建可以查看ZooKeeper安裝部署)涨享,zk1筋搏、zk2仆百、zk3在本地配置了host,因?yàn)榉?wù)器使用的偽分布式模式奔脐,因此幾個(gè)端口號(hào)不是默認(rèn)的2181俄周。

storm.kafka.SpoutConfig構(gòu)造方法第一個(gè)參數(shù)為上述的storm.kafka.ZkHosts對(duì)象,第二個(gè)為待訂閱的topic名稱髓迎,第三個(gè)參數(shù)zkRoot為寫(xiě)讀取topic時(shí)的偏移量offset數(shù)據(jù)的節(jié)點(diǎn)(zk node)峦朗,第四個(gè)參數(shù)為該節(jié)點(diǎn)上的次級(jí)節(jié)點(diǎn)名(有個(gè)地方說(shuō)這個(gè)是spout的id)。

backtype.storm.Config對(duì)象是配置storm的topology(拓?fù)洌┧枰幕A(chǔ)配置排龄。

backtype.storm.spout.SchemeAsMultiScheme的構(gòu)造方法輸入的參數(shù)是訂閱kafka數(shù)據(jù)的處理參數(shù)波势,這里的MessageScheme是自定義的,代碼如下:

public class MessageScheme implements Scheme {
    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8");
            logger.info("get one message is {}", msg);
            return new Values(msg);
        } catch (UnsupportedEncodingException ignored) {
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

MessageScheme類中g(shù)etOutputFields方法是KafkaSpout向后發(fā)送tuple(storm傳輸數(shù)據(jù)的最小結(jié)構(gòu))的名字橄维,需要與接收數(shù)據(jù)的Bolt中統(tǒng)一(在這個(gè)例子中可以不統(tǒng)一尺铣,因?yàn)楹竺嬷苯尤〉?條數(shù)據(jù),但是在wordCount的那個(gè)例子中就需要統(tǒng)一了)争舞。

TopicMsgBolt類是從storm.kafka.KafkaSpout接收數(shù)據(jù)的Bolt凛忿,對(duì)接收到的數(shù)據(jù)進(jìn)行處理,然后向后傳輸給storm.kafka.bolt.KafkaBolt竞川。代碼如下:

public class TopicMsgBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        String out = "Message got is '" + word + "'!";
        logger.info("out={}", out);
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

此處需要特別注意的是店溢,要使用backtype.storm.topology.base.BaseBasicBolt對(duì)象作為父類,否則不會(huì)在zk記錄偏移量offset數(shù)據(jù)委乌。

需要編寫(xiě)的代碼已完成床牧,接下來(lái)就是在搭建好的storm、kafka中進(jìn)行測(cè)試:

# 創(chuàng)建topic
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic1
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic2

接下來(lái)需要分別對(duì)msgTopic1遭贸、msgTopic2啟動(dòng)producer(生產(chǎn)者)與consumer(消費(fèi)者):

# 對(duì)msgTopic1啟動(dòng)producer戈咳,用于發(fā)送數(shù)據(jù)
./bin/kafka-console-producer.sh --broker-list dev2_55.wfj-search:9092 --topic msgTopic1
# 對(duì)msgTopic2啟動(dòng)consumer,用于查看發(fā)送數(shù)據(jù)的處理結(jié)果
./bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2281,zk3:2381 --topic msgTopic2 --from-beginning

然后將打好的jar包上傳到storm的nimbus(可以使用遠(yuǎn)程上傳或先上傳jar包到nimbus節(jié)點(diǎn)所在服務(wù)器,然后本地執(zhí)行):

# ./bin/storm jar topology TopicMsgTopology.jar cn.howardliu.demo.storm.kafka.topicMsg.TopicMsgTopology TopicMsgTopology

待對(duì)應(yīng)的worker啟動(dòng)好之后除秀,就可以在msgTopic1的producer對(duì)應(yīng)終端輸入數(shù)據(jù)糯累,然后在msgTopic2的consumer對(duì)應(yīng)終端查看輸出結(jié)果了。

有幾點(diǎn)需要注意的:

  1. 必須先創(chuàng)建msgTopic1册踩、msgTopic2兩個(gè)topic泳姐;
  2. 定義的bolt必須使用BaseBasicBolt作為父類,不能夠使用BaseRichBolt暂吉,否則無(wú)法記錄偏移量胖秒;
  3. zookeeper最好使用至少三個(gè)節(jié)點(diǎn)的分布式模式或偽分布式模式,否則會(huì)出現(xiàn)一些異常情況慕的;
  4. 在整個(gè)storm下阎肝,spout、bolt的id必須唯一肮街,否則會(huì)出現(xiàn)異常风题。
  5. TopicMsgBolt類作為storm.kafka.bolt.KafkaBolt前的最后一個(gè)Bolt,需要將輸出數(shù)據(jù)名稱定義為message嫉父,否則KafkaBolt無(wú)法接收數(shù)據(jù)沛硅。

wordCount

簡(jiǎn)單的輸入輸出做完了,來(lái)點(diǎn)復(fù)雜點(diǎn)兒的場(chǎng)景:從某個(gè)topic定于消息绕辖,然后根據(jù)空格分詞摇肌,統(tǒng)計(jì)單詞數(shù)量,然后將當(dāng)前輸入的單詞數(shù)量推送到另一個(gè)topic仪际。

首先規(guī)劃需要用到的類:

  1. 從KafkaSpout接收數(shù)據(jù)并進(jìn)行處理的backtype.storm.spout.Scheme子類围小;
  2. 數(shù)據(jù)切分bolt:SplitSentenceBolt
  3. 計(jì)數(shù)bolt:WordCountBolt树碱;
  4. 報(bào)表bolt:ReportBolt肯适;
  5. topology定義:WordCountTopology
  6. 最后再加一個(gè)原樣顯示訂閱數(shù)據(jù)的bolt:SentenceBolt赴恨。

backtype.storm.spout.Scheme子類可以使用上面已經(jīng)定義過(guò)的MessageScheme疹娶,此處不再贅述。

SplitSentenceBolt是對(duì)輸入數(shù)據(jù)進(jìn)行分割伦连,簡(jiǎn)單的使用String類的split方法雨饺,然后將每個(gè)單詞命名為“word”,向后傳輸惑淳,代碼如下:

public class SplitSentenceBolt extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getStringByField("msg");
        String[] words = sentence.split(" ");
        Arrays.asList(words).forEach(word -> collector.emit(new Values(word)));
    }
}

SentenceBolt是從KafkaSpout接收數(shù)據(jù)额港,然后直接輸出。在拓?fù)鋱D上就是從輸入分叉歧焦,一個(gè)進(jìn)入SplitSentenceBolt移斩,一個(gè)進(jìn)入SentenceBolt肚医。這種結(jié)構(gòu)可以應(yīng)用在Lambda架構(gòu)中,代碼如下:

public class SentenceBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(SentenceBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String msg = tuple.getStringByField("msg");
        logger.info("get one message is {}", msg);
        basicOutputCollector.emit(new Values(msg));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}

WordCountBolt是對(duì)接收到的單詞進(jìn)行匯總統(tǒng)一向瓷,然后將單詞“word”及其對(duì)應(yīng)數(shù)量“count”向后傳輸肠套,代碼如下:

public class WordCountBolt extends BaseBasicBolt {
    private Map<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counts = new ConcurrentHashMap<>();
        super.prepare(stormConf, context);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        collector.emit(new Values(word, count));
    }
}

ReportBolt是對(duì)接收到的單詞及數(shù)量進(jìn)行整理,拼成json格式猖任,然后繼續(xù)向后傳輸你稚,代碼如下:

public class ReportBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        String reportMessage = "{'word': '" + word + "', 'count': '" + count + "'}";
        collector.emit(new Values(reportMessage));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("message"));
    }
}

最后是定義topology(拓?fù)洌?code>WordCountTopology,代碼如下:

public class WordCountTopology {
    private static final String KAFKA_SPOUT_ID = "kafkaSpout";
    private static final String SENTENCE_BOLT_ID = "sentenceBolt";
    private static final String SPLIT_BOLT_ID = "sentenceSplitBolt";
    private static final String WORD_COUNT_BOLT_ID = "sentenceWordCountBolt";
    private static final String REPORT_BOLT_ID = "reportBolt";
    private static final String KAFKA_BOLT_ID = "kafkabolt";
    private static final String CONSUME_TOPIC = "sentenceTopic";
    private static final String PRODUCT_TOPIC = "wordCountTopic";
    private static final String ZK_ROOT = "/topology/root";
    private static final String ZK_ID = "wordCount";
    private static final String DEFAULT_TOPOLOGY_NAME = "sentenceWordCountKafka";

    public static void main(String[] args) throws Exception {
        // 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // 配置Kafka訂閱的Topic朱躺,以及zookeeper中數(shù)據(jù)節(jié)點(diǎn)目錄和名字
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONSUME_TOPIC, ZK_ROOT, ZK_ID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConfig));
        builder.setBolt(SENTENCE_BOLT_ID, new SentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(WORD_COUNT_BOLT_ID, new WordCountBolt()).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).shuffleGrouping(WORD_COUNT_BOLT_ID);
        builder.setBolt(KAFKA_BOLT_ID, new KafkaBolt<String, Long>()).shuffleGrouping(REPORT_BOLT_ID);

        Config config = new Config();
        Map<String, String> map = new HashMap<>();
        map.put("metadata.broker.list", "dev2_55.wfj-search:9092");// 配置Kafka broker地址
        map.put("serializer.class", "kafka.serializer.StringEncoder");// serializer.class為消息的序列化類
        config.put("kafka.broker.properties", map);// 配置KafkaBolt中的kafka.broker.properties
        config.put("topic", PRODUCT_TOPIC);// 配置KafkaBolt生成的topic

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}

除了上面提過(guò)應(yīng)該注意的地方刁赖,此處還需要注意,storm.kafka.SpoutConfig定義的zkRoot與id應(yīng)該與第一個(gè)例子中不同(至少保證id不同长搀,否則兩個(gè)topology將使用一個(gè)節(jié)點(diǎn)記錄偏移量)宇弛。


個(gè)人主頁(yè): http://www.howardliu.cn

個(gè)人博文: storm筆記:Storm+Kafka簡(jiǎn)單應(yīng)用

CSDN主頁(yè): http://blog.csdn.net/liuxinghao

CSDN博文: storm筆記:Storm+Kafka簡(jiǎn)單應(yīng)用

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市源请,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌巢钓,老刑警劉巖病苗,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件症汹,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡贷腕,警方通過(guò)查閱死者的電腦和手機(jī)背镇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)泽裳,“玉大人瞒斩,你說(shuō)我怎么就攤上這事′套埽” “怎么了胸囱?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)瀑梗。 經(jīng)常有香客問(wèn)我烹笔,道長(zhǎng),這世上最難降的妖魔是什么抛丽? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任谤职,我火速辦了婚禮,結(jié)果婚禮上亿鲜,老公的妹妹穿的比我還像新娘允蜈。我一直安慰自己,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布饶套。 她就那樣靜靜地躺著漩蟆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪妓蛮。 梳的紋絲不亂的頭發(fā)上爆安,一...
    開(kāi)封第一講書(shū)人閱讀 51,763評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音仔引,去河邊找鬼扔仓。 笑死,一個(gè)胖子當(dāng)著我的面吹牛咖耘,可吹牛的內(nèi)容都是我干的翘簇。 我是一名探鬼主播,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼儿倒,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼版保!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起夫否,我...
    開(kāi)封第一講書(shū)人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤彻犁,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后凰慈,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡森篷,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年仲智,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了姻氨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡前联,死狀恐怖蛀恩,靈堂內(nèi)的尸體忽然破棺而出茂浮,到底是詐尸還是另有隱情壳咕,我是刑警寧澤谓厘,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布竟稳,位于F島的核電站熊痴,受9級(jí)特大地震影響果善,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜巾陕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一晾匠、第九天 我趴在偏房一處隱蔽的房頂上張望梯刚。 院中可真熱鬧,春花似錦句喜、人聲如沸沟于。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)冻记。三九已至,卻和暖如春演顾,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背葛虐。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工屿脐, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留宪卿,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓西疤,卻偏偏與公主長(zhǎng)得像次绘,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子管跺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容