不透明分區(qū)事務(wù)


title: storm不透明分區(qū)事務(wù)
date: 2017-08-27
categoties:
-storm
tags:

  • storm
  • java

不透明分區(qū)事務(wù)

一般情況下storm是在構(gòu)造函數(shù)中對基本數(shù)據(jù)類型和可序列化的對象進(jìn)行賦值和實(shí)例化痢法,在prepare()方法中對不可序列化的對象進(jìn)行實(shí)例化

接口說明

IOpaquePartitionedTransactionalSpout<T>:不透明分區(qū)事務(wù)Spout
--IOpaquePartitionedTransactionalSpout.Coordinator
--isReady():為true開啟一個(gè)新的事務(wù),沒提交一個(gè)事務(wù)停頓一段時(shí)間杜跷,可以在此進(jìn)行sleep
--IOpaquePartitionedTransactionalSpout.Emitter<X>
-- emitPartitionBatch(TransactionAttempt tx,BatchOutputCollector collector,int partition,X lastPartitionMeta)
-- numPartitions()

不透明分區(qū)事務(wù)特點(diǎn)

以下X表示元數(shù)據(jù)
它不區(qū)分發(fā)新消息還是重發(fā)舊消息额港,全部用emitPartitionBatch搞定。雖然emitPartitionBatch返回的X應(yīng)該是下一批次供自己使用的(emitPartitionBatch的第4個(gè)參數(shù))积瞒,但是只有一個(gè)批次成功以后X才會(huì)更新到ZooKeeper中君仆,如果失敗重發(fā)最爬,emitPartitionBatch讀取的X還是舊的铃岔。所以這時(shí)候自定義的X不需要記錄當(dāng)前批次的開始位置和下一批次的開始位置兩個(gè)值汪疮,只需要記錄下一批次開始位置一個(gè)值即可,例如:
public class BatchMeta{
public long nextOffset; //下一批次的偏移量
}

實(shí)例

制造一批數(shù)據(jù)對數(shù)據(jù)按照時(shí)間進(jìn)行內(nèi)的進(jìn)行統(tǒng)計(jì)

定義不透明分區(qū)事務(wù)的spout毁习,在構(gòu)造函數(shù)中制造數(shù)據(jù)源

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class MyOpaquePtTxSpout implements IOpaquePartitionedTransactionalSpout<MyMdata> {


    private static final long serialVersionUID = -1889920861418121463L;

    public static Map<Integer,Map<Long,String>> Partion_Data_Base = new HashMap<Integer, Map<Long, String>>();
    public static int PART_COUNT = 5;

    public MyOpaquePtTxSpout() {

        Random _rand = new Random();
        String[] hosts={"www.haoyidao.com"};
        String[] session_id =  { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };

        String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

        for (int i=0;i<PART_COUNT;i++){
            Map<Long,String> map = new HashMap<Long, String>();
            for (long j=0;j<30;j++){
                map.put(j,hosts[0]+"\t"+session_id[_rand.nextInt(5)]+"\t"+_rand.nextInt(8));
            }
            Partion_Data_Base.put(i,map);
        }

        System.err.println("MtPtTxSpout start.....");

    }

    public Emitter<MyMdata> getEmitter(Map map, TopologyContext topologyContext) {
        return new MyEmitter();
    }

    public Coordinator getCoordinator(Map map, TopologyContext topologyContext) {
        return new MyCoordinator();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        outputFieldsDeclarer.declare(new Fields("tx","today","log"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public class MyCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator{

        public boolean isReady() {
            Utils.sleep(1000);
            return true;
        }

        public void close() {

        }
    }

    public static final long BITCHNUM = 10;

    public class MyEmitter implements IOpaquePartitionedTransactionalSpout.Emitter<MyMdata>{

        public MyMdata emitPartitionBatch(TransactionAttempt transactionAttempt, BatchOutputCollector batchOutputCollector, int i, MyMdata myMdata) {

            long beginPoint = 0;
            if (myMdata == null){
                beginPoint = 0;
            }else {
                beginPoint = myMdata.getBeginPoint()+myMdata.getNum();
            }
            MyMdata myMdata1 = new MyMdata();
            myMdata1.setBeginPoint(beginPoint);
            myMdata1.setNum(BITCHNUM);
            System.err.println("啟動(dòng)一個(gè)事務(wù):"+myMdata1.toString());

            //進(jìn)行發(fā)射數(shù)據(jù)
            Map<Long,String> batchMap = Partion_Data_Base.get(i);
            for (Long j = myMdata1.getBeginPoint();j<myMdata1.getBeginPoint()+myMdata1.getNum();j++){
                //表示一個(gè)分區(qū)消息發(fā)送完了
                if (batchMap.size()<j){
                    break;
                }
                batchOutputCollector.emit(new Values(transactionAttempt,"2017-08-09",batchMap.get(j)));
            }

            return myMdata1;
        }

        public int numPartitions() {
            return 5;
        }

        public void close() {

        }
    }
}

定義bolt

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.coordination.IBatchBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

public class Mybolt implements IBatchBolt<TransactionAttempt> {

    private static  Map<String,Integer> countMap = new HashMap<String, Integer>();
    private BatchOutputCollector batchOutputCollector;
    private TransactionAttempt transactionAttempt;
    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    String today;
    Integer count;



    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
        this.batchOutputCollector = batchOutputCollector;
        this.transactionAttempt = transactionAttempt;

    }

    public void execute(Tuple tuple) {
        TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        String log = (String) tuple.getValue(1);
        String[] strings = log.split("\t");
        today = "2014-01-07";
         count = countMap.get(today);
        if (count == null){
            count = 0;
        }
        count++;
        countMap.put(today,count);

    }

    public void finishBatch() {
        System.err.println(this.transactionAttempt.toString()+"---"+today+"----"+count);
        this.batchOutputCollector.emit(new Values(this.transactionAttempt,today,count));

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        outputFieldsDeclarer.declare(new Fields("tx","today","count"));

    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

定義提交報(bào)告的commit

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class MyCommitbolt extends BaseTransactionalBolt implements ICommitter{

    private static final long serialVersionUID = 1550322821850864035L;

    public static Map<String,DbValue> dbValueMap = new HashMap<String, DbValue>();
    Map<String,Integer> countMap = new HashMap<String, Integer>();
    TransactionAttempt transactionAttempt;
    BatchOutputCollector collector;
    String today = null;


    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {

        this.transactionAttempt = transactionAttempt;
        this.collector = batchOutputCollector;
    }

    public void execute(Tuple tuple) {
        today = tuple.getString(1);
        Integer count = tuple.getInteger(2);
        transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        if (today!=null&&count!=null){
            Integer batchCount = countMap.get(today);
            if (batchCount==null){
                batchCount = 0;
            }
            batchCount = batchCount+count;
            countMap.put(today,batchCount);
        }

    }

    public void finishBatch() {
        if (countMap.size()>0){
            DbValue dbValue = dbValueMap.get(today);

            DbValue newValue;

            if (dbValue==null||!dbValue.txid.equals(transactionAttempt.getTransactionId())){
                newValue = new DbValue();
                newValue.txid = transactionAttempt.getTransactionId();
                newValue.dataStr = today;
                if (dbValue==null){
                    newValue.count = countMap.get(today);
                    newValue.pre_cout = 0;
                }else {
                    newValue.count = dbValue.count+countMap.get(today);
                    newValue.pre_cout = dbValue.count;
                }

                dbValueMap.put(today,newValue);
            }

            System.out.println("total==========:"+dbValueMap.get(today).count);

        }


    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    public static class DbValue{
        BigInteger txid;
        int count = 0;
        String dataStr;
        int pre_cout;
    }
}

定義topo結(jié)構(gòu)

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class MyCommitbolt extends BaseTransactionalBolt implements ICommitter{

    private static final long serialVersionUID = 1550322821850864035L;

    public static Map<String,DbValue> dbValueMap = new HashMap<String, DbValue>();
    Map<String,Integer> countMap = new HashMap<String, Integer>();
    TransactionAttempt transactionAttempt;
    BatchOutputCollector collector;
    String today = null;


    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {

        this.transactionAttempt = transactionAttempt;
        this.collector = batchOutputCollector;
    }

    public void execute(Tuple tuple) {
        today = tuple.getString(1);
        Integer count = tuple.getInteger(2);
        transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        if (today!=null&&count!=null){
            Integer batchCount = countMap.get(today);
            if (batchCount==null){
                batchCount = 0;
            }
            batchCount = batchCount+count;
            countMap.put(today,batchCount);
        }

    }

    public void finishBatch() {
        if (countMap.size()>0){
            DbValue dbValue = dbValueMap.get(today);

            DbValue newValue;

            if (dbValue==null||!dbValue.txid.equals(transactionAttempt.getTransactionId())){
                newValue = new DbValue();
                newValue.txid = transactionAttempt.getTransactionId();
                newValue.dataStr = today;
                if (dbValue==null){
                    newValue.count = countMap.get(today);
                    newValue.pre_cout = 0;
                }else {
                    newValue.count = dbValue.count+countMap.get(today);
                    newValue.pre_cout = dbValue.count;
                }

                dbValueMap.put(today,newValue);
            }

            System.out.println("total==========:"+dbValueMap.get(today).count);

        }


    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    public static class DbValue{
        BigInteger txid;
        int count = 0;
        String dataStr;
        int pre_cout;

    }

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末智嚷,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子纺且,更是在濱河造成了極大的恐慌盏道,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件载碌,死亡現(xiàn)場離奇詭異猜嘱,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)嫁艇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進(jìn)店門泉坐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人裳仆,你說我怎么就攤上這事」虑眨” “怎么了歧斟?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長偏形。 經(jīng)常有香客問我静袖,道長,這世上最難降的妖魔是什么俊扭? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任队橙,我火速辦了婚禮,結(jié)果婚禮上萨惑,老公的妹妹穿的比我還像新娘捐康。我一直安慰自己,他們只是感情好庸蔼,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布解总。 她就那樣靜靜地躺著,像睡著了一般姐仅。 火紅的嫁衣襯著肌膚如雪花枫。 梳的紋絲不亂的頭發(fā)上刻盐,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天,我揣著相機(jī)與錄音劳翰,去河邊找鬼敦锌。 笑死,一個(gè)胖子當(dāng)著我的面吹牛佳簸,可吹牛的內(nèi)容都是我干的乙墙。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼溺蕉,長吁一口氣:“原來是場噩夢啊……” “哼伶丐!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起疯特,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤哗魂,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后漓雅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體录别,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年邻吞,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了组题。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,622評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡抱冷,死狀恐怖崔列,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情旺遮,我是刑警寧澤赵讯,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站耿眉,受9級特大地震影響边翼,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜鸣剪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一组底、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧筐骇,春花似錦债鸡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至饺鹃,卻和暖如春莫秆,著一層夾襖步出監(jiān)牢的瞬間间雀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工镊屎, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留惹挟,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓缝驳,卻偏偏與公主長得像连锯,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子用狱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容