storm_定時(shí)機(jī)制tick使用

<h4>背景:</h4>
我們知道在java中可以有最少3鐘方式來實(shí)現(xiàn)定時(shí)任務(wù):1、普通thread(里面使用while循環(huán)以及sleep) 2铲觉、Timer和TimerTask 3迂卢、ScheduledExecutorService 美莫,另外還有功能更全的Quartz框架或者是spring集成的Quartz菜拓。當(dāng)然從標(biāo)題就知道我們今天不是講這些東西,而是講講storm中自帶的定時(shí)功能使用系宫,可以使用場(chǎng)景如:每分鐘統(tǒng)計(jì)訂單數(shù)據(jù)累計(jì)數(shù)據(jù)總和等索昂。當(dāng)然這其中最好的搭配就是使用kafka來做訂單消息推送,目前我們只講個(gè)本地main demo扩借。
<h4>一椒惨、tick全解</h4>
<b>1、tick的功能</b>
Apache Storm中內(nèi)置了一種定時(shí)機(jī)制——tick潮罪,它能夠讓任何bolt的所有task每隔一段時(shí)間(精確到秒級(jí)康谆,用戶可以自定義)收到一個(gè)來自systemd的tick stream的tick tuple,bolt收到這樣的tuple后可以根據(jù)業(yè)務(wù)需求完成相應(yīng)的處理嫉到。Tick功能從Apache Storm 0.8.0版本開始支持沃暗,本文在Apache Storm 0.9.5上測(cè)試。
<b>2何恶、為bolt設(shè)置tick</b>
若希望某個(gè)bolt每隔一段時(shí)間做一些操作孽锥,那么可以將bolt繼承BaseBasicBolt/BaseRichBolt,并重寫getComponentConfiguration()方法细层。在方法中設(shè)置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值惜辑,單位是秒。
getComponentConfiguration()是backtype.storm.topology.IComponent接口中定義的方法疫赎,在此方法的實(shí)現(xiàn)中可以定義以Topology開頭的此bolt特定的Config盛撑。
<pre>


</pre>
這樣設(shè)置之后,此bolt的所有task都會(huì)每隔一段時(shí)間收到一個(gè)來自systemd的tick stream的tick tuple捧搞,因此execute()方法可以實(shí)現(xiàn)如下:
<pre>

</pre>
<b>3抵卫、全局tick</b>
若希望Topology中的每個(gè)bolt都每隔一段時(shí)間做一些操作,那么可以定義一個(gè)Topology全局的tick实牡,同樣是設(shè)置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值:
<pre>
</pre>
<b>當(dāng)我們?cè)谡麄€(gè)Topology上設(shè)置tick和我們單個(gè)運(yùn)算bolt上沖突時(shí)陌僵,其優(yōu)先級(jí)如何呢轴合?事實(shí)是在更小范圍的bolt設(shè)置的tick優(yōu)先級(jí)更高</b>
<b>4创坞、定時(shí)精度問題</b>
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS是精確到秒級(jí)的。例如某bolt設(shè)置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS為10s受葛,理論上說bolt的每個(gè)task應(yīng)該每個(gè)10s收到一個(gè)tick tuple题涨。實(shí)際測(cè)試發(fā)現(xiàn)偎谁,這個(gè)時(shí)間間隔的精確性是很高的,一般延遲(而不是提前)時(shí)間在1-2ms左右纲堵。
<h4>二巡雨、代碼實(shí)現(xiàn)</h4>
1、spout代碼
<pre>
public class TickWordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] sentences = {"a","b","c"};
private int index = 0;
public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index ++;
if(index >= sentences.length){
index = 0;
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
@SuppressWarnings("rawtypes")
public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
</pre>
2席函、bolt代碼
<pre>
public class TickWordCountBolt extends BaseBasicBolt{
Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
System.err.println("TickWordCount bolt: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));
//模擬聚合打印結(jié)果
for (String key : counts.keySet()) {
System.err.println("key: " + key + " count: " + counts.get(key));
}
//模擬10秒鐘的結(jié)果處理以后清空操作
counts.clear();
} else {
String result = tuple.getStringByField("word");
if(counts.get(result) == null){
counts.put(result, 1);
}else{
counts.put(result, counts.get(result) + 1);
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
//設(shè)置10秒發(fā)送一次tick心跳
@SuppressWarnings("static-access")
@Override
public Map<String, Object> getComponentConfiguration() {
Config conf = new Config();
conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
return conf;
}
}
</pre>
3铐望、main調(diào)試代碼
<pre>
public class TickTest {
@SuppressWarnings("static-access")
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TickWordSpout());
//啟動(dòng)3個(gè)線程按word值進(jìn)行分組處理
builder.setBolt("count", new TickWordCountBolt(),3).fieldsGrouping("spout", new Fields("word"));
Config conf = new Config();
//設(shè)置一個(gè)全局的Topology發(fā)送tick心跳時(shí)間,測(cè)試優(yōu)先級(jí)
conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);
conf.setDebug(false);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
}
}
}
---------------------------------------輸出結(jié)果------------------------------------------------
TickWordCount bolt: 2016-09-17 12:41:23:031
key: b count: 3014
TickWordCount bolt: 2016-09-17 12:41:23:041
key: c count: 3017
TickWordCount bolt: 2016-09-17 12:41:23:053
key: a count: 3021
</br>
TickWordCount bolt: 2016-09-17 12:41:33:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:33:053
key: a count: 3295
</br>
TickWordCount bolt: 2016-09-17 12:41:43:031
key: b count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:41:43:053
key: a count: 3293
</br>
TickWordCount bolt: 2016-09-17 12:41:53:031
key: b count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:041
key: c count: 3297
TickWordCount bolt: 2016-09-17 12:41:53:053
key: a count: 3298
</br>
TickWordCount bolt: 2016-09-17 12:42:03:031
key: b count: 3293
TickWordCount bolt: 2016-09-17 12:42:03:041
key: c count: 3294
TickWordCount bolt: 2016-09-17 12:42:03:053
key: a count: 3293
</pre>
從這組測(cè)試數(shù)據(jù)來看茂附,每組都是相隔10s執(zhí)行0延遲正蛙,不過在測(cè)試中也有發(fā)現(xiàn)延遲1-2ms的情況,還是比較精準(zhǔn)的营曼。
<h4>三乒验、tick實(shí)現(xiàn)代碼淺顯分析</h4>
TopologyBuilder.setBolt
<pre>
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
</pre>
<pre>
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
validateUnusedId(id);
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
return new BoltGetter(id);
}
</pre>
<pre>
<b> //Map conf = component.getComponentConfiguration();能夠獲取設(shè)置的tick發(fā)送心跳的設(shè)置</b>
private void initCommon(String id, IComponent component, Number parallelism) {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if(parallelism!=null) common.set_parallelism_hint(parallelism.intValue());
Map conf = component.getComponentConfiguration();
if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
}
</pre>
tick功能的使用就講到這里啦。蒂阱。锻全。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市录煤,隨后出現(xiàn)的幾起案子鳄厌,更是在濱河造成了極大的恐慌,老刑警劉巖妈踊,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件部翘,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡响委,警方通過查閱死者的電腦和手機(jī)新思,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赘风,“玉大人夹囚,你說我怎么就攤上這事⊙裕” “怎么了荸哟?”我有些...
    開封第一講書人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長瞬捕。 經(jīng)常有香客問我鞍历,道長,這世上最難降的妖魔是什么肪虎? 我笑而不...
    開封第一講書人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任劣砍,我火速辦了婚禮,結(jié)果婚禮上扇救,老公的妹妹穿的比我還像新娘刑枝。我一直安慰自己香嗓,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開白布装畅。 她就那樣靜靜地躺著靠娱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掠兄。 梳的紋絲不亂的頭發(fā)上像云,一...
    開封第一講書人閱讀 49,036評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音蚂夕,去河邊找鬼苫费。 笑死,一個(gè)胖子當(dāng)著我的面吹牛双抽,可吹牛的內(nèi)容都是我干的百框。 我是一名探鬼主播,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼牍汹,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼铐维!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起慎菲,我...
    開封第一講書人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤嫁蛇,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后露该,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體睬棚,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年解幼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了抑党。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡撵摆,死狀恐怖底靠,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情特铝,我是刑警寧澤暑中,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站鲫剿,受9級(jí)特大地震影響鳄逾,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜灵莲,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一雕凹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦请琳、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至榕堰,卻和暖如春竖慧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背逆屡。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國打工圾旨, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人魏蔗。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓砍的,卻偏偏與公主長得像,于是被迫代替她去往敵國和親莺治。 傳聞我的和親對(duì)象是個(gè)殘疾皇子廓鞠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容