Storm hello world project
- pom.xml
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
- RedisBolt.java
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
/**
* Created by zhouwenchun on 17/3/24.
*/
public class RedisBolt extends BaseRichBolt {
private OutputCollector _collector;
private JedisPool pool;
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this._collector = outputCollector;
this.pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
}
public void execute(Tuple tuple) {
String log = tuple.getString(0);
System.out.println(sdf.format(new Date()));
System.out.println(log);
Jedis jedis = this.pool.getResource();
jedis.set("20151020", log);
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
- UserlogTopo.java
/**
 * Wires a KafkaSpout into RedisBolt and submits the topology:
 * to a remote cluster when any CLI argument is given, otherwise
 * to an in-process LocalCluster for testing.
 */
public class UserlogTopo {
// Kafka topic to consume from.
private static String topicName = "test2";
// ZooKeeper path under which the Kafka spout stores its consumer offsets.
private static String zkRoot = "/test/test2";
public static void main(String[] args) throws Exception{
BrokerHosts hosts = new ZkHosts("117.169.77.211:2181");
// Random consumer id: offsets are NOT reused across restarts, so the spout
// re-reads the topic each run — NOTE(review): confirm this is intentional.
SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName, zkRoot, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.metricsTimeBucketSizeInSecs = 5;
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", kafkaSpout);
builder.setBolt("UserBolt", new RedisBolt(), 2)// parallelism_hint: number of executor threads for this bolt
.setNumTasks(4) // tasks for this component; default is 1 task per executor, here each of the 2 executors runs 2 tasks
.shuffleGrouping("kafkaSpout");
Config conf = new Config();
conf.setDebug(false);
if(args != null && args.length > 0) {
StormSubmitter.submitTopology("userTopo", conf, builder.createTopology());
}else{
// conf.setMaxSpoutPending(100);
// conf.setMaxTaskParallelism(2); // upper bound on executors (threads) per component
// conf.setKryoFactory();
// NOTE(review): nimbus host only matters for remote submission; it is set in
// the LocalCluster branch here — confirm which branch it was meant for.
conf.put(Config.NIMBUS_HOST, "10.0.12.36");
conf.setNumWorkers(3); // number of worker processes for the topology
// NOTE(review): LocalCluster is never shut down, so this JVM runs until killed.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("userTopo", conf, builder.createTopology());
}
}
}
Storm 的消息可靠性
可靠性API
- 将原始 Tuple 和新 Tuple 一起发送（锚定，anchoring）;
- 调用 collector#ack() 通知 Storm 处理完成;
或者使用更简单的方式：继承 BaseBasicBolt，它会自动完成以上两个操作;
禁用可靠性机制:
- 将 Config.TOPOLOGY_ACKERS 设置为 0
- 可以通过在 SpoutOutputCollector.emit 方法中省略消息 id 来关闭 spout tuple 的跟踪功能;
- 可以在发送 tuple 的时候选择发送"非锚定"的（unanchored）tuple。
Storm 拓扑的并行度（parallelism）理解
配置storm的Topo的并行度:
- worker 数量（Topo 在集群中运行所需的工作进程数），配置方法: Config#setNumWorkers
- Executor 数量（每个组件需要执行的线程数），配置方法: TopologyBuilder#setSpout() 或 TopologyBuilder#setBolt()
- Task 数量（每个组件需要执行的任务数），配置方法: ComponentConfigurationDeclarer#setNumTasks()
如何修改运行当中 Topo 的并行度
- 使用Storm UI
- 使用命令行 eg: storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
说明: 重新配置拓扑 "mytopology"，使得该拓扑拥有 5 个 worker processes，
另外，配置名为 "blue-spout" 的 spout 使用 3 个 executor，
配置名为 "yellow-bolt" 的 bolt 使用 10 个 executor。