聊聊HystrixEventStream

本文主要研究一下HystrixEventStream

HystrixEventStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/HystrixEventStream.java

/**
 * Base interface for a stream of {@link com.netflix.hystrix.HystrixEventType}s.  Allows consumption by individual
 * {@link com.netflix.hystrix.HystrixEventType} or by time-based bucketing of events
 */
public interface HystrixEventStream<E extends HystrixEvent> {

    Observable<E> observe();
}

這個(gè)接口定義了一個(gè)observe方法,返回的是rxjava的Observable,它有如下幾個(gè)實(shí)現(xiàn)類

  • HystrixCommandStartStream
  • HystrixCommandCompletionStream
  • HystrixThreadPoolStartStream
  • HystrixThreadPoolCompletionStream
  • HystrixCollapserEventStream

HystrixCommandStartStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/HystrixCommandStartStream.java

/**
 * Per-Command stream of {@link HystrixCommandExecutionStarted}s.  This gets written to by {@link HystrixThreadEventStream}s.
 * Events are emitted synchronously in the same thread that performs the command execution.
 */
public class HystrixCommandStartStream implements HystrixEventStream<HystrixCommandExecutionStarted> {
    private final HystrixCommandKey commandKey;

    private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlySubject;
    private final Observable<HystrixCommandExecutionStarted> readOnlyStream;

    private static final ConcurrentMap<String, HystrixCommandStartStream> streams = new ConcurrentHashMap<String, HystrixCommandStartStream>();

    public static HystrixCommandStartStream getInstance(HystrixCommandKey commandKey) {
        HystrixCommandStartStream initialStream = streams.get(commandKey.name());
        if (initialStream != null) {
            return initialStream;
        } else {
            synchronized (HystrixCommandStartStream.class) {
                HystrixCommandStartStream existingStream = streams.get(commandKey.name());
                if (existingStream == null) {
                    HystrixCommandStartStream newStream = new HystrixCommandStartStream(commandKey);
                    streams.putIfAbsent(commandKey.name(), newStream);
                    return newStream;
                } else {
                    return existingStream;
                }
            }
        }
    }

    HystrixCommandStartStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;

        this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create());
        this.readOnlyStream = writeOnlySubject.share();
    }

    public static void reset() {
        streams.clear();
    }

    public void write(HystrixCommandExecutionStarted event) {
        writeOnlySubject.onNext(event);
    }

    @Override
    public Observable<HystrixCommandExecutionStarted> observe() {
        return readOnlyStream;
    }

    @Override
    public String toString() {
        return "HystrixCommandStartStream(" + commandKey.name() + ")";
    }
}

HystrixThreadEventStream會(huì)同步寫入HystrixCommandExecutionStarted,這里的write方法調(diào)用了writeOnlySubject的onNext方法

HystrixThreadEventStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/HystrixThreadEventStream.java

/**
 * Per-thread event stream.  No synchronization required when writing to it since it's single-threaded.
 *
 * Some threads will be dedicated to a single HystrixCommandKey (a member of a thread-isolated {@link HystrixThreadPool}.
 * However, many situations arise where a single thread may serve many different commands.  Examples include:
 * * Application caller threads (semaphore-isolated commands, or thread-pool-rejections)
 * * Timer threads (timeouts or collapsers)
 * <p>
 * I don't think that a thread-level view is an interesting one to consume (I could be wrong), so at the moment there
 * is no public way to consume it.  I can always add it later, if desired.
 * <p>
 * Instead, this stream writes to the following streams, which have more meaning to metrics consumers:
 * <ul>
 *     <li>{@link HystrixCommandCompletionStream}</li>
 *     <li>{@link HystrixCommandStartStream}</li>
 *     <li>{@link HystrixThreadPoolCompletionStream}</li>
 *     <li>{@link HystrixThreadPoolStartStream}</li>
 *     <li>{@link HystrixCollapserEventStream}</li>
 * </ul>
 *
 * Also note that any observers of this stream do so on the thread that writes the metric.  This is the command caller
 * thread in the SEMAPHORE-isolated case, and the Hystrix thread in the THREAD-isolated case. I determined this to
 * be more efficient CPU-wise than immediately hopping off-thread and doing all the metric calculations in the
 * RxComputationThreadPool.
 */
public class HystrixThreadEventStream {
    private final long threadId;
    private final String threadName;

    private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject;
    private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
    private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject;

    private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
        @Override
        protected HystrixThreadEventStream initialValue() {
            return new HystrixThreadEventStream(Thread.currentThread());
        }
    };

    private static final Action1<HystrixCommandExecutionStarted> writeCommandStartsToShardedStreams = new Action1<HystrixCommandExecutionStarted>() {
        @Override
        public void call(HystrixCommandExecutionStarted event) {
            HystrixCommandStartStream commandStartStream = HystrixCommandStartStream.getInstance(event.getCommandKey());
            commandStartStream.write(event);

            if (event.isExecutedInThread()) {
                HystrixThreadPoolStartStream threadPoolStartStream = HystrixThreadPoolStartStream.getInstance(event.getThreadPoolKey());
                threadPoolStartStream.write(event);
            }
        }
    };

    private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
        @Override
        public void call(HystrixCommandCompletion commandCompletion) {
            HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
            commandStream.write(commandCompletion);

            if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
                HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
                threadPoolStream.write(commandCompletion);
            }
        }
    };

    private static final Action1<HystrixCollapserEvent> writeCollapserExecutionsToShardedStreams = new Action1<HystrixCollapserEvent>() {
        @Override
        public void call(HystrixCollapserEvent collapserEvent) {
            HystrixCollapserEventStream collapserStream = HystrixCollapserEventStream.getInstance(collapserEvent.getCollapserKey());
            collapserStream.write(collapserEvent);
        }
    };

    /* package */ HystrixThreadEventStream(Thread thread) {
        this.threadId = thread.getId();
        this.threadName = thread.getName();
        writeOnlyCommandStartSubject = PublishSubject.create();
        writeOnlyCommandCompletionSubject = PublishSubject.create();
        writeOnlyCollapserSubject = PublishSubject.create();

        writeOnlyCommandStartSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandStartsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());

        writeOnlyCommandCompletionSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandCompletionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());

        writeOnlyCollapserSubject
                .onBackpressureBuffer()
                .doOnNext(writeCollapserExecutionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());
    }

    public static HystrixThreadEventStream getInstance() {
        return threadLocalStreams.get();
    }

    public void shutdown() {
        writeOnlyCommandStartSubject.onCompleted();
        writeOnlyCommandCompletionSubject.onCompleted();
        writeOnlyCollapserSubject.onCompleted();
    }

    public void commandExecutionStarted(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey,
                                        HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy, int currentConcurrency) {
        HystrixCommandExecutionStarted event = new HystrixCommandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentConcurrency);
        writeOnlyCommandStartSubject.onNext(event);
    }

    public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
        writeOnlyCommandCompletionSubject.onNext(event);
    }

    public void collapserResponseFromCache(HystrixCollapserKey collapserKey) {
        HystrixCollapserEvent collapserEvent = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.RESPONSE_FROM_CACHE, 1);
        writeOnlyCollapserSubject.onNext(collapserEvent);
    }

    public void collapserBatchExecuted(HystrixCollapserKey collapserKey, int batchSize) {
        HystrixCollapserEvent batchExecution = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.BATCH_EXECUTED, 1);
        HystrixCollapserEvent batchAdditions = HystrixCollapserEvent.from(collapserKey, HystrixEventType.Collapser.ADDED_TO_BATCH, batchSize);
        writeOnlyCollapserSubject.onNext(batchExecution);
        writeOnlyCollapserSubject.onNext(batchAdditions);
    }

    @Override
    public String toString() {
        return "HystrixThreadEventStream (" + threadId + " - " + threadName + ")";
    }
}
  • writeCommandStartsToShardedStreams會(huì)往HystrixCommandStartStream寫入HystrixCommandExecutionStarted,如果是執(zhí)行線程則調(diào)用threadPoolStartStream.write(event);
  • writeCommandCompletionsToShardedStreams會(huì)往HystrixCommandCompletionStream寫入HystrixCommandCompletion蚕泽,如果是執(zhí)行線程則調(diào)用threadPoolStream.write(commandCompletion);
  • writeCollapserExecutionsToShardedStreams會(huì)往HystrixCollapserEventStream寫入HystrixCollapserEvent
  • commandExecutionStarted方法調(diào)用writeOnlyCommandStartSubject.onNext(event)
  • executionDone方法調(diào)用writeOnlyCommandCompletionSubject.onNext(event)
  • collapserResponseFromCache方法調(diào)用writeOnlyCollapserSubject.onNext(collapserEvent)

HystrixCommandMetrics

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixCommandMetrics.java

/**
 * Used by {@link HystrixCommand} to record metrics.
 */
public class HystrixCommandMetrics extends HystrixMetrics {
    //......
    /* package-private */ void markCommandStart(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
        int currentCount = concurrentExecutionCount.incrementAndGet();
        HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount);
    }

    /* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
        HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
        if (executionStarted) {
            concurrentExecutionCount.decrementAndGet();
        }
    }
    //......
}

HystrixCommandMetrics里頭的markCommandStart以及markCommandDone方法會(huì)調(diào)用HystrixThreadEventStream獲取實(shí)例,然后寫入指標(biāo)

HystrixCollapserMetrics

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/HystrixCollapserMetrics.java

/**
 * Used by {@link HystrixCollapser} to record metrics.
 * {@link HystrixEventNotifier} not hooked up yet.  It may be in the future.
 */
public class HystrixCollapserMetrics extends HystrixMetrics {
    //......
    public void markRequestBatched() {
    }

    public void markResponseFromCache() {
        HystrixThreadEventStream.getInstance().collapserResponseFromCache(collapserKey);
    }

    public void markBatch(int batchSize) {
        HystrixThreadEventStream.getInstance().collapserBatchExecuted(collapserKey, batchSize);
    }

    public void markShards(int numShards) {
    }
    //......
}
  • HystrixCollapserMetrics里頭的markResponseFromCache以及markBatch方法會(huì)調(diào)用HystrixThreadEventStream獲取實(shí)例,然后寫入指標(biāo)
  • HystrixCollapser以及HystrixObservableCollapser中的toObservable方法會(huì)調(diào)用markResponseFromCache
  • HystrixCollapser以及HystrixObservableCollapser中的createObservableCommand方法會(huì)調(diào)用markBatch方法

AbstractCommand

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

markCommandStart

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        //we have not been unsubscribed, so should proceed
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        // store the command that is being run
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        /**
                         * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                         */
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                        //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                    }
                    //if it was unsubscribed, then other cleanup handled it
                }
            }).doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                        //if it was never started and was cancelled, then no need to clean up
                    }
                    //if it was terminal, then other cleanup handled it
                }
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }

                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                    // semaphore isolated
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    try {
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                    } catch (Throwable ex) {
                        //If the above hooks throw, then use that as the result of the run method
                        return Observable.error(ex);
                    }
                }
            });
        }
    }

executeCommandWithSpecifiedIsolation方法調(diào)用了markCommandStart方法

markCommandDone

    private void cleanUpAfterResponseFromCache(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        final long latency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult
                .addEvent(-1, HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency)
                .setNotExecutedInThread();
        ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency);
        metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, commandExecutionStarted);
        eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
    }

    private void handleCommandEnd(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        if (executionResultAtTimeOfCancellation == null) {
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }

        if (endCurrentThreadExecutingCommand != null) {
            endCurrentThreadExecutingCommand.call();
        }
    }

cleanUpAfterResponseFromCache以及handleCommandEnd方法調(diào)用了markCommandDone方法

HystrixEventStream.observe

BucketedCounterStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/BucketedCounterStream.java

public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
    //......
    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
            }
        };

        final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
        for (int i = 0; i < numBuckets; i++) {
            emptyEventCountsToStart.add(getEmptyBucketSummary());
        }

        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
            }
        });
    }
    //......
}

這里調(diào)用了HystrixEventStream<Event>的observe方法來(lái)消費(fèi)event stream毅该,這里的Event是泛型博秫,即HystrixEvent的子類

RollingConcurrencyStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/RollingConcurrencyStream.java

public abstract class RollingConcurrencyStream {
    //......
    protected RollingConcurrencyStream(final HystrixEventStream<HystrixCommandExecutionStarted> inputEventStream, final int numBuckets, final int bucketSizeInMs) {
        final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
        for (int i = 0; i < numBuckets; i++) {
            emptyRollingMaxBuckets.add(0);
        }

        rollingMaxStream = inputEventStream
                .observe()
                .map(getConcurrencyCountFromEvent)
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS)
                .flatMap(reduceStreamToMax)
                .startWith(emptyRollingMaxBuckets)
                .window(numBuckets, 1)
                .flatMap(reduceStreamToMax)
                .share()
                .onBackpressureDrop();
    }
    //......
}

這里調(diào)用了HystrixEventStream<HystrixCommandExecutionStarted>的observe方法來(lái)消費(fèi)event stream

RollingDistributionStream

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/metric/consumer/RollingDistributionStream.java

public class RollingDistributionStream<Event extends HystrixEvent> {
    //......
    protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
                                        final Func2<Histogram, Event, Histogram> addValuesToBucket) {
        final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
        for (int i = 0; i < numBuckets; i++) {
            emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
        }

        final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
            @Override
            public Observable<Histogram> call(Observable<Event> bucket) {
                return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
            }
        };

        rollingDistributionStream = stream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //stream of unaggregated buckets
                .flatMap(reduceBucketToSingleDistribution)     //stream of aggregated Histograms
                .startWith(emptyDistributionsToStart)          //stream of aggregated Histograms that starts with n empty
                .window(numBuckets, 1)                         //windowed stream: each OnNext is a stream of n Histograms
                .flatMap(reduceWindowToSingleDistribution)     //reduced stream: each OnNext is a single Histogram
                .map(cacheHistogramValues)                     //convert to CachedValueHistogram (commonly-accessed values are cached)
                .share()
                .onBackpressureDrop();
    }
    //......
}

這里調(diào)用了HystrixEventStream<Event>的observe方法來(lái)消費(fèi)event stream,這里的Event是泛型眶掌,即HystrixEvent的子類

小結(jié)

HystrixEventStream是hystrix基于rxjava設(shè)計(jì)的一個(gè)reactive stream挡育,hystrix command在相應(yīng)的生命周期里頭會(huì)調(diào)用HystrixThreadEventStream獲取實(shí)例,往指定HystrixCommandKey的相關(guān)stream發(fā)布對(duì)應(yīng)的事件朴爬,形成event stream即寒,然后會(huì)有其他stream去消費(fèi)event stream然后形成對(duì)應(yīng)的metrics。

doc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末召噩,一起剝皮案震驚了整個(gè)濱河市母赵,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌具滴,老刑警劉巖凹嘲,帶你破解...
    沈念sama閱讀 216,470評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異构韵,居然都是意外死亡周蹭,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門疲恢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)凶朗,“玉大人,你說(shuō)我怎么就攤上這事显拳∨锓撸” “怎么了?”我有些...
    開封第一講書人閱讀 162,577評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵萎攒,是天一觀的道長(zhǎng)遇八。 經(jīng)常有香客問(wèn)我,道長(zhǎng)耍休,這世上最難降的妖魔是什么刃永? 我笑而不...
    開封第一講書人閱讀 58,176評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮羊精,結(jié)果婚禮上斯够,老公的妹妹穿的比我還像新娘。我一直安慰自己喧锦,他們只是感情好读规,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,189評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著燃少,像睡著了一般束亏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上阵具,一...
    開封第一講書人閱讀 51,155評(píng)論 1 299
  • 那天碍遍,我揣著相機(jī)與錄音定铜,去河邊找鬼。 笑死怕敬,一個(gè)胖子當(dāng)著我的面吹牛揣炕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播东跪,決...
    沈念sama閱讀 40,041評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼畸陡,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了虽填?” 一聲冷哼從身側(cè)響起丁恭,我...
    開封第一講書人閱讀 38,903評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎卤唉,沒(méi)想到半個(gè)月后涩惑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,319評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡桑驱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,539評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了跛蛋。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片熬的。...
    茶點(diǎn)故事閱讀 39,703評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖赊级,靈堂內(nèi)的尸體忽然破棺而出押框,到底是詐尸還是另有隱情,我是刑警寧澤理逊,帶...
    沈念sama閱讀 35,417評(píng)論 5 343
  • 正文 年R本政府宣布橡伞,位于F島的核電站,受9級(jí)特大地震影響晋被,放射性物質(zhì)發(fā)生泄漏兑徘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,013評(píng)論 3 325
  • 文/蒙蒙 一羡洛、第九天 我趴在偏房一處隱蔽的房頂上張望挂脑。 院中可真熱鬧,春花似錦欲侮、人聲如沸崭闲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)刁俭。三九已至,卻和暖如春韧涨,著一層夾襖步出監(jiān)牢的瞬間牍戚,已是汗流浹背侮繁。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留翘魄,地道東北人鼎天。 一個(gè)月前我還...
    沈念sama閱讀 47,711評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像暑竟,于是被迫代替她去往敵國(guó)和親斋射。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,601評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容