title: storm不透明分區(qū)事務(wù)
date: 2017-08-27
categories:
- storm
tags:
- storm
- java
不透明分區(qū)事務(wù)
一般情況下storm是在構(gòu)造函數(shù)中對基本數(shù)據(jù)類型和可序列化的對象進(jìn)行賦值和實(shí)例化，在prepare()方法中對不可序列化的對象進(jìn)行實(shí)例化
接口說明
IOpaquePartitionedTransactionalSpout<T>:不透明分區(qū)事務(wù)Spout
--IOpaquePartitionedTransactionalSpout.Coordinator
--isReady():為true開啟一個(gè)新的事務(wù)，每提交一個(gè)事務(wù)停頓一段時(shí)間，可以在此進(jìn)行sleep
--IOpaquePartitionedTransactionalSpout.Emitter<X>
-- emitPartitionBatch(TransactionAttempt tx,BatchOutputCollector collector,int partition,X lastPartitionMeta)
-- numPartitions()
不透明分區(qū)事務(wù)特點(diǎn)
以下X表示元數(shù)據(jù)
它不區(qū)分發(fā)新消息還是重發(fā)舊消息，全部用emitPartitionBatch搞定。雖然emitPartitionBatch返回的X應(yīng)該是下一批次供自己使用的（emitPartitionBatch的第4個(gè)參數(shù)），但是只有一個(gè)批次成功以后X才會(huì)更新到ZooKeeper中，如果失敗重發(fā)，emitPartitionBatch讀取的X還是舊的。所以這時(shí)候自定義的X不需要記錄當(dāng)前批次的開始位置和下一批次的開始位置兩個(gè)值，只需要記錄下一批次開始位置一個(gè)值即可，例如：
/** Opaque-transaction metadata: only the next batch's start offset needs to be stored. */
public class BatchMeta{
public long nextOffset; // start offset of the next batch
}
實(shí)例
制造一批數(shù)據(jù)，對數(shù)據(jù)按照時(shí)間進(jìn)行統(tǒng)計(jì)
定義不透明分區(qū)事務(wù)的spout，在構(gòu)造函數(shù)中制造數(shù)據(jù)源
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class MyOpaquePtTxSpout implements IOpaquePartitionedTransactionalSpout<MyMdata> {
    private static final long serialVersionUID = -1889920861418121463L;

    /** Simulated data source: partition id -> (offset -> tab-separated log line). Filled once in the constructor. */
    public static Map<Integer, Map<Long, String>> Partion_Data_Base = new HashMap<Integer, Map<Long, String>>();
    /** Number of simulated partitions. */
    public static int PART_COUNT = 5;
    /** Number of tuples emitted per partition per batch. */
    public static final long BITCHNUM = 10;

    /**
     * Builds the in-memory test data: PART_COUNT partitions, each holding 30
     * log lines keyed by their offset. Per Storm convention, only serializable
     * state is created in the constructor; non-serializable objects would be
     * instantiated in open()/prepare().
     */
    public MyOpaquePtTxSpout() {
        Random rand = new Random();
        String[] hosts = { "www.haoyidao.com" };
        String[] sessionIds = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34",
                "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
        // NOTE(review): this array is never used — the third log field below is a raw
        // random int; presumably time[rand.nextInt(8)] was intended. Kept for reference.
        String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
        for (int i = 0; i < PART_COUNT; i++) {
            Map<Long, String> records = new HashMap<Long, String>();
            for (long offset = 0; offset < 30; offset++) {
                records.put(offset, hosts[0] + "\t" + sessionIds[rand.nextInt(5)] + "\t" + rand.nextInt(8));
            }
            Partion_Data_Base.put(i, records);
        }
        System.err.println("MtPtTxSpout start.....");
    }

    public Emitter<MyMdata> getEmitter(Map map, TopologyContext topologyContext) {
        return new MyEmitter();
    }

    public Coordinator getCoordinator(Map map, TopologyContext topologyContext) {
        return new MyCoordinator();
    }

    /** Output schema: transaction attempt, date string, raw log line. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("tx", "today", "log"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public class MyCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator {
        /** Always ready to start a new transaction; sleep throttles batch creation. */
        public boolean isReady() {
            Utils.sleep(1000);
            return true;
        }

        public void close() {
        }
    }

    public class MyEmitter implements IOpaquePartitionedTransactionalSpout.Emitter<MyMdata> {
        /**
         * Emits one batch for the given partition and returns the metadata that the
         * NEXT batch will receive. In the opaque model this one method handles both
         * first emits and replays: myMdata is whatever was last successfully
         * committed (to ZooKeeper), or null for the very first batch.
         *
         * @param myMdata metadata committed by the previous successful batch, or null
         * @return metadata (start offset + batch size) for the batch just emitted
         */
        public MyMdata emitPartitionBatch(TransactionAttempt transactionAttempt, BatchOutputCollector batchOutputCollector, int i, MyMdata myMdata) {
            long beginPoint = 0;
            if (myMdata != null) {
                beginPoint = myMdata.getBeginPoint() + myMdata.getNum();
            }
            MyMdata nextMeta = new MyMdata();
            nextMeta.setBeginPoint(beginPoint);
            nextMeta.setNum(BITCHNUM);
            System.err.println("啟動(dòng)一個(gè)事務(wù)：" + nextMeta.toString());
            Map<Long, String> batchMap = Partion_Data_Base.get(i);
            for (long j = nextMeta.getBeginPoint(); j < nextMeta.getBeginPoint() + nextMeta.getNum(); j++) {
                // Partition exhausted. Valid offsets run 0..size-1, so stop at j == size;
                // the original "size() < j" was off by one and emitted a null record
                // for offset 30 before breaking at 31.
                if (j >= batchMap.size()) {
                    break;
                }
                batchOutputCollector.emit(new Values(transactionAttempt, "2017-08-09", batchMap.get(j)));
            }
            return nextMeta;
        }

        /** Keep in sync with the simulated data source (was a hard-coded 5). */
        public int numPartitions() {
            return PART_COUNT;
        }

        public void close() {
        }
    }
}
定義bolt
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.coordination.IBatchBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
public class Mybolt implements IBatchBolt<TransactionAttempt> {
    /**
     * Per-batch counts keyed by day string. Instance-level on purpose: Storm
     * creates a fresh IBatchBolt object for every batch, so each batch counts
     * from zero. The original static map accumulated across batches (and across
     * replays) within one worker JVM, which double-counted and broke the
     * per-batch totals the downstream committing bolt expects.
     */
    private Map<String, Integer> countMap = new HashMap<String, Integer>();
    private BatchOutputCollector batchOutputCollector;
    private TransactionAttempt transactionAttempt;
    String today;
    Integer count;

    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
        this.batchOutputCollector = batchOutputCollector;
        this.transactionAttempt = transactionAttempt;
    }

    /**
     * Counts one incoming tuple. Upstream schema is (tx, today, log); the log
     * line lives at index 2 — the original read index 1, which is the date field.
     */
    public void execute(Tuple tuple) {
        TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
        String log = (String) tuple.getValue(2);
        String[] strings = log.split("\t"); // host \t session \t value — parsed but unused by this demo
        today = "2014-01-07"; // demo aggregates everything under a fixed day key
        count = countMap.get(today);
        if (count == null) {
            count = 0;
        }
        count++;
        countMap.put(today, count);
    }

    /** Emits this batch's total downstream as (tx, today, count). */
    public void finishBatch() {
        System.err.println(this.transactionAttempt.toString() + "---" + today + "----" + count);
        this.batchOutputCollector.emit(new Values(this.transactionAttempt, today, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("tx", "today", "count"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
定義提交報(bào)告的commit
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
/**
 * Committing bolt implementing the opaque-transactional update pattern: for each
 * day key it stores the last applied transaction id, the current total, and the
 * previous total, so a replayed batch can be handled without double counting.
 */
public class MyCommitbolt extends BaseTransactionalBolt implements ICommitter{
private static final long serialVersionUID = 1550322821850864035L;
// Simulated database: day -> committed value. Static so it survives the
// per-batch bolt instances within one worker JVM; a real topology would use
// external storage (DB / ZooKeeper) instead.
public static Map<String,DbValue> dbValueMap = new HashMap<String, DbValue>();
// Totals accumulated from this batch's tuples only.
Map<String,Integer> countMap = new HashMap<String, Integer>();
TransactionAttempt transactionAttempt;
BatchOutputCollector collector;
String today = null;
public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
this.transactionAttempt = transactionAttempt;
this.collector = batchOutputCollector;
}
// Folds one upstream (tx, today, count) tuple into countMap.
public void execute(Tuple tuple) {
today = tuple.getString(1);
Integer count = tuple.getInteger(2);
transactionAttempt = (TransactionAttempt) tuple.getValue(0);
if (today!=null&&count!=null){
Integer batchCount = countMap.get(today);
if (batchCount==null){
batchCount = 0;
}
batchCount = batchCount+count;
countMap.put(today,batchCount);
}
}
// Invoked once per batch; ICommitter guarantees batches commit in
// transaction-id order.
public void finishBatch() {
if (countMap.size()>0){
DbValue dbValue = dbValueMap.get(today);
DbValue newValue;
// Apply only if this txid was not already committed for this key: an equal
// txid means this batch is a replay that has already been applied, so
// skipping it keeps the update idempotent.
if (dbValue==null||!dbValue.txid.equals(transactionAttempt.getTransactionId())){
newValue = new DbValue();
newValue.txid = transactionAttempt.getTransactionId();
newValue.dataStr = today;
if (dbValue==null){
newValue.count = countMap.get(today);
newValue.pre_cout = 0;
}else {
// Accumulate onto the stored total; remember the prior total in pre_cout
// so a later opaque replay could roll back to it.
newValue.count = dbValue.count+countMap.get(today);
newValue.pre_cout = dbValue.count;
}
dbValueMap.put(today,newValue);
}
System.out.println("total==========:"+dbValueMap.get(today).count);
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
// Value record: last applied txid, current total, day string, previous total.
public static class DbValue{
BigInteger txid;
int count = 0;
String dataStr;
int pre_cout;
}
}
定義topo結(jié)構(gòu)（注：下方代碼與上面的MyCommitbolt完全相同，疑為粘貼錯(cuò)誤，topology定義代碼缺失）
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
// NOTE(review): this block is a verbatim duplicate of the MyCommitbolt shown
// earlier, although the surrounding text announces a topology definition —
// the actual topology-building code appears to be missing from the post.
/**
 * Committing bolt implementing the opaque-transactional update pattern: for each
 * day key it stores the last applied transaction id, the current total, and the
 * previous total, so a replayed batch can be handled without double counting.
 */
public class MyCommitbolt extends BaseTransactionalBolt implements ICommitter{
private static final long serialVersionUID = 1550322821850864035L;
// Simulated database: day -> committed value. Static so it survives the
// per-batch bolt instances within one worker JVM.
public static Map<String,DbValue> dbValueMap = new HashMap<String, DbValue>();
// Totals accumulated from this batch's tuples only.
Map<String,Integer> countMap = new HashMap<String, Integer>();
TransactionAttempt transactionAttempt;
BatchOutputCollector collector;
String today = null;
public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
this.transactionAttempt = transactionAttempt;
this.collector = batchOutputCollector;
}
// Folds one upstream (tx, today, count) tuple into countMap.
public void execute(Tuple tuple) {
today = tuple.getString(1);
Integer count = tuple.getInteger(2);
transactionAttempt = (TransactionAttempt) tuple.getValue(0);
if (today!=null&&count!=null){
Integer batchCount = countMap.get(today);
if (batchCount==null){
batchCount = 0;
}
batchCount = batchCount+count;
countMap.put(today,batchCount);
}
}
// Invoked once per batch; ICommitter guarantees batches commit in
// transaction-id order.
public void finishBatch() {
if (countMap.size()>0){
DbValue dbValue = dbValueMap.get(today);
DbValue newValue;
// Apply only if this txid was not already committed for this key: an equal
// txid means this batch is a replay that has already been applied.
if (dbValue==null||!dbValue.txid.equals(transactionAttempt.getTransactionId())){
newValue = new DbValue();
newValue.txid = transactionAttempt.getTransactionId();
newValue.dataStr = today;
if (dbValue==null){
newValue.count = countMap.get(today);
newValue.pre_cout = 0;
}else {
// Accumulate onto the stored total; remember the prior total in pre_cout.
newValue.count = dbValue.count+countMap.get(today);
newValue.pre_cout = dbValue.count;
}
dbValueMap.put(today,newValue);
}
System.out.println("total==========:"+dbValueMap.get(today).count);
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
// Value record: last applied txid, current total, day string, previous total.
public static class DbValue{
BigInteger txid;
int count = 0;
String dataStr;
int pre_cout;
}
}