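<h4>Background</h4>
Java offers at least three ways to implement scheduled tasks: (1) a plain thread that loops with while and sleep, (2) Timer and TimerTask, and (3) ScheduledExecutorService. Beyond these there is the more full-featured Quartz framework, or Quartz integrated with Spring. As the title suggests, though, this post is not about any of those. It covers the timer facility built into Storm. A typical use case is computing a running total of order data once a minute. In production this pairs best with Kafka delivering the order messages, but here we only walk through a local main demo.
<h4>I. Tick in detail</h4>
<b>1. What tick does</b>
Apache Storm has a built-in timing mechanism called tick. It lets every task of any bolt receive, at a fixed interval (second-level granularity, user-configurable), a tick tuple from the tick stream of the __system component. On receiving such a tuple, the bolt can do whatever processing the business logic requires. Tick has been supported since Apache Storm 0.8.0; this post was tested on Apache Storm 0.9.5.
<b>2. Setting tick for a bolt</b>
To have a bolt do something at a regular interval, make it extend BaseBasicBolt or BaseRichBolt and override getComponentConfiguration(). In that method, set the value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, in seconds.
getComponentConfiguration() is defined in the backtype.storm.topology.IComponent interface. Its implementation can set bolt-specific Config entries whose names start with TOPOLOGY.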
<pre>
</pre>
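As a rough illustration of what such a component configuration contains, here is a plain-JDK sketch that builds the map without depending on Storm. The class name TickConfigSketch and the helper componentConfiguration are made up for this example. The literal key is the string value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS; in a real bolt you would use a Config object and the constant itself, as the bolt code later in this post does.

```java
import java.util.HashMap;
import java.util.Map;

public class TickConfigSketch {
    // String value of Storm's Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS constant.
    static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";

    // Hypothetical helper: builds the map a bolt could return from
    // getComponentConfiguration() to request a tick every `seconds` seconds.
    public static Map<String, Object> componentConfiguration(int seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("tick interval must be positive");
        }
        Map<String, Object> conf = new HashMap<>();
        conf.put(TOPOLOGY_TICK_TUPLE_FREQ_SECS, seconds);
        return conf;
    }

    public static void main(String[] args) {
        // Request a tick every 10 seconds, as the demo bolt does.
        System.out.println(componentConfiguration(10));
    }
}
```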
With this in place, every task of the bolt receives a tick tuple from the tick stream of the __system component at the configured interval, so execute() has to tell tick tuples apart from ordinary ones:
<pre>
</pre>
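The branching inside execute() can be sketched without Storm on the classpath. The class below is a stand-in, not Storm code: TickAwareCounter, isTick and onTuple are hypothetical names, and the two string constants are the values of Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID. A real bolt would read the component and stream from the Tuple, as the bolt code later in this post does.

```java
import java.util.HashMap;
import java.util.Map;

public class TickAwareCounter {
    // Values of Storm's Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    private final Map<String, Integer> counts = new HashMap<>();

    // A tuple is a tick only if both the source component and the stream match.
    public static boolean isTick(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }

    // Hypothetical stand-in for execute(): on a tick, return the counts gathered
    // since the previous tick and reset them; otherwise count the word and return null.
    public Map<String, Integer> onTuple(String sourceComponent, String sourceStreamId, String word) {
        if (isTick(sourceComponent, sourceStreamId)) {
            Map<String, Integer> snapshot = new HashMap<>(counts);
            counts.clear();
            return snapshot;
        }
        counts.merge(word, 1, Integer::sum);
        return null;
    }

    public static void main(String[] args) {
        TickAwareCounter counter = new TickAwareCounter();
        counter.onTuple("spout", "default", "a");
        counter.onTuple("spout", "default", "a");
        counter.onTuple("spout", "default", "b");
        // The tick flushes the window: a was seen twice, b once.
        System.out.println(counter.onTuple(SYSTEM_COMPONENT_ID, SYSTEM_TICK_STREAM_ID, null));
    }
}
```

Checking both the component and the stream matters, because a user-defined stream could in principle share the stream name alone.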
<b>3. Global tick</b>
To have every bolt in the Topology do something at a regular interval, define a Topology-wide tick, again by setting the value of Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS:
<pre>
</pre>
<b>What happens when a tick set on the whole Topology conflicts with one set on an individual bolt? The narrower, bolt-level setting takes precedence.</b>
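One way to picture this precedence rule is as a map merge in which component-level entries overwrite topology-level ones. The sketch below is plain JDK code that models the behaviour described here; it is not Storm's actual merge logic, and TickPrecedenceSketch and effectiveConf are hypothetical names. The values 7 and 10 are the ones used in the demo later in this post.

```java
import java.util.HashMap;
import java.util.Map;

public class TickPrecedenceSketch {
    // String value of Storm's Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS constant.
    static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";

    // Hypothetical helper: start from the topology-wide settings, then let the
    // component's own settings overwrite any key they both define.
    public static Map<String, Object> effectiveConf(Map<String, Object> topologyConf,
                                                    Map<String, Object> componentConf) {
        Map<String, Object> merged = new HashMap<>(topologyConf);
        merged.putAll(componentConf);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put(TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);   // topology-wide tick
        Map<String, Object> boltConf = new HashMap<>();
        boltConf.put(TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);      // bolt-level tick

        // The bolt-level value wins for this bolt.
        System.out.println(effectiveConf(topologyConf, boltConf).get(TOPOLOGY_TICK_TUPLE_FREQ_SECS));
    }
}
```

A bolt that sets no tick of its own would, under this model, simply inherit the topology-wide value.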
<b>4. Timing precision</b>
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS has second-level granularity. For example, if a bolt sets Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS to 10, each of its tasks should in theory receive a tick tuple every 10 s. In testing, the interval turned out to be very accurate: ticks typically arrive about 1-2 ms late, never early.
<h4>II. Implementation</h4>
1. Spout code
<pre>
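import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class TickWordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] sentences = {"a", "b", "c"};
    private int index = 0;

    // Emit "a", "b", "c" in rotation, sleeping 1 ms between emits.
    @Override
    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the exception.
            Thread.currentThread().interrupt();
        }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}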
</pre>
2. Bolt code
<pre>
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class TickWordCountBolt extends BaseBasicBolt {
    Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
                && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            System.err.println("TickWordCount bolt: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
            // Simulate printing the aggregated results
            for (String key : counts.keySet()) {
                System.err.println("key: " + key + " count: " + counts.get(key));
            }
            // Simulate clearing the counts once the 10-second window has been processed
            counts.clear();
        } else {
            String result = tuple.getStringByField("word");
            if (counts.get(result) == null) {
                counts.put(result, 1);
            } else {
                counts.put(result, counts.get(result) + 1);
            }
        }
    }

    // This bolt emits nothing downstream.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    // Send a tick every 10 seconds
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }
}
</pre>
3. Main class for local debugging
<pre>
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TickTest {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new TickWordSpout());
        // Run 3 executors, grouping tuples by the value of "word"
        builder.setBolt("count", new TickWordCountBolt(), 3).fieldsGrouping("spout", new Fields("word"));
        Config conf = new Config();
        // Set a Topology-wide tick interval to test precedence
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
        }
    }
}
---------------------------------------Output------------------------------------------------
TickWordCount bolt: 2016-09-17 12:41:23:031
key: b count: 3014
TickWordCount bolt: 2016-09-17 12:41:23:041
key: c count: 3017
TickWordCount bolt: 2016-09-17 12:41:23:053
key: a count: 3021
<br>
TickWordCount bolt: 2016-09-17 12:41:33:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:053
key: a count: 3295
<br>
TickWordCount bolt: 2016-09-17 12:41:43:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:053
key: a count: 3293
<br>
TickWordCount bolt: 2016-09-17 12:41:53:031
key: b count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:041
key: c count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:053
key: a count: 3298
<br>
TickWordCount bolt: 2016-09-17 12:42:03:031
key: b count: 3293
TickWordCount bolt: 2016-09-17 12:42:03:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:42:03:053
key: a count: 3293
</pre>
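In this run, each group fires exactly 10 s after the previous one with no measurable delay. Other test runs did show delays of 1-2 ms, so the timing is still quite accurate. The 10 s interval also confirms that the bolt-level setting (10) overrides the Topology-wide one (7).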
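The interval can be checked directly from the timestamps the bolt prints. The following is a small plain-JDK sketch for doing so; TickIntervalCheck and intervalMillis are names invented for this example, and the pattern string matches the SimpleDateFormat pattern used in the bolt.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TickIntervalCheck {
    // Same layout as the bolt's log timestamps, e.g. 2016-09-17 12:41:23:031.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");

    // Hypothetical helper: milliseconds elapsed between two logged timestamps.
    public static long intervalMillis(String earlier, String later) {
        LocalDateTime first = LocalDateTime.parse(earlier, FORMAT);
        LocalDateTime second = LocalDateTime.parse(later, FORMAT);
        return Duration.between(first, second).toMillis();
    }

    public static void main(String[] args) {
        // Two consecutive ticks of the same task, taken from the output above.
        System.out.println(intervalMillis("2016-09-17 12:41:23:031", "2016-09-17 12:41:33:031"));
    }
}
```

Subtracting the configured interval (10000 ms here) from the result gives the delay of that tick.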
<h4>III. A brief look at the tick implementation</h4>
TopologyBuilder.setBolt
<pre>
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
    return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
</pre>
<pre>
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
    validateUnusedId(id);
    initCommon(id, bolt, parallelism_hint);
    _bolts.put(id, bolt);
    return new BoltGetter(id);
}
</pre>
<pre>
<b>// Map conf = component.getComponentConfiguration(); picks up the tick interval set on the component</b>
private void initCommon(String id, IComponent component, Number parallelism) {
    ComponentCommon common = new ComponentCommon();
    common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
    if(parallelism!=null) common.set_parallelism_hint(parallelism.intValue());
    Map conf = component.getComponentConfiguration();
    if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
    _commons.put(id, common);
}
</pre>
That wraps up how to use tick.