For work these days I needed Storm + Kafka. The basic scenario: when an application hits an error, it sends a log message to a Kafka topic; Storm subscribes to that topic and handles the follow-up processing. The scenario is very simple, but while learning it I ran into a strange problem: when reading topic data with KafkaSpout, no offset data was written to ZK, so every run read the topic from the beginning. After two days of head-scratching, I finally stumbled on the cause: the bolt should extend BaseBasicBolt, not BaseRichBolt.
This post records that situation and then works through a few simple examples based on the scenario above. For the underlying theory, see my Storm notes on basic concepts ("storm筆記:storm基本概念") or the "Storm 簡(jiǎn)介" article.
Basic subscription
Basic scenario: subscribe to a Kafka topic, prepend a custom string to each message read, and write the result back to a second Kafka topic.
The spout that reads from Kafka is storm.kafka.KafkaSpout, and the bolt that writes back to Kafka is storm.kafka.bolt.KafkaBolt. The bolt that processes the data in between is defined as TopicMsgBolt. Without further ado, the code:
import java.util.Properties;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

public class TopicMsgTopology {
    public static void main(String[] args) throws Exception {
        // ZooKeeper ensemble address
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // the topic to subscribe to, plus the ZooKeeper node path and name used for offset data
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root", "topicMsgTopology");
        // kafka.broker.properties used by KafkaBolt
        Config conf = new Config();
        Properties props = new Properties();
        // Kafka broker address
        props.put("metadata.broker.list", "dev2_55.wfj-search:9092");
        // serializer.class is the message serializer
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        // the topic KafkaBolt writes to
        conf.put("topic", "msgTopic2");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("msgSentenceBolt", new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout");
        builder.setBolt("msgKafkaBolt", new KafkaBolt<String, Integer>()).shuffleGrouping("msgSentenceBolt");
        if (args.length == 0) {
            String topologyName = "kafkaTopicTopology";
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        } else {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }
}
The storm.kafka.ZkHosts constructor takes a standard ZooKeeper connection string (for setting up ZooKeeper, see the "ZooKeeper安裝部署" post). zk1, zk2 and zk3 are hosts configured locally; since the servers run in pseudo-distributed mode, the ports are not all the default 2181.
The storm.kafka.SpoutConfig constructor takes the storm.kafka.ZkHosts object above as its first argument, the name of the topic to subscribe to as its second, the ZooKeeper node (zkRoot) under which the topic's read offsets are stored as its third, and the name of the child node under that node as its fourth (some sources describe this as the spout's id).
backtype.storm.Config holds the basic configuration the topology needs.
The argument to the backtype.storm.spout.SchemeAsMultiScheme constructor is the scheme used to deserialize the data consumed from Kafka. MessageScheme here is custom, with the following code:
public class MessageScheme implements Scheme {
    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8");
            logger.info("get one message is {}", msg);
            return new Values(msg);
        } catch (UnsupportedEncodingException ignored) {
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
The getOutputFields method of MessageScheme names the field of the tuple (Storm's smallest unit of transmitted data) that KafkaSpout emits downstream. It has to agree with what the receiving bolt looks up (in this example it technically doesn't, because the bolt reads the first value by position, but in the wordCount example below it must).
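As an aside, decoding with java.nio.charset.StandardCharsets.UTF_8 avoids both the checked UnsupportedEncodingException and the silent null return in deserialize. A minimal stdlib sketch of that variant (class name is illustrative):

```java
import java.nio.charset.StandardCharsets;

public class Utf8DecodeDemo {
    public static void main(String[] args) {
        byte[] ser = "hello storm".getBytes(StandardCharsets.UTF_8);
        // the Charset overload of new String(...) declares no checked exception
        String msg = new String(ser, StandardCharsets.UTF_8);
        System.out.println(msg); // hello storm
    }
}
```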
TopicMsgBolt is the bolt that receives data from storm.kafka.KafkaSpout, processes it, and passes it on to storm.kafka.bolt.KafkaBolt. The code:
public class TopicMsgBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        String out = "Message got is '" + word + "'!";
        logger.info("out={}", out);
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}
Note in particular that the bolt must extend backtype.storm.topology.base.BaseBasicBolt; otherwise no offset data is recorded in ZK. The reason is that BaseBasicBolt automatically acks each input tuple after execute returns, and KafkaSpout only advances the offset it stores in ZK once tuples have been acked.
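If a BaseRichBolt really is needed, the ack has to be issued by hand. A hypothetical rich-bolt version of TopicMsgBolt (the class name and structure are my illustration, not from the original post; it assumes the same field contract):

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class TopicMsgRichBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = (String) input.getValue(0);
        // anchor the emitted tuple to the input so a downstream failure triggers a replay
        collector.emit(input, new Values("Message got is '" + word + "'!"));
        collector.ack(input); // without this ack, KafkaSpout never writes the offset to ZK
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}
```

The auto-ack in BaseBasicBolt is exactly this pattern done for you, which is why it "just works" for offset tracking.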
That is all the code; next, test it on the Storm and Kafka environments already set up:
# create the two topics
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic1
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic2
Next, start a producer and a consumer for msgTopic1 and msgTopic2 respectively:
# start a producer on msgTopic1, used to send data
./bin/kafka-console-producer.sh --broker-list dev2_55.wfj-search:9092 --topic msgTopic1
# start a consumer on msgTopic2, used to watch the processed results
./bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2281,zk3:2381 --topic msgTopic2 --from-beginning
Then upload the built jar to Storm's nimbus (either remotely, or copy the jar to the nimbus server first and run locally):
# ./bin/storm jar topology TopicMsgTopology.jar cn.howardliu.demo.storm.kafka.topicMsg.TopicMsgTopology TopicMsgTopology
Once the corresponding workers have started, type data into the msgTopic1 producer terminal and watch the output in the msgTopic2 consumer terminal.
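To confirm that offsets are actually being written, ZooKeeper can be inspected directly. This assumes storm-kafka's usual node layout of `<zkRoot>/<id>/partition_<n>` (shown here for this example's config; verify the exact path against your storm-kafka version):

```shell
# open a ZooKeeper CLI session against one ensemble member
./bin/zkCli.sh -server zk1:2181
# inside the session, read the offset node written by KafkaSpout
get /topology/root/topicMsgTopology/partition_0
```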
A few things to note:
- the two topics msgTopic1 and msgTopic2 must be created first;
- the bolts must extend BaseBasicBolt, not BaseRichBolt, otherwise offsets are not recorded;
- ZooKeeper is best run in distributed or pseudo-distributed mode with at least three nodes, otherwise odd failures can occur;
- across the whole Storm cluster, spout and bolt ids must be unique, otherwise exceptions are thrown;
- TopicMsgBolt, as the last bolt before storm.kafka.bolt.KafkaBolt, must declare its output field name as message, otherwise KafkaBolt cannot read the data.
wordCount
With simple in-and-out done, a slightly more complex scenario: subscribe to messages from a topic, split them on spaces, count the words, and push the current count of each incoming word to another topic.
First, plan the classes needed:
- a backtype.storm.spout.Scheme subclass that receives and deserializes data from KafkaSpout;
- the splitting bolt: SplitSentenceBolt;
- the counting bolt: WordCountBolt;
- the reporting bolt: ReportBolt;
- the topology definition: WordCountTopology;
- and finally one bolt that echoes the subscribed data unchanged: SentenceBolt.
The backtype.storm.spout.Scheme subclass can be the MessageScheme already defined above, so it is not repeated here.
SplitSentenceBolt splits the input with String's split method, names each word "word", and emits it downstream:
public class SplitSentenceBolt extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getStringByField("msg");
        String[] words = sentence.split(" ");
        Arrays.asList(words).forEach(word -> collector.emit(new Values(word)));
    }
}
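One caveat about `split(" ")`: consecutive spaces yield empty tokens, which would be emitted (and later counted) as empty "words"; `split("\\s+")` collapses whitespace runs instead. A quick stdlib check of the difference:

```java
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // split(" ") keeps an empty token between consecutive spaces
        System.out.println(Arrays.toString("a  b".split(" ")));    // [a, , b]
        // split("\\s+") treats any whitespace run as a single delimiter
        System.out.println(Arrays.toString("a  b".split("\\s+"))); // [a, b]
    }
}
```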
SentenceBolt receives data from KafkaSpout and outputs it unchanged. In the topology graph the input forks: one branch feeds SplitSentenceBolt and the other feeds SentenceBolt. This kind of structure can be used in a Lambda architecture. The code:
public class SentenceBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(SentenceBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String msg = tuple.getStringByField("msg");
        logger.info("get one message is {}", msg);
        basicOutputCollector.emit(new Values(msg));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
WordCountBolt tallies the received words and emits each word ("word") together with its running count ("count") downstream. Because the topology routes tuples to it with fieldsGrouping on "word", every occurrence of a given word reaches the same bolt instance, so the per-instance map yields correct counts. The code:
public class WordCountBolt extends BaseBasicBolt {
    private Map<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counts = new ConcurrentHashMap<>();
        super.prepare(stormConf, context);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        collector.emit(new Values(word, count));
    }
}
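The counting logic itself is just a per-word tally in a map; stripped of the Storm plumbing it can be sketched (and verified) with the stdlib alone:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountLogic {
    public static void main(String[] args) {
        // the same tally WordCountBolt keeps in its counts map
        Map<String, Long> counts = new HashMap<>();
        for (String word : "to be or not to be".split(" ")) {
            counts.merge(word, 1L, Long::sum); // insert 1, or add 1 to the existing count
        }
        System.out.println(counts.get("to") + "," + counts.get("be") + "," + counts.get("not"));
        // prints 2,2,1
    }
}
```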
ReportBolt assembles each received word and count into a JSON-like string (note the single quotes, so not strictly valid JSON) and emits it onward. The code:
public class ReportBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        String reportMessage = "{'word': '" + word + "', 'count': '" + count + "'}";
        collector.emit(new Values(reportMessage));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("message"));
    }
}
Finally, the topology definition, WordCountTopology:
import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

public class WordCountTopology {
    private static final String KAFKA_SPOUT_ID = "kafkaSpout";
    private static final String SENTENCE_BOLT_ID = "sentenceBolt";
    private static final String SPLIT_BOLT_ID = "sentenceSplitBolt";
    private static final String WORD_COUNT_BOLT_ID = "sentenceWordCountBolt";
    private static final String REPORT_BOLT_ID = "reportBolt";
    private static final String KAFKA_BOLT_ID = "kafkabolt";
    private static final String CONSUME_TOPIC = "sentenceTopic";
    private static final String PRODUCT_TOPIC = "wordCountTopic";
    private static final String ZK_ROOT = "/topology/root";
    private static final String ZK_ID = "wordCount";
    private static final String DEFAULT_TOPOLOGY_NAME = "sentenceWordCountKafka";

    public static void main(String[] args) throws Exception {
        // ZooKeeper ensemble address
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // the topic to subscribe to, plus the ZooKeeper node path and name used for offset data
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONSUME_TOPIC, ZK_ROOT, ZK_ID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConfig));
        builder.setBolt(SENTENCE_BOLT_ID, new SentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(WORD_COUNT_BOLT_ID, new WordCountBolt()).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).shuffleGrouping(WORD_COUNT_BOLT_ID);
        builder.setBolt(KAFKA_BOLT_ID, new KafkaBolt<String, Long>()).shuffleGrouping(REPORT_BOLT_ID);
        Config config = new Config();
        Map<String, String> map = new HashMap<>();
        map.put("metadata.broker.list", "dev2_55.wfj-search:9092"); // Kafka broker address
        map.put("serializer.class", "kafka.serializer.StringEncoder"); // message serializer
        config.put("kafka.broker.properties", map); // kafka.broker.properties used by KafkaBolt
        config.put("topic", PRODUCT_TOPIC); // the topic KafkaBolt writes to
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}
Beyond the points already mentioned, note that the zkRoot and id passed to storm.kafka.SpoutConfig should differ from those in the first example (at minimum the id must differ, otherwise the two topologies will record their offsets under the same node).
Homepage: http://www.howardliu.cn
Original post: storm筆記:Storm+Kafka簡(jiǎn)單應(yīng)用
CSDN: http://blog.csdn.net/liuxinghao