如何在Storm編程實現(xiàn)與Kafka的集成

問題導(dǎo)讀

1.如何編程實現(xiàn)Storm與Kafka集成?

2.Storm中Topology如何實現(xiàn)的失尖?

3.如何驗證集成效果?

一菇夸、實現(xiàn)模型

數(shù)據(jù)流程:

1胧辽、Kafka Producter生成topic1主題的消息

2、Storm中有個Topology摄咆,包含了KafkaSpout人断、SenqueceBolt、KafkaBolt三個組件涩金。其中KafkaSpout訂閱了topic1主題消息步做,然后發(fā)送

給SenqueceBolt加工處理奈附,最后數(shù)據(jù)由KafkaBolt生成topic2主題消息發(fā)送給Kafka

3、Kafka Consumer負(fù)責(zé)消費topic2主題的消息

二、Topology實現(xiàn)

1佑颇、創(chuàng)建maven工程挑胸,配置pom.xml

需要依賴storm-core、kafka_2.10簿透、storm-kafka三個包

org.apache.storm

storm-core

0.9.2-incubating

provided

org.apache.kafka

kafka_2.10

0.8.1.1

org.apache.zookeeper

zookeeper

log4j

log4j

org.apache.storm

storm-kafka

0.9.2-incubating

maven-assembly-plugin

2.4

jar-with-dependencies

make-assembly

package

single

復(fù)制代碼

2萎战、KafkaSpout

KafkaSpout是Storm中自帶的Spout舆逃,源碼在https://github.com/apache/incubator-storm/tree/master/external

使用KafkaSpout時需要子集實現(xiàn)Scheme接口,它主要負(fù)責(zé)從消息流中解析出需要的數(shù)據(jù)

public class MessageScheme implements Scheme {

/* (non-Javadoc)

* @see backtype.storm.spout.Scheme#deserialize(byte[])

*/

public List deserialize(byte[] ser) {

try {

String msg = new String(ser, "UTF-8");

return new Values(msg);

} catch (UnsupportedEncodingException e) {

}

return null;

}

/* (non-Javadoc)

* @see backtype.storm.spout.Scheme#getOutputFields()

*/

public Fields getOutputFields() {

// TODO Auto-generated method stub

return new Fields("msg");

}

}

復(fù)制代碼

3蔚约、SenqueceBolt

SenqueceBolt實現(xiàn)很簡單涂籽,在接收的spout的消息前面加上“I‘m”

public class SenqueceBolt extends BaseBasicBolt{

/* (non-Javadoc)

* @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)

*/

public void execute(Tuple input, BasicOutputCollector collector) {

// TODO Auto-generated method stub

String word = (String) input.getValue(0);

String out = "I'm " + word +??"!";

System.out.println("out=" + out);

collector.emit(new Values(out));

}

/* (non-Javadoc)

* @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)

*/

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("message"));

}

}

復(fù)制代碼

4评雌、KafkaBolt

KafkaBolt是Storm中自帶的Bolt景东,負(fù)責(zé)向Kafka發(fā)送主題消息

5、Topology

public class StormKafkaTopo {

public static void main(String[] args) throws Exception {

// 配置Zookeeper地址

BrokerHosts brokerHosts = new ZkHosts("node04:2181,node05:2181,node06:2181");

// 配置Kafka訂閱的Topic搔涝,以及zookeeper中數(shù)據(jù)節(jié)點目錄和名字

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout" , "kafkaspout");

// 配置KafkaBolt中的kafka.broker.properties

Config conf = new Config();

Map map = new HashMap();

// 配置Kafka broker地址

map.put("metadata.broker.list", "node04:9092");

// serializer.class為消息的序列化類

map.put("serializer.class", "kafka.serializer.StringEncoder");

conf.put("kafka.broker.properties", map);

// 配置KafkaBolt生成的topic

conf.put("topic", "topic2");

spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new KafkaSpout(spoutConfig));

builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");

builder.setBolt("kafkabolt", new KafkaBolt()).shuffleGrouping("bolt");

if (args != null && args.length > 0) {

conf.setNumWorkers(3);

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

} else {

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("Topo", conf, builder.createTopology());

Utils.sleep(100000);

cluster.killTopology("Topo");

cluster.shutdown();

}

}

}

復(fù)制代碼

三、測試驗證

1派阱、使用Kafka client模擬Kafka Producter ,生成topic1主題

bin/kafka-console-producer.sh --broker-list node04:9092 --topic topic1

2颁褂、使用Kafka client模擬Kafka Consumer颁独,訂閱topic2主題

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic2 --from-beginning

3伪冰、運行Strom Topology

bin/storm jar storm-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar??StormKafkaTopo KafkaStorm

4贮聂、運行結(jié)果

原創(chuàng)文章吓懈,轉(zhuǎn)載請注明: 轉(zhuǎn)載自http://www.cnblogs.com/tovin/p/3974417.html

public class StormKafkaTopo {

public static void main(String[] args) throws Exception {

// 配置Zookeeper地址

BrokerHosts brokerHosts = new ZkHosts("storm1:2181,storm2:2181,storm3:2181");

// 配置Kafka訂閱的Topic,以及zookeeper中數(shù)據(jù)節(jié)點目錄和名字

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout" , "kafkaspout");

// 配置KafkaBolt中的kafka.broker.properties

Config conf = new Config();

Map map = new HashMap();

// 配置Kafka broker地址

map.put("metadata.broker.list", "storm3:9092");

// serializer.class為消息的序列化類

map.put("serializer.class", "kafka.serializer.StringEncoder");

conf.put("kafka.broker.properties", map);

// 配置KafkaBolt生成的topic

conf.put("topic", "topic2");

spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new KafkaSpout(spoutConfig));

builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");

builder.setBolt("kafkabolt", new KafkaBolt()).shuffleGrouping("bolt");

if (args != null && args.length > 0) {

conf.setNumWorkers(3);

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

} else {

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("Topo", conf, builder.createTopology());

Utils.sleep(100000);

cluster.killTopology("Topo");

cluster.shutdown();

}

}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末甸怕,一起剝皮案震驚了整個濱河市梢杭,隨后出現(xiàn)的幾起案子秸滴,更是在濱河造成了極大的恐慌,老刑警劉巖咒唆,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件钧排,死亡現(xiàn)場離奇詭異均澳,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)糟袁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進(jìn)店門项戴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來槽惫,“玉大人周叮,你說我怎么就攤上這事〗缧保” “怎么了仿耽?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長各薇。 經(jīng)常有香客問我项贺,道長,這世上最難降的妖魔是什么峭判? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任开缎,我火速辦了婚禮,結(jié)果婚禮上林螃,老公的妹妹穿的比我還像新娘奕删。我一直安慰自己疗认,他們只是感情好完残,可當(dāng)我...
    茶點故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布砌滞。 她就那樣靜靜地躺著,像睡著了一般坏怪。 火紅的嫁衣襯著肌膚如雪贝润。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天铝宵,我揣著相機(jī)與錄音打掘,去河邊找鬼。 笑死鹏秋,一個胖子當(dāng)著我的面吹牛尊蚁,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播侣夷,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼横朋,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了百拓?” 一聲冷哼從身側(cè)響起琴锭,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎衙传,沒想到半個月后决帖,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡蓖捶,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年地回,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片俊鱼。...
    茶點故事閱讀 40,861評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡刻像,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出并闲,到底是詐尸還是另有隱情细睡,我是刑警寧澤,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布焙蚓,位于F島的核電站纹冤,受9級特大地震影響洒宝,放射性物質(zhì)發(fā)生泄漏购公。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一雁歌、第九天 我趴在偏房一處隱蔽的房頂上張望宏浩。 院中可真熱鬧,春花似錦靠瞎、人聲如沸比庄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽佳窑。三九已至制恍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間神凑,已是汗流浹背净神。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留溉委,地道東北人鹃唯。 一個月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像瓣喊,于是被迫代替她去往敵國和親坡慌。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,860評論 2 361

推薦閱讀更多精彩內(nèi)容