A look at Storm's ack mechanism

This article takes a look at Storm's ack mechanism.



Example

AckSentenceSpout

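import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class AckSentenceSpout extends BaseRichSpout {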

    private ConcurrentHashMap<UUID, Values> pending;

    private SpoutOutputCollector collector;

    private int index = 0;

    private String[] sentences = {
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't have a cow man",
            "i don't think i like fleas"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    @Override
    public void nextTuple() {
        Values values = new Values(sentences[index]);
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
        // NOTE: msgId must be passed here; emitting without one (this.collector.emit(values)) disables acking for the tuple
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.sleep(100);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

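    // NOTE: a tuple whose tree failed must be re-emitted (with the same msgId) to retry it
    @Override
    public void fail(Object msgId) {
        Values values = this.pending.get(msgId);
        // Guard against ids that are no longer cached, which would otherwise emit null
        if (values != null) {
            this.collector.emit(values, msgId);
        }
    }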
}
  • For the spout, a msgId must be specified on emit. The emitted data must also be cached, removed on ack, and re-emitted on fail so that the tuple is retried.
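The spout-side bookkeeping above can be pulled out of Storm and tested on its own. The following is a minimal sketch, assuming a hypothetical `ReplayBuffer` class (not a Storm API) that plays the role of the `pending` map in AckSentenceSpout. `emit` caches the values under a fresh id, `ack` drops them, and `fail` returns what would need to be re-emitted.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the spout's pending map; not part of Storm.
class ReplayBuffer {
    private final Map<UUID, List<Object>> pending = new ConcurrentHashMap<>();

    // On emit: remember the values under a new msgId and hand the id back.
    UUID emit(List<Object> values) {
        UUID msgId = UUID.randomUUID();
        pending.put(msgId, values);
        return msgId;
    }

    // On ack: the whole tuple tree finished, so the cached values can go.
    void ack(Object msgId) {
        pending.remove(msgId);
    }

    // On fail: return the cached values so they can be re-emitted with the same
    // msgId; empty if the id is unknown (for example, already acked).
    Optional<List<Object>> fail(Object msgId) {
        return Optional.ofNullable(pending.get(msgId));
    }

    int size() {
        return pending.size();
    }
}
```

The cache grows by one entry per emit and shrinks only on ack. If acks never arrive, memory grows without bound, so a real spout usually caps or expires this map.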

AckWordCountBolt

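import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AckWordCountBolt extends BaseRichBolt {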
    private static final Logger LOGGER = LoggerFactory.getLogger(AckWordCountBolt.class);
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        try{
            String word = tuple.getStringByField("word");
            Long count = this.counts.get(word);
            if(count == null){
                count = 0L;
            }
            count++;
            this.counts.put(word, count);

            // NOTE: pass the tuple being processed as the anchor
            this.collector.emit(tuple, new Values(word, count));

            // NOTE: the bolt must ack the tuple itself
            this.collector.ack(tuple);
        }catch (Exception e){
            LOGGER.error(e.getMessage(),e);
            // NOTE: fail the tuple when processing throws
            this.collector.fail(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
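  • A bolt has two jobs. First, it must anchor: on emit, it links the input tuple to the output tuple, which builds the tuple tree. Second, it must ack each tuple it has finished processing, or fail it when processing goes wrong.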
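Why anchoring matters becomes clearer with the XOR checksum the acker keeps per root id. The sketch below is a simplified model with hypothetical class and method names. It assumes, as in Storm, that every tuple edge gets a random 64-bit id, and that a bolt's ack carries its input edge id XORed with the ids of the children it anchored. A spout tuple goes to bolt A, which emits two children. With anchoring, the checksum reaches zero only after both children are acked. Without it, the tree looks complete as soon as A acks.

```java
import java.util.Random;

// Simplified model of the acker's XOR bookkeeping for a single root id.
class TupleTreeDemo {
    private long val = 0L;

    // Same operation as AckObject.updateAck: fold a value into the checksum.
    void update(long v) {
        val ^= v;
    }

    boolean complete() {
        return val == 0L;
    }

    // Returns true if the tree was reported complete before the leaf bolts acked.
    static boolean completesEarly(boolean anchored, long seed) {
        Random random = new Random(seed);
        TupleTreeDemo acker = new TupleTreeDemo();
        long e1 = random.nextLong();      // edge spout -> bolt A
        acker.update(e1);                 // ack init sent by the spout
        long e2 = random.nextLong();      // edge bolt A -> leaf 1
        long e3 = random.nextLong();      // edge bolt A -> leaf 2
        // Bolt A acks its input: e1 XOR the ids of the children anchored to it.
        // Unanchored children contribute nothing and are never tracked.
        long ackVal = anchored ? (e2 ^ e3) : 0L;
        acker.update(e1 ^ ackVal);
        boolean early = acker.complete();
        if (anchored) {
            acker.update(e2);             // leaf 1 acks (no children of its own)
            acker.update(e3);             // leaf 2 acks
        }
        if (!acker.complete()) {
            throw new IllegalStateException("tree never completed");
        }
        return early;
    }
}
```

In the unanchored case, a crash in a leaf bolt would go unnoticed by the spout. That is exactly the gap anchoring closes.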

Source code analysis

SpoutOutputCollectorImpl.emit

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        try {
            return sendSpoutMsg(streamId, tuple, messageId, null);
        } catch (InterruptedException e) {
            LOG.warn("Spout thread interrupted during emit().");
            throw new RuntimeException(e);
        }
    }

    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws
        InterruptedException {
        emittedCount.increment();

        List<Integer> outTasks;
        if (outTaskId != null) {
            outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
        } else {
            outTasks = taskData.getOutgoingTasks(stream, values);
        }

        final boolean needAck = (messageId != null) && hasAckers;

        final List<Long> ackSeq = needAck ? new ArrayList<>() : null;

        final long rootId = needAck ? MessageId.generateId(random) : 0;

        for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
            Integer t = outTasks.get(i);
            MessageId msgId;
            if (needAck) {
                long as = MessageId.generateId(random);
                msgId = MessageId.makeRootId(rootId, as);
                ackSeq.add(as);
            } else {
                msgId = MessageId.makeUnanchored();
            }

            final TupleImpl tuple =
                new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
            AddressedTuple adrTuple = new AddressedTuple(t, tuple);
            executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
        }
        if (isEventLoggers) {
            taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
        }

        if (needAck) {
            boolean sample = executor.samplerCheck();
            TupleInfo info = new TupleInfo();
            info.setTaskId(this.taskId);
            info.setStream(stream);
            info.setMessageId(messageId);
            if (isDebug) {
                info.setValues(values);
            }
            if (sample) {
                info.setTimestamp(System.currentTimeMillis());
            }

            pending.put(rootId, info);
            List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
            taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
        } else if (messageId != null) {
            // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
            if (isDebug) {
                if (spoutExecutorThdId != Thread.currentThread().getId()) {
                    throw new RuntimeException("Detected background thread emitting tuples for the spout. " +
                                               "Spout Output Collector should only emit from the main spout executor thread.");
                }
            }
            globalTupleInfo.clear();
            globalTupleInfo.setStream(stream);
            globalTupleInfo.setValues(values);
            globalTupleInfo.setMessageId(messageId);
            globalTupleInfo.setTimestamp(0);
            globalTupleInfo.setId("0:");
            Long timeDelta = 0L;
            executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
        }
        return outTasks;
    }
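  • When needAck is true (a messageId was passed and the topology has ackers), a rootId is generated first. For every outgoing task, a random edge id as is generated, combined into the tuple's MessageId via MessageId.makeRootId(rootId, as), and recorded with ackSeq.add(as). The spout then stores a TupleInfo in pending under rootId and calls taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits()), where ackInitTuple is (rootId, XOR of all ackSeq values, spout taskId).
  • If a messageId is passed but there are no ackers, the spout's ack is invoked immediately through executor.ackSpoutMsg.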
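The init value in ackInitTuple is simply the XOR of one edge id per outgoing task. The sketch below uses hypothetical names and a local `xorAll` meant to behave like Utils.bitXorVals. It shows that when each receiving bolt acks its edge without emitting children, the acker's checksum returns to zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Sketch of the ack-init value computed in sendSpoutMsg when needAck is true.
class AckInitDemo {
    // XOR of all values, in the spirit of Utils.bitXorVals.
    static long xorAll(List<Long> values) {
        long acc = 0L;
        for (long v : values) {
            acc ^= v;
        }
        return acc;
    }

    // Acker checksum after the init tuple plus one ack per outgoing task,
    // where every receiving bolt is a leaf (its ackVal is 0).
    static long simulate(int outTasks, long seed) {
        Random random = new Random(seed);
        List<Long> ackSeq = new ArrayList<>();
        for (int i = 0; i < outTasks; i++) {
            ackSeq.add(random.nextLong());   // one edge id per copy of the tuple
        }
        long val = xorAll(ackSeq);           // the "init-val" field sent to the acker
        for (long edgeId : ackSeq) {
            val ^= edgeId;                   // leaf ack: edgeId ^ 0
        }
        return val;
    }
}
```

Sending one combined init value instead of one message per task keeps the spout's acker traffic constant, however many tasks the tuple fans out to.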

BoltOutputCollectorImpl.ack&fail

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java

    @Override
    public void ack(Tuple input) {
        if (!ackingEnabled) {
            return;
        }
        long ackValue = ((TupleImpl) input).getAckVal();
        Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
        for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
            task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
                                new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
                                executor.getExecutorTransfer(), executor.getPendingEmits());
        }
        long delta = tupleTimeDelta((TupleImpl) input);
        if (isDebug) {
            LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
        }

        if (!task.getUserContext().getHooks().isEmpty()) {
            BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
            boltAckInfo.applyOn(task.getUserContext());
        }
        if (delta >= 0) {
            executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
                                               task.getTaskMetrics().getAcked(input.getSourceStreamId()));
        }
    }

    @Override
    public void fail(Tuple input) {
        if (!ackingEnabled) {
            return;
        }
        Set<Long> roots = input.getMessageId().getAnchors();
        for (Long root : roots) {
            task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
                                new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits());
        }
        long delta = tupleTimeDelta((TupleImpl) input);
        if (isDebug) {
            LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
        }
        BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
        boltFailInfo.applyOn(task.getUserContext());
        if (delta >= 0) {
            executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
                                                task.getTaskMetrics().getFailed(input.getSourceStreamId()));
        }
    }
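  • Both ack and fail in BoltOutputCollectorImpl deliver their messages by calling task.sendUnanchored.
  • ack sends, for each root in anchorsToIds, the root id and the value edgeId XOR ackVal to Acker.ACKER_ACK_STREAM_ID. fail sends each root id to Acker.ACKER_FAIL_STREAM_ID.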
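The per-root messages produced by ack and fail can be reproduced without Storm. In the sketch below, the class name is hypothetical and the message layouts follow the stream fields declared in addAcker: `[id, ack-val]` on `__ack_ack` and `[id]` on `__ack_fail`. It assumes ackVal is the XOR of the edge ids of the children emitted anchored to this input; the emit path that builds it is not shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the messages BoltOutputCollectorImpl.ack/fail send to the acker.
class BoltAckMessages {
    // anchorsToIds: root id -> this input tuple's edge id within that root's tree.
    // ackVal: XOR of the ids of children anchored to this input (assumed, see above).
    static List<long[]> ackMessages(Map<Long, Long> anchorsToIds, long ackVal) {
        List<long[]> out = new ArrayList<>();
        for (Map.Entry<Long, Long> e : anchorsToIds.entrySet()) {
            // One [root id, edgeId ^ ackVal] message per tree this tuple belongs to.
            out.add(new long[] {e.getKey(), e.getValue() ^ ackVal});
        }
        return out;
    }

    // fail only has to tell each root's acker which tree failed.
    static List<Long> failMessages(Set<Long> roots) {
        return new ArrayList<>(roots);
    }
}
```

A tuple that was anchored to inputs from several spout tuples (multi-anchoring) belongs to several trees, which is why both methods loop over roots.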

Task.sendUnanchored

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java

    // Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
    public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
        Tuple tuple = getTuple(stream, values);
        List<Integer> tasks = getOutgoingTasks(stream, values);
        for (Integer t : tasks) {
            AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
            transfer.tryTransfer(addressedTuple, pendingEmits);
        }
    }
  • For each target task, this calls ExecutorTransfer.tryTransfer.

ExecutorTransfer.tryTransfer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java

    // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
        if (isDebug) {
            LOG.info("TRANSFERRING tuple {}", addressedTuple);
        }

        JCQueue localQueue = getLocalQueue(addressedTuple);
        if (localQueue != null) {
            return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
        }
        return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
    }

    /**
     * Adds tuple to localQueue (if overflow is empty). If localQueue is full adds to pendingEmits instead. pendingEmits can be null.
     * Returns false if unable to add to localQueue.
     */
    public boolean tryTransferLocal(AddressedTuple tuple, JCQueue localQueue, Queue<AddressedTuple> pendingEmits) {
        workerData.checkSerialize(serializer, tuple);
        if (pendingEmits != null) {
            if (pendingEmits.isEmpty() && localQueue.tryPublish(tuple)) {
                queuesToFlush.set(tuple.dest - indexingBase, localQueue);
                return true;
            } else {
                pendingEmits.add(tuple);
                return false;
            }
        } else {
            return localQueue.tryPublish(tuple);
        }
    }
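  • The method first uses addressedTuple to check whether the destination queue is local. If it is, tryTransferLocal is called; otherwise workerData.tryTransferRemote is called.
  • tryTransferLocal calls localQueue.tryPublish, which places the tuple into the JCQueue's recvQueue. This happens only if pendingEmits is empty and the queue accepts the tuple; otherwise the tuple is appended to pendingEmits.
  • workerData.tryTransferRemote hands the tuple to WorkerTransfer, which puts it into the TransferDrainer; it is sent to the remote node when the drainer is flushed.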
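The overflow rule in tryTransferLocal (publish directly only while pendingEmits is empty) is what keeps tuples in order once the destination queue fills up. The following is a simplified model with hypothetical names. A bounded `ArrayBlockingQueue` stands in for JCQueue and an `ArrayDeque` for pendingEmits; this is not Storm's implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified model of ExecutorTransfer.tryTransferLocal's overflow handling.
class LocalTransferDemo {
    final BlockingQueue<String> localQueue;
    final Queue<String> pendingEmits = new ArrayDeque<>();

    LocalTransferDemo(int capacity) {
        localQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Publish directly only if nothing is already waiting in the overflow;
    // otherwise (or when the queue is full) park the tuple in pendingEmits
    // so it cannot overtake tuples that were emitted earlier.
    boolean tryTransferLocal(String tuple) {
        if (pendingEmits.isEmpty() && localQueue.offer(tuple)) {
            return true;
        }
        pendingEmits.add(tuple);
        return false;
    }
}
```

For example, with capacity 2, "t3" overflows. Even after the consumer frees a slot, "t4" still goes to pendingEmits behind "t3" rather than jumping ahead of it.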

StormCommon.systemTopology

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

    public static StormTopology systemTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        return _instance.systemTopologyImpl(topoConf, topology);
    }

    protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
        validateBasic(topology);

        StormTopology ret = topology.deepCopy();
        addAcker(topoConf, ret);
        if (hasEventLoggers(topoConf)) {
            addEventLogger(topoConf, ret);
        }
        addMetricComponents(topoConf, ret);
        addSystemComponents(topoConf, ret);
        addMetricStreams(ret);
        addSystemStreams(ret);

        validateStructure(ret);

        return ret;
    }

    public static void addAcker(Map<String, Object> conf, StormTopology topology) {
        int ackerNum =
            ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
        Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);

        Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
        outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
        outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));

        Map<String, Object> ackerConf = new HashMap<>();
        ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
        ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));

        Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);

        for (Bolt bolt : topology.get_bolts().values()) {
            ComponentCommon common = bolt.get_common();
            common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
            common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
            common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
        }

        for (SpoutSpec spout : topology.get_spouts().values()) {
            ComponentCommon common = spout.get_common();
            Map<String, Object> spoutConf = componentConf(spout);
            spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                          ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
            common.set_json_conf(JSONValue.toJSONString(spoutConf));
            common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
                                  Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
            common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                                 Thrift.prepareDirectGrouping());
        }

        topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
    }

    public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
        Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
        Set<String> boltIds = topology.get_bolts().keySet();
        Set<String> spoutIds = topology.get_spouts().keySet();

        for (String id : spoutIds) {
            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        }

        for (String id : boltIds) {
            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("id")));
            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("id")));
            inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                       Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        }
        return inputs;
    }

    public static IBolt makeAckerBolt() {
        return _instance.makeAckerBoltImpl();
    }

    public IBolt makeAckerBoltImpl() {
        return new Acker();
    }
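  • The WorkerState constructor calls systemTopology, which adds system components such as Acker, MetricsConsumerBolt and SystemBolt.
  • addAcker contains the logic that creates the acker. ackerNum is ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS))): if Config.TOPOLOGY_ACKER_EXECUTORS is not set, the value of Config.TOPOLOGY_WORKERS is used.
  • The acker is configured with Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS set to ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)). In other words, the acker receives a tick tuple every TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds, and these ticks drive its timeout handling.
  • makeAckerBolt() is called to create the Acker instance passed to Thrift.prepareSerializedBoltDetails.
  • The acker's inputs (built by ackerInputs) subscribe to every spout's Acker.ACKER_INIT_STREAM_ID and to every bolt's Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID and Acker.ACKER_RESET_TIMEOUT_STREAM_ID, all with fields grouping on "id". Its outputs are the direct streams Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID and Acker.ACKER_RESET_TIMEOUT_STREAM_ID.
  • addAcker also declares the Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID and Acker.ACKER_RESET_TIMEOUT_STREAM_ID output streams on every bolt. On every spout, it sets Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, declares the Acker.ACKER_INIT_STREAM_ID output stream, and subscribes to the acker's Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID and Acker.ACKER_RESET_TIMEOUT_STREAM_ID with direct grouping.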
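The fallback rule for the acker count and the acker's tick frequency can be expressed as plain config lookups. The sketch below uses the string keys behind Config.TOPOLOGY_ACKER_EXECUTORS ("topology.acker.executors"), Config.TOPOLOGY_WORKERS ("topology.workers") and Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS ("topology.message.timeout.secs"). `getInt` is a simplified, hypothetical stand-in for ObjectReader.getInt that only handles numbers. The sketch assumes the merged topology config always contains topology.workers.

```java
import java.util.Map;

// Sketch of how addAcker sizes and schedules the acker bolt.
class AckerConfDemo {
    // Simplified stand-in for ObjectReader.getInt: null falls back to the default.
    static Integer getInt(Object value, Integer defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        return ((Number) value).intValue();
    }

    // topology.acker.executors if set, otherwise topology.workers.
    static int ackerNum(Map<String, Object> conf) {
        return getInt(conf.get("topology.acker.executors"),
                      getInt(conf.get("topology.workers"), null));
    }

    // The acker's tick tuple frequency equals the message timeout.
    static int ackerTickSecs(Map<String, Object> conf) {
        return getInt(conf.get("topology.message.timeout.secs"), null);
    }
}
```

The practical upshot is one acker executor per worker unless you set topology.acker.executors explicitly. Setting it to 0 yields no ackers, in which case sendSpoutMsg acks spout tuples immediately.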

Acker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java

public class Acker implements IBolt {
    public static final String ACKER_COMPONENT_ID = "__acker";
    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
    public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
    public static final int TIMEOUT_BUCKET_NUM = 3;
    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
    private static final long serialVersionUID = 4430906880683183091L;
    private OutputCollector collector;
    private RotatingMap<Object, AckObject> pending;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);
    }

    @Override
    public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            Map<Object, AckObject> tmp = pending.rotate();
            LOG.debug("Number of timeout tuples:{}", tmp.size());
            return;
        }

        boolean resetTimeout = false;
        String streamId = input.getSourceStreamId();
        Object id = input.getValue(0);
        AckObject curr = pending.get(id);
        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
            curr.spoutTask = input.getInteger(2);
        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
            // For the case that ack_fail message arrives before ack_init
            if (curr == null) {
                curr = new AckObject();
            }
            curr.failed = true;
            pending.put(id, curr);
        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
            resetTimeout = true;
            if (curr != null) {
                pending.put(id, curr);
            } //else if it has not been added yet, there is no reason time it out later on
        } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            collector.flush();
            return;
        } else {
            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
            return;
        }

        int task = curr.spoutTask;
        if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
            Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
            if (curr.val == 0) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
            } else if (curr.failed) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
            } else if (resetTimeout) {
                collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
            } else {
                throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
            }
        }

        collector.ack(input);
    }

    @Override
    public void cleanup() {
        LOG.info("Acker: cleanup successfully");
    }

    private long getTimeDeltaMillis(long startTimeMillis) {
        return Time.currentTimeMillis() - startTimeMillis;
    }

    private static class AckObject {
        public long val = 0L;
        public long startTime = Time.currentTimeMillis();
        public int spoutTask = -1;
        public boolean failed = false;

        // val xor value
        public void updateAck(Long value) {
            val = Utils.bitXor(val, value);
        }
    }
}
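  • On a tick tuple, the acker calls RotatingMap.rotate, which drops the oldest bucket of pending entries (trees that have timed out).
  • On ACKER_INIT_STREAM_ID and ACKER_ACK_STREAM_ID, the value is folded into AckObject.val via updateAck; the init tuple also records spoutTask. On ACKER_FAIL_STREAM_ID, the AckObject is marked failed and put back into pending. On ACKER_RESET_TIMEOUT_STREAM_ID, an existing entry is put again, which refreshes its timeout.
  • Finally, once spoutTask is known, the acker checks three cases. If val is 0, the whole tuple tree was processed successfully, and the acker emits directly to the spout task on ACKER_ACK_STREAM_ID. If the tree failed, it emits on ACKER_FAIL_STREAM_ID. If the timeout was reset, it emits on ACKER_RESET_TIMEOUT_STREAM_ID.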
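The decision logic above can be condensed into a small, hypothetical `MiniAcker`, shown below. It is a sketch, not Storm code: tick, flush and reset-timeout handling are omitted, and a plain HashMap replaces the RotatingMap, so nothing ever times out here. The sketch shows why the arrival order of init, ack and fail does not matter. Nothing is reported until the init tuple has supplied spoutTask, and XOR is order-independent.

```java
import java.util.HashMap;
import java.util.Map;

// Condensed model of Acker.execute. Returns "ACK" or "FAIL" when the acker
// would emitDirect to the spout task, or null while the tree is still in flight.
class MiniAcker {
    private static final class AckObject {
        long val = 0L;
        int spoutTask = -1;
        boolean failed = false;
    }

    private final Map<Long, AckObject> pending = new HashMap<>();

    String init(long id, long initVal, int spoutTask) {
        AckObject curr = pending.computeIfAbsent(id, k -> new AckObject());
        curr.val ^= initVal;
        curr.spoutTask = spoutTask;
        return decide(id, curr);
    }

    String ack(long id, long ackVal) {
        AckObject curr = pending.computeIfAbsent(id, k -> new AckObject());
        curr.val ^= ackVal;
        return decide(id, curr);
    }

    String fail(long id) {
        AckObject curr = pending.computeIfAbsent(id, k -> new AckObject());
        curr.failed = true;
        return decide(id, curr);
    }

    // Mirrors the final check: nothing is sent before the spout task is known;
    // val == 0 is checked before failed, as in the original code.
    private String decide(long id, AckObject curr) {
        if (curr.spoutTask < 0) {
            return null;
        }
        if (curr.val == 0) {
            pending.remove(id);
            return "ACK";
        }
        if (curr.failed) {
            pending.remove(id);
            return "FAIL";
        }
        return null;
    }

    int inFlight() {
        return pending.size();
    }
}
```

For example, an ack for edge 5 may reach the acker before the init tuple carrying init-val 5. The ack is held, and the init then brings val to 0 and triggers "ACK".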

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

public class SpoutExecutor extends Executor {
    //......
    @Override
    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
            if (spoutObj instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
            }
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }

    public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
            }
            spout.ack(tupleInfo.getMessageId());
            if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
                new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            }
            if (hasAckers && timeDelta != null) {
                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
                                                    taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
            }
            spout.fail(tupleInfo.getMessageId());
            new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            if (timeDelta != null) {
                executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta,
                                                     taskData.getTaskMetrics().getFailed(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}
  • In tupleActionFn, SpoutExecutor calls ackSpoutMsg when it receives ACKER_ACK_STREAM_ID and failSpoutMsg when it receives ACKER_FAIL_STREAM_ID.
  • ackSpoutMsg and failSpoutMsg call the concrete spout's ack and fail methods respectively, which reports the ack result back to the original spout.
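Both Acker.execute and SpoutExecutor.tupleActionFn react to tick tuples with pending.rotate(), and the reset-timeout branches simply put an entry back. A minimal bucketed map helps show why an entry that is never acked eventually drops out, and why re-putting gives it a fresh lifetime. The sketch below, `SimpleRotatingMap`, is hypothetical and follows the same idea as Storm's RotatingMap, not its actual source. It does not model expiry callbacks.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Minimal bucketed map in the spirit of RotatingMap (not Storm's code).
class SimpleRotatingMap<K, V> {
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();

    SimpleRotatingMap(int numBuckets) {
        for (int i = 0; i < numBuckets; i++) {
            buckets.addFirst(new HashMap<>());
        }
    }

    // put always lands in the newest bucket, so re-putting an entry
    // (as the reset-timeout handling does) restarts its lifetime.
    void put(K key, V value) {
        for (Map<K, V> b : buckets) {
            b.remove(key);
        }
        buckets.peekFirst().put(key, value);
    }

    V get(K key) {
        for (Map<K, V> b : buckets) {
            if (b.containsKey(key)) {
                return b.get(key);
            }
        }
        return null;
    }

    // Called on each tick: drop the oldest bucket and return what expired.
    Map<K, V> rotate() {
        Map<K, V> expired = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        return expired;
    }
}
```

With 3 buckets (the acker's TIMEOUT_BUCKET_NUM), an untouched entry is dropped on the third rotation after it was put. Since the acker ticks every TOPOLOGY_MESSAGE_TIMEOUT_SECS, that means after roughly two to three timeout periods.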

Summary

  • Storm uses the ack mechanism to provide at-least-once processing semantics.
  • In the WorkerState constructor, Storm calls systemTopology, which adds system components such as Acker, MetricsConsumerBolt and SystemBolt to the submitted topology. addAcker adds the acker and also applies ack-related configuration to spouts and bolts.
  • If a spout's emit includes a messageId, the tuple needs acking, and taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits()) is triggered.
  • Bolts send ack information through BoltOutputCollectorImpl's ack or fail methods, which call task.sendUnanchored. That in turn calls ExecutorTransfer.tryTransfer to deliver the addressedTuple to the target queue, transferring it remotely if the target is on another node. The stream used is Acker.ACKER_ACK_STREAM_ID or Acker.ACKER_FAIL_STREAM_ID.
  • On Acker.ACKER_ACK_STREAM_ID, the acker calls AckObject.updateAck. On Acker.ACKER_FAIL_STREAM_ID, it marks the AckObject as failed and puts it back into pending. It then checks the AckObject. If val is 0, the whole tuple tree succeeded, and it uses emitDirect to notify the corresponding spout task on ACKER_ACK_STREAM_ID. If the tree failed, it uses emitDirect on ACKER_FAIL_STREAM_ID. If the timeout was reset, it notifies the task on ACKER_RESET_TIMEOUT_STREAM_ID.
  • When SpoutExecutor receives ACKER_ACK_STREAM_ID it calls ackSpoutMsg, and when it receives ACKER_FAIL_STREAM_ID it calls failSpoutMsg. These call the concrete spout's ack and fail methods respectively, reporting the ack result to the original spout.

