Disruptor
What? 是什么惜互?
高性能的無鎖隊列本昏。
大學(xué)在學(xué)習(xí)到隊列的時候值朋,老師是不是讓我們課下自己去實現(xiàn)阻塞隊列垛叨,大家還有印象么衡怀?沒有印象建議讀一讀java.util.concurrent.ArrayBlockingQueue
Why颇象? 為什么這么快
偽共享問題處理,緩沖行填充你虹,大量CAS操作绘搞,去鎖
無鎖環(huán)形隊列設(shè)計,位運算鎖定坑位等
個人圖解disruptor
ringbuffer.jpg
so 到目前為止傅物,如果讓你自己實現(xiàn)一個Disruptor 你會怎么做呢夯辖?
Disruptor HelloWorld
知其然,再知其所以然董饰。so 先知其然
(```)
//環(huán)形數(shù)組 位置的數(shù)據(jù)對象
public final class ValueEvent {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
/**
* 數(shù)據(jù)對象數(shù)據(jù)工廠
*/
public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
@Override
public ValueEvent newInstance() {
return new ValueEvent();
}
};
}
//demo main
public static void main(String[] args) throws InterruptedException {
//環(huán)形大小
int ringBufferSize = 2 << 2;
//disruptor
Disruptor<ValueEvent> disruptor = new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, ringBufferSize,
Executors.defaultThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());
//消費端事件處理器
EventHandler<ValueEvent> eventHandler = new EventHandler<ValueEvent>() {
@Override
public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception {
System.out.println("Sequence: " + sequence);
System.out.println("ValueEvent: " + event.getValue());
}
};
disruptor.handleEventsWith(eventHandler);
//初始化disruptor
RingBuffer<ValueEvent> ringBuffer = disruptor.start();
Thread.sleep(3000L);
//生產(chǎn)端事件
for (long i = 0; i < 11; i++) {
String uuid = UUID.randomUUID().toString();
// Two phase commit. Grab one of the 1024 slots
long seq = ringBuffer.next();
ValueEvent valueEvent = ringBuffer.get(seq);
valueEvent.setValue(uuid);
ringBuffer.publish(seq);
}
disruptor.shutdown();
}
(```)
關(guān)鍵類個人理解
Disruptor:包裝了ringbuffer,消費組
RingBuffer:
數(shù)據(jù)結(jié)構(gòu):環(huán)形數(shù)組 2的n次階
核心屬性:sequencer發(fā)布坑位神器
next()方法:獲取下一個可以發(fā)布的坑位
publish(index)方法:發(fā)布坑位蒿褂,告訴消費者圆米,這個坑位可以消費了
Sequence:可以理解為一個增強的AtomicLong,解決了緩存行失效的問題啄栓,性能更好娄帖,代碼就不分析了,大家有興趣自己讀汝汲(主要是解決了偽共享的問題)
Sequencer:維護了當(dāng)前發(fā)布的坑位(單生成器近速,多生成器)
SequenceBarrier:跟蹤 生產(chǎn)者當(dāng)前發(fā)布坑位,消費者消費當(dāng)前坑位堪旧,依賴坑位等
WaitStrategy:等待策略確定消費者如何等待生產(chǎn)者將事件放入Disruptor
Event:從生產(chǎn)者傳遞給消費者的數(shù)據(jù)單位削葱。事件沒有特定的代碼表示,因為它完全由用戶定義淳梦。
EventProcessor:
用于處理來自Disruptor的事件的主事件循環(huán)析砸,消費坑位。
單線程:BatchEventProcessor
多線程 WorkProcessor
EventHandler:用戶自己實現(xiàn)谭跨,拿到數(shù)據(jù)了如何消費
Producer:用戶自己實現(xiàn)
核心類解讀
師傅領(lǐng)進門干厚,修行靠自己李滴。
建議看完上面的圖螃宙,跑跑helloworld
生產(chǎn)者#SingleProducerSequencer
(```)
//核心屬性 維護了生產(chǎn)者當(dāng)前生產(chǎn)的下標(biāo) 和 緩存門衛(wèi)下標(biāo)
long nextValue = -1;
long cachedValue = -1;
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
//初始值為-1,最后申請的生產(chǎn)者坑位
long nextValue = this.nextValue;
//本次申請的坑位所坯,單個發(fā)生n=1 【nextSequence&(bufferSize-1)即為在數(shù)組中的具體位置】
long nextSequence = nextValue + n;
//本次申請的坑位減一圈谆扎? 讓一圈還比你跑的快的意思
long wrapPoint = nextSequence - bufferSize;
//初始值為-1 最小的消費者坑位
long cachedGatingSequence = this.cachedValue;
/*
* 生產(chǎn)者追尾 消費者 || 消費者追尾生產(chǎn)者?芹助? 這種情況什么時候會發(fā)生
*/
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue);
long minSequence;
//當(dāng)發(fā)生生產(chǎn)者 追尾消費者的時候
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
//生產(chǎn)者等1ns 讓消費者先泡一會
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
/*
* 消費端讓出坑位堂湖,記下消費者占用的最小的那個坑位
*/
this.cachedValue = minSequence;
}
//成功申請到坑位
this.nextValue = nextSequence;
return nextSequence;
}
public void publish(long sequence)
{
//設(shè)置當(dāng)前生產(chǎn)的下標(biāo)位置
cursor.set(sequence);
//喚醒消費線程->獲取可消費的下標(biāo)
waitStrategy.signalAllWhenBlocking();
}
(```)
生產(chǎn)者#MultiProducerSequencer
(```)
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
//當(dāng)前坑位
current = cursor.get();
//下一個坑位
next = current + n;
//讓一圈后的坑位
long wrapPoint = next - bufferSize;
//守門員坑位 最小消費者坑位
long cachedGatingSequence = gatingSequenceCache.get();
//不能超車,等待后 自旋重試
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
//多生產(chǎn)者 用cas的方式獲取坑位
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
public void publish(final long sequence)
{
//當(dāng)前坑位 圈數(shù)量状土?
setAvailable(sequence);
//通知阻塞的消費者消費
waitStrategy.signalAllWhenBlocking();
}
(```)
環(huán)形數(shù)組#RingBuffer
(```)
//環(huán)形最大的index
private final long indexMask;
/**
* 環(huán)形數(shù)組元素
*null,null,null,null [ 中間這一塊是環(huán)形數(shù)組,兩邊對稱的數(shù)組位置暫時沒有放任何東西无蜂,做填充] null,null,null,null
*/
private final Object[] entries;
protected final int bufferSize;
/**
* 生產(chǎn)者對象的引用
* @see MultiProducerSequencer
* @see SingleProducerSequencer
*/
protected final Sequencer sequencer;
RingBufferFields(
EventFactory<E> eventFactory,
Sequencer sequencer) {
//生產(chǎn)者對象
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
//indexMask = 2^n次-1 用來做與運算 相當(dāng)快,計算機非常喜歡010101
this.indexMask = bufferSize - 1;
/**
* 其實是多創(chuàng)建了一些數(shù)組蒙谓,填充了一些數(shù)組斥季,保證ringbuffer的數(shù)組不和別人共享緩存行,
* 因為ringbuffer的數(shù)組對象一開始就創(chuàng)建好了累驮,保持具體的應(yīng)用就好了酣倾,不會變,提高了性能
*/
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory) {
for (int i = 0; i < bufferSize; i++) {
//初始化坑位
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings("unchecked")
protected final E elementAt(long sequence) {
//根據(jù)下標(biāo)獲取entry
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
(```)
Disruptor
(```)
EventHandlerGroup<T> createEventProcessors(
/* new Sequence[0] 參數(shù)傳進
來的*/
final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
/*
* 有幾個消費者 就有幾個sequence
*/
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
/**
* 追蹤 生產(chǎn)者的生產(chǎn)下標(biāo) 和 依賴消費端的的Sequence
*/
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
/**
* 批次事件處理器
*/
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
/**
* 消費端倉庫 加入事件處理器
*/
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
/**
* 每個hanlder都有一個sequence
*/
processorSequences[i] = batchEventProcessor.getSequence();
}
/**
* 消費端的Sequence 需要加到sequencer中谤专。
*/
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
(```)
事件處理器 BatchEventProcessor
(```)
private void processEvents()
{
/**
* 每個消費事件是一個線程躁锡。
*
* sequence 最開始是-1
*
* 從0開始消費
*/
T event = null;
long nextSequence = sequence.get() + 1L;
while (true)
{
try
{
/**
* 檢查生產(chǎn)者生產(chǎn)的位置,消費端需要消費置侍,消費者追尾生產(chǎn)者的時候映之,阻塞在這里
*/
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
/**
* 消費端拿到可用的消費下標(biāo)
*/
while (nextSequence <= availableSequence)
{
//獲取數(shù)據(jù)
event = dataProvider.get(nextSequence);
//事件處理
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
//消費坑位+1
nextSequence++;
}
//消費坑位為availableSequence
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (running.get() != RUNNING)
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
(```)
disruptor 一個實現(xiàn)處理器 多線程消費 disruptor.handleEventsWithWorkerPool
換湯不換藥的存在拦焚,多線程用cas拿到當(dāng)前坑位消費。留給大家自行解讀惕医。
思考:
1耕漱、必須A處理器處理完 才能處理B和C處理器的場景,disrupptor如何實現(xiàn)的抬伺?
SequenceBarrier的實現(xiàn)類ProcessingSequenceBarrier構(gòu)造器public ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences /***哇哦 這是什么東西螟够?**/)
創(chuàng)建來源
Disruptor.createEventProcessors(
final Sequence[] barrierSequences/**哇哦,這個參數(shù)原來是要傳依賴的那個消費組的下標(biāo)峡钓,一開始我還不知道有什么用呢 哈哈哈 讀源碼真實件有趣的事情**/,
final EventHandler<? super T>[] eventHandlers)
SequenceBarrier維護了一個依賴的坑位序列組妓笙。具體代碼也留給大家自行解讀
2、為什么Disruptor快能岩?寞宫?
現(xiàn)在你有答案了么