之前我們一起分析了Disruptor的初始化和啟動(dòng)代碼喂链,接下來我們來分析下生產(chǎn)者的發(fā)布代碼。還不太了解的同學(xué)建議看看我之前發(fā)的Disruptor原理翻譯和導(dǎo)讀文章庶喜,尤其是一些名詞概念最好要清楚是做什么用的唯沮。
1 生產(chǎn)者線程
生產(chǎn)者一般就是我們的應(yīng)用線程,在發(fā)布通常使用一個(gè)EventTranslator將數(shù)據(jù)轉(zhuǎn)移到RingBuffer上忿檩,因?yàn)椴簧婕肮蚕頂?shù)據(jù)和實(shí)例變量尉尾,通常使用同一個(gè)EventTranslator實(shí)例進(jìn)行操作(注:translate經(jīng)常是“翻譯”的意思,但其實(shí)還有“ move from one place or condition to another.”的轉(zhuǎn)移燥透、轉(zhuǎn)換的意思)沙咏。
根據(jù)同一事件傳入?yún)?shù)的多少,可以選擇不同接口接收參數(shù)班套。
/**
* 生產(chǎn)者在發(fā)布事件時(shí)肢藐,使用翻譯器將原始對象設(shè)置到RingBuffer的對象中
*/
static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer>{
static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();
@Override
public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
event.data = arg0 ;
System.err.println("put data "+sequence+", "+event+", "+arg0);
}
}
// 生產(chǎn)線程0
Thread produceThread0 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){ // 除了下面這行代碼,其他都沒有關(guān)系
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
// 生產(chǎn)線程1
Thread produceThread1 = new Thread(new Runnable() {
@Override
public void run() {
int x = 0;
while(x++ < events / 2){
disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
}
}
});
produceThread0.start();
produceThread1.start();
在demo中吱韭,我們實(shí)例化并啟動(dòng)了兩個(gè)線程吆豹,用來生產(chǎn)事件放置到Disruptor中。
接下來我們跟隨源碼一點(diǎn)點(diǎn)深入理盆。
2 生產(chǎn)事件的整體邏輯
// Disruptor.java
/**
* Publish an event to the ring buffer. 使用給定的事件翻譯器痘煤,發(fā)布事件
*
* @param eventTranslator the translator that will load data into the event.
* @param arg A single argument to load into the event
*/
public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg)
{
ringBuffer.publishEvent(eventTranslator, arg);
}
//之前也講過,Disruptor這個(gè)類是一個(gè)輔助類猿规,在發(fā)布事件時(shí)其實(shí)是委托給RingBuffer完成發(fā)布操作衷快。
//RingBuffer.publishEvent()的邏輯大概分為兩個(gè)步驟:第一步先占有RingBuffer上的一個(gè)可用位置,我們簡稱為“占坑”坎拐;第二步在可用位置發(fā)布數(shù)據(jù)烦磁,我們簡稱為“填坑”养匈。
// RingBuffer
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
final long sequence = sequencer.next(); // 第一步 占坑
translateAndPublish(translator, sequence, arg0); // 第二步 填坑
}
其中第二步中哼勇,在填坑完畢還要調(diào)用Sequencer接口的publish方法對外發(fā)布事件都伪。為啥呢?先留個(gè)疑問积担。
在第一步占坑中陨晶,首先通過調(diào)用Sequencer.next()獲取RingBuffer實(shí)例下一個(gè)能用的序號。
AbstractSequencer作為一個(gè)抽象類帝璧,實(shí)現(xiàn)了Sequencer接口先誉,是單生產(chǎn)者Sequencer和多生產(chǎn)者Sequencer的父類。
3 Disruptor的核心--Sequencer接口
為什么說Sequencer是Disruptor的核心呢的烁?其實(shí)這也不是我說的褐耳,是Disruptor官方Wiki Introduction上說的:
Sequencer是用來保證生產(chǎn)者和消費(fèi)者之間正確、高速傳遞數(shù)據(jù)的渴庆。我們先來看看以生產(chǎn)者的角度看Sequencer有什么作用铃芦。
先來張類圖。
下邊是Sequencer接口及其父接口Cursored襟雷、Sequenced 定義刃滓。
// Sequencer
/**
* Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s
*/
public interface Sequencer extends Cursored, Sequenced
{
/**
* Set to -1 as sequence starting point
* 序號開始位置
*/
long INITIAL_CURSOR_VALUE = -1L;
/**
* Claim a specific sequence. Only used if initialising the ring buffer to
* a specific value.
*
* @param sequence The sequence to initialise too.
* 聲明指定序號,只用在初始化RingBuffer到指定值耸弄,基本上不用了
*/
void claim(long sequence);
/**
* Confirms if a sequence is published and the event is available for use; non-blocking.
*
* @param sequence of the buffer to check
* @return true if the sequence is available for use, false if not
* 用非阻塞方式咧虎,確認(rèn)某個(gè)序號是否已經(jīng)發(fā)布且事件可用。
*/
boolean isAvailable(long sequence);
/**
* Add the specified gating sequences to this instance of the Disruptor. They will
* safely and atomically added to the list of gating sequences.
*
* @param gatingSequences The sequences to add.
* 增加門控序列(消費(fèi)者序列)计呈,用于生產(chǎn)者在生產(chǎn)時(shí)避免追尾消費(fèi)者
*/
void addGatingSequences(Sequence... gatingSequences);
/**
* Remove the specified sequence from this sequencer.
*
* @param sequence to be removed.
* @return <tt>true</tt> if this sequence was found, <tt>false</tt> otherwise.
* 從門控序列中移除指定序列
*/
boolean removeGatingSequence(Sequence sequence);
/**
* Create a new SequenceBarrier to be used by an EventProcessor to track which messages
* are available to be read from the ring buffer given a list of sequences to track.
*
* @param sequencesToTrack
* @return A sequence barrier that will track the specified sequences.
* @see SequenceBarrier
* 消費(fèi)者使用砰诵,用于追蹤指定序列(通常是上一組消費(fèi)者的序列)
*/
SequenceBarrier newBarrier(Sequence... sequencesToTrack);
/**
* Get the minimum sequence value from all of the gating sequences
* added to this ringBuffer.
*
* @return The minimum gating sequence or the cursor sequence if
* no sequences have been added.
* 獲取追蹤序列中最小的序列
*/
long getMinimumSequence();
/**
* Get the highest sequence number that can be safely read from the ring buffer. Depending
* on the implementation of the Sequencer this call may need to scan a number of values
* in the Sequencer. The scan will range from nextSequence to availableSequence. If
* there are no available values <code>>= nextSequence</code> the return value will be
* <code>nextSequence - 1</code>. To work correctly a consumer should pass a value that
* is 1 higher than the last sequence that was successfully processed.
*
* @param nextSequence The sequence to start scanning from.
* @param availableSequence The sequence to scan to.
* @return The highest value that can be safely read, will be at least <code>nextSequence - 1</code>.
*
* 獲取能夠從環(huán)形緩沖讀取的最高的序列號。依賴Sequencer的實(shí)現(xiàn)捌显,可能會(huì)掃描Sequencer的一些值胧砰。掃描從nextSequence
* 到availableSequence。如果沒有大于等于nextSequence的可用值苇瓣,返回值將為nextSequence-1尉间。為了工作正常,消費(fèi)者
* 應(yīng)該傳遞一個(gè)比最后成功處理的序列值大1的值击罪。
*/
long getHighestPublishedSequence(long nextSequence, long availableSequence);
<T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
}
// Cursored.java
/**
* Implementors of this interface must provide a single long value
* that represents their current cursor value. Used during dynamic
* add/remove of Sequences from a
* {@link SequenceGroups#addSequences(Object, java.util.concurrent.atomic.AtomicReferenceFieldUpdater, Cursored, Sequence...)}.
* 游標(biāo)接口哲嘲,用于獲取生產(chǎn)者當(dāng)前游標(biāo)位置
*/
public interface Cursored
{
/**
* Get the current cursor value.
*
* @return current cursor value
*/
long getCursor();
}
// Sequenced.java
public interface Sequenced
{
/**
* The capacity of the data structure to hold entries.
*
* @return the size of the RingBuffer.
* 獲取環(huán)形緩沖的大小
*/
int getBufferSize();
/**
* Has the buffer got capacity to allocate another sequence. This is a concurrent
* method so the response should only be taken as an indication of available capacity.
*
* @param requiredCapacity in the buffer
* @return true if the buffer has the capacity to allocate the next sequence otherwise false.
* 判斷是否含有指定的可用容量
*/
boolean hasAvailableCapacity(final int requiredCapacity);
/**
* Get the remaining capacity for this sequencer.
*
* @return The number of slots remaining.
* 剩余容量
*/
long remainingCapacity();
/**
* Claim the next event in sequence for publishing.
*
* @return the claimed sequence value
* 生產(chǎn)者發(fā)布時(shí),申請下一個(gè)序號
*/
long next();
/**
* Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing
* requires a little care and some math.
* <pre>
* int n = 10;
* long hi = sequencer.next(n);
* long lo = hi - (n - 1);
* for (long sequence = lo; sequence <= hi; sequence++) {
* // Do work.
* }
* sequencer.publish(lo, hi);
* </pre>
*
* @param n the number of sequences to claim
* @return the highest claimed sequence value
* 申請n個(gè)序號媳禁,用于批量發(fā)布
*/
long next(int n);
/**
* Attempt to claim the next event in sequence for publishing. Will return the
* number of the slot if there is at least <code>requiredCapacity</code> slots
* available.
*
* @return the claimed sequence value
* @throws InsufficientCapacityException
* next()的非阻塞模式
*/
long tryNext() throws InsufficientCapacityException;
/**
* Attempt to claim the next n events in sequence for publishing. Will return the
* highest numbered slot if there is at least <code>requiredCapacity</code> slots
* available. Have a look at {@link Sequencer#next()} for a description on how to
* use this method.
*
* @param n the number of sequences to claim
* @return the claimed sequence value
* @throws InsufficientCapacityException
* next(n)的非阻塞模式
*/
long tryNext(int n) throws InsufficientCapacityException;
/**
* Publishes a sequence. Call when the event has been filled.
*
* @param sequence
* 數(shù)據(jù)填充后眠副,發(fā)布此序號
*/
void publish(long sequence);
/**
* Batch publish sequences. Called when all of the events have been filled.
*
* @param lo first sequence number to publish
* @param hi last sequence number to publish
* 批量發(fā)布序號
*/
void publish(long lo, long hi);
}
3.1 單生產(chǎn)者發(fā)布事件
下邊先看使用單生產(chǎn)者SingleProducerSequencer具體是怎么占坑的。
// SingleProducerSequencer.java
@Override
public long next()
{
return next(1);
}
/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
// 復(fù)制上次申請完畢的序列值
long nextValue = this.nextValue;
// 加n竣稽,得到本次需要申請的序列值囱怕,單個(gè)發(fā)送n為1
long nextSequence = nextValue + n; // 本次要驗(yàn)證的值
// 可能發(fā)生繞環(huán)的點(diǎn)霍弹,本次申請值 - 一圈長度
long wrapPoint = nextSequence - bufferSize;
long cachedGatingSequence = this.cachedValue; // 數(shù)值最小的序列值,也就是最慢消費(fèi)者
// wrapPoint 等于 cachedGatingSequence 將發(fā)生繞環(huán)行為娃弓,生產(chǎn)者將在環(huán)上典格,從后方覆蓋未消費(fèi)的事件。
// 如果即將生產(chǎn)者超一圈從后方追消費(fèi)者尾(要申請的序號落了最慢消費(fèi)者一圈)或 消費(fèi)者追生產(chǎn)者尾台丛,將進(jìn)行等待耍缴。后邊這種情況應(yīng)該不會(huì)發(fā)生吧?
// 針對以上值舉例:400米跑道(bufferSize)挽霉,小明跑了599米(nextSequence)防嗡,小紅(最慢消費(fèi)者)跑了200米(cachedGatingSequence)。小紅不動(dòng)侠坎,小明再跑一米就撞翻小紅的那個(gè)點(diǎn)蚁趁,叫做繞環(huán)點(diǎn)wrapPoint。
// 沒有空坑位实胸,將進(jìn)入循環(huán)等待他嫡。
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// 只有當(dāng)消費(fèi)者消費(fèi),向前移動(dòng)后童芹,才能跳出循環(huán)
// 由于外層判斷使用的是緩存的消費(fèi)者序列最小值涮瞻,這里使用真實(shí)的消費(fèi)者序列進(jìn)行判斷,并將最新結(jié)果在跳出while循環(huán)之后進(jìn)行緩存
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{ // 喚醒等待的消費(fèi)者
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
// 當(dāng)消費(fèi)者向前消費(fèi)后假褪,更新緩存的最小序號
this.cachedValue = minSequence;
}
// 將成功申請的序號賦值給對象實(shí)例變量
this.nextValue = nextSequence;
return nextSequence;
}
next()占坑成功將會(huì)返回坑位號署咽,回到RingBuffer的publishEvent方法,執(zhí)行translateAndPublish方法生音,進(jìn)行填坑和發(fā)布操作宁否。
// RingBuffer.java
private void translateAndPublish(EventTranslator<E> translator, long sequence)
{
try
{
translator.translateTo(get(sequence), sequence);
}
finally
{
sequencer.publish(sequence);
}
}
translator參數(shù)用戶定義的對EventTranslator接口的實(shí)現(xiàn)對象缀遍。
上文已經(jīng)介紹過EventTranslator接口慕匠,除EventTranslator外,還有EventTranslatorOneArg域醇,EventTranslatorTwoArg台谊,EventTranslatorThreeArg,EventTranslatorVararg譬挚。功能是將給定的數(shù)據(jù)填充到指定坑位的對象(因?yàn)镽ingBuffer上已經(jīng)預(yù)先分配了對象)上锅铅,只不過分別對應(yīng)不同參數(shù)。簡單看下EventTranslatorOneArg接口定義减宣。
public interface EventTranslatorOneArg<T, A>
{
/**
* Translate a data representation into fields set in given event
*
* @param event into which the data should be translated.
* @param sequence that is assigned to event.
* @param arg0 The first user specified argument to the translator
*/
void translateTo(final T event, long sequence, final A arg0);
}
在放好數(shù)據(jù)后盐须,就可以調(diào)用sequencer的publish方法發(fā)布對象了。首先是更新當(dāng)前游標(biāo)漆腌,更新完畢再通知等待中的消費(fèi)者贼邓,消費(fèi)者將繼續(xù)消費(fèi)阶冈。關(guān)于消費(fèi)者的等待策略,后續(xù)還會(huì)講到塑径。
// SingleProducerSequencer.java
@Override
public void publish(long sequence)
{ // 在發(fā)布此位置可用時(shí)女坑,需要更新Sequencer內(nèi)部游標(biāo)值,并在使用阻塞等待策略時(shí)晓勇,通知等待可用事件的消費(fèi)者進(jìn)行繼續(xù)消費(fèi)
cursor.set(sequence);
// 除signalAllWhenBlocking外都是空實(shí)現(xiàn)
waitStrategy.signalAllWhenBlocking();
}
// BlockingWaitStrategy.java
@Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
3.2 插播Disruptor中的高效AtomicLong--Sequence
注意那個(gè)cursor堂飞,這個(gè)cursor可不是簡單的long類型灌旧,而是Disruptor內(nèi)部實(shí)現(xiàn)的Sequence類绑咱。
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{ // value的前后各有7個(gè)long變量,用于緩存行填充枢泰,前后各7個(gè)保證了不管怎樣描融,當(dāng)64位的緩存行加載時(shí)value,不會(huì)有其他變量共享緩存行衡蚂,從而解決了偽共享問題
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
/**
* <p>Concurrent sequence class used for tracking the progress of
* the ring buffer and event processors. Support a number
* of concurrent operations including CAS and order writes.
*
* <p>Also attempts to be more efficient with regards to false
* sharing by adding padding around the volatile field.
* Sequence可以按照AtomicLong來理解窿克,除了Sequence消除了偽共享問題,更加高效
*/
public class Sequence extends RhsPadding
{
static final long INITIAL_VALUE = -1L;
private static final Unsafe UNSAFE;
private static final long VALUE_OFFSET;
static
{
UNSAFE = Util.getUnsafe();
try
{
VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
}
catch (final Exception e)
{
throw new RuntimeException(e);
}
}
/**
* Create a sequence initialised to -1.
*/
public Sequence()
{
this(INITIAL_VALUE);
}
/**
* Create a sequence with a specified initial value.
*
* @param initialValue The initial value for this sequence.
*/
public Sequence(final long initialValue)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
}
/**
* Perform a volatile read of this sequence's value.
*
* @return The current value of the sequence.
*/
public long get()
{
return value;
}
/**
* Perform an ordered write of this sequence. The intent is
* a Store/Store barrier between this write and any previous
* store.
*
* @param value The new value for the sequence.
* 此方法等同于AtomicLong#lazySet(long newValue)毛甲,
* 和直接修改volatile修飾的value相比年叮,非阻塞,更高效玻募,但更新的值會(huì)稍遲一點(diǎn)看到
*/
public void set(final long value)
{
UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
}
/**
* Performs a volatile write of this sequence. The intent is
* a Store/Store barrier between this write and any previous
* write and a Store/Load barrier between this write and any
* subsequent volatile read.
*
* @param value The new value for the sequence.
*/
public void setVolatile(final long value)
{
UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
}
/**
* Perform a compare and set operation on the sequence.
*
* @param expectedValue The expected current value.
* @param newValue The value to update to.
* @return true if the operation succeeds, false otherwise.
*/
public boolean compareAndSet(final long expectedValue, final long newValue)
{
return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
}
/**
* Atomically increment the sequence by one.
*
* @return The value after the increment
*/
public long incrementAndGet()
{
return addAndGet(1L);
}
/**
* Atomically add the supplied value.
*
* @param increment The value to add to the sequence.
* @return The value after the increment.
*/
public long addAndGet(final long increment)
{
long currentValue;
long newValue;
do
{
currentValue = get();
newValue = currentValue + increment;
}
while (!compareAndSet(currentValue, newValue));
return newValue;
}
@Override
public String toString()
{
return Long.toString(get());
}
}
這個(gè)Sequence其實(shí)相當(dāng)于AtomicLong只损,最大的區(qū)別在于Sequence解決了偽共享問題。另外Sequence#set相當(dāng)于AtomicLong#lazySet七咧。
致此跃惫,使用單生產(chǎn)者發(fā)布事件的流程就完成了。
3.3 多生產(chǎn)者發(fā)布事件
如果使用的是多生產(chǎn)者艾栋,占坑則調(diào)用MultiProducerSequencer.next()爆存。
@Override
public long next()
{
return next(1);
}
/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get(); // 當(dāng)前游標(biāo)值,初始化時(shí)是-1
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
可以發(fā)現(xiàn)蝗砾,多生產(chǎn)者模式占坑和放置數(shù)據(jù)的邏輯和單生產(chǎn)者模式區(qū)別不大先较。區(qū)別主要是最后調(diào)用publish發(fā)布坑位的邏輯。
// MultiProducerSequencer.java
private static final Unsafe UNSAFE = Util.getUnsafe();
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); // 獲取int[]數(shù)組類的第一個(gè)元素與該類起始位置的偏移悼粮。
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); // 每個(gè)元素需要占用的位置闲勺,也有可能返回0。BASE和SCALE都是為了操作availableBuffer
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// availableBuffer tracks the state of each ringbuffer slot
// see below for more details on the approach
private final int[] availableBuffer; // 初始全是-1
private final int indexMask;
private final int indexShift;
@Override
public void publish(final long sequence)
{
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking(); // 如果使用BlokingWaitStrategy矮锈,才會(huì)進(jìn)行通知霉翔。否則不會(huì)操作
}
@Override
public void publish(long lo, long hi)
{
for (long l = lo; l <= hi; l++)
{
setAvailable(l);
}
waitStrategy.signalAllWhenBlocking();
}
/
* availableBuffer設(shè)置可用標(biāo)志
* 主要原因是避免發(fā)布者線程之間共享一個(gè)序列對象。
* 游標(biāo)和最小門控序列的差值應(yīng)該永遠(yuǎn)不大于RingBuffer的大邪俊(防止生產(chǎn)者太快债朵,覆蓋未消費(fèi)完的數(shù)據(jù))
*/
private void setAvailable(final long sequence)
{ // calculateIndex 求模%子眶, calculateAvailabilityFlag 求除/
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}
private void setAvailableBufferValue(int index, int flag)
{ // 使用Unsafe更新屬性,因?yàn)槭侵苯硬僮鲀?nèi)存序芦,所以需要計(jì)算元素位置對應(yīng)的內(nèi)存位置bufferAddress
long bufferAddress = (index * SCALE) + BASE;
// availableBuffer是標(biāo)志可用位置的int數(shù)組臭杰,初始全為-1。隨著sequence不斷上升谚中,buffer中固定位置的flag(也就是sequence和bufferSize相除的商)會(huì)一直增大渴杆。
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
private int calculateAvailabilityFlag(final long sequence)
{ // 求商 就是 sequence / bufferSize , bufferSize = 2^indexShift。
return (int) (sequence >>> indexShift);
}
private int calculateIndex(final long sequence)
{ // 計(jì)算位置即求模宪塔,直接使用序號 與 掩碼(2的平方-1磁奖,也就是一個(gè)全1的二進(jìn)制表示),相當(dāng)于 sequence % (bufferSize), bufferSize = indexMask + 1
return ((int) sequence) & indexMask;
}
對比SingleProducerSequencer的publish,MultiProducerSequencer的publish沒有設(shè)置cursor某筐,而是將內(nèi)部使用的availableBuffer數(shù)組對應(yīng)位置進(jìn)行設(shè)置比搭。availableBuffer是一個(gè)記錄RingBuffer槽位狀態(tài)的數(shù)組,通過對序列值sequence取ringBuffer大小的模南誊,獲得槽位號身诺,再通過與ringBuffer大小相除,獲取序列值所在的圈數(shù)抄囚,進(jìn)行設(shè)置霉赡。這里沒有直接使用模運(yùn)算和觸發(fā)運(yùn)算,而使用更高效的位與和右移操作幔托。
其他的操作穴亏,MultiProducerSequencer和SingleProducerSequencer類似,就不再贅述了柑司。
4 剖析SingleProducerSequencer設(shè)計(jì)
上面已經(jīng)把Disruptor的主要發(fā)布事件流程過了一遍迫肖,好奇如你,必然覺得意猶未盡攒驰。如果你沒有蟆湖,那肯定還是我講的有問題,不代表Disruptor本身的精彩玻粪。
接下來說一說SingleProducerSequencer的設(shè)計(jì)隅津。從中我們可以看到Disruptor解決偽共享問題的實(shí)際代碼。
SingleProducerSequencer繼承了抽象類SingleProducerSequencerFields劲室,SingleProducerSequencerFields又繼承了抽象類SingleProducerSequencerPad伦仍。其中SingleProducerSequencerFields是實(shí)際放置有效實(shí)例變量的位置。
// SingleProducerSequencer.java
abstract class SingleProducerSequencerPad extends AbstractSequencer
{
protected long p1, p2, p3, p4, p5, p6, p7;
public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad
{
public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
}
/**
* Set to -1 as sequence starting point
*/
protected long nextValue = Sequence.INITIAL_VALUE; // 生產(chǎn)者申請的下一個(gè)序列值
protected long cachedValue = Sequence.INITIAL_VALUE; // 緩存上一次比較的門控序列組和next的較小值(最慢消費(fèi)者序列值)
}
/**
* <p>Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
* Not safe for use from multiple threads as it does not implement any barriers.</p>
* <p>
* <p>Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call
* to {@link Sequencer#publish(long)} is made.
*/
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
protected long p1, p2, p3, p4, p5, p6, p7;
// ...省略
}
可以發(fā)現(xiàn)很洋,在兩個(gè)實(shí)例變量前后各有7個(gè)long型變量充蓝。為什么這樣做呢?對CPU緩存有了解的同學(xué)一定知道的……對,就是為了解決偽共享問題谓苟。
CPU在加載內(nèi)存到緩存行時(shí)官脓,一個(gè)緩存行中最多只有這兩個(gè)有效變量,最大限度地避免了因偽共享問題涝焙,導(dǎo)致緩存失效卑笨,而造成性能損失。
為了更清晰地闡述這個(gè)道理仑撞,我們嘗試看一下SingleProducerSequencer實(shí)例的內(nèi)存布局赤兴。
使用HSDB(HotSpot Debugger,可通過 java -cp .;"%JAVA_HOME%/lib/sa-jdi.jar" sun.jvm.hotspot.HSDB 啟動(dòng))跟蹤demo對應(yīng)的已斷點(diǎn)的HotSpot進(jìn)程隧哮,從Object Histogram對象圖中篩選出SingleProducerSequencer實(shí)例桶良,并通過Inspector工具對SingleProducerSequencer實(shí)例進(jìn)行查看。
本例中近迁,0x00000000828026f8為com.lmax.disruptor.SingleProducerSequencer實(shí)例在JVM中的內(nèi)存起始位置艺普。以此內(nèi)存地址通過mem命令查看后續(xù)的30個(gè)內(nèi)存地址內(nèi)容簸州。為啥要30個(gè)呢鉴竭?其實(shí)20個(gè)就夠了,可以看到"Object Histogram"中SingleProducerSequencer實(shí)例的size是160字節(jié)岸浑,mem打印一行表示一字長搏存,對應(yīng)到我本機(jī)的64位機(jī)器即8字節(jié),所以長度選擇大于等于160/8=20就可以看到SingleProducerSequencer實(shí)例的內(nèi)存布局全貌矢洲。
左側(cè)紅框中的地址0x0000000082802750和0x0000000082802758分別對應(yīng)右側(cè)紅框中的nextValue和cachedValue兩個(gè)實(shí)例變量璧眠。而在它們前后,各有7個(gè)連續(xù)的long型整數(shù)0读虏。CPU在加載連續(xù)內(nèi)存到緩存時(shí)责静,以緩存行為單位。緩存行通常為64B盖桥,通過占位灾螃,可以讓實(shí)際變量獨(dú)享一個(gè)緩存行。從而解決了偽共享問題揩徊。
緩存行查看:linux可使用以下命令查看腰鬼。
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
windows可使用CPU-Z查看。
附錄:JAVA對象的內(nèi)存布局相關(guān)知識(shí)
最后再說點(diǎn)Java對象的內(nèi)存布局塑荒,和本文主題關(guān)系不大熄赡,可以略過。
HotSpot對象內(nèi)存布局:
HotSpot中一個(gè)對象(非數(shù)組)的內(nèi)存布局大概是這樣的:對象頭(Mark Word + klass pointer) + 實(shí)際數(shù)據(jù) + 為了保持8字節(jié)對齊的填充齿税。其中對象頭的Mark Word和klass pointer長度各為一機(jī)器字(machine-word)彼硫,即32位機(jī)器對應(yīng)32bit(4字節(jié)),64位機(jī)器對應(yīng)64bit(8字節(jié))。如64位JVM開啟了指針壓縮拧篮,klass pointer將壓縮到4字節(jié)溜在。
查看是否開啟了指針壓縮:
jinfo -flag UseCompressedOops pid 返回-XX:+UseCompressedOops即為開啟,或jinfo -flags pid 查看全部選項(xiàng)他托。
此例中返回了-XX:+UseCompressedOops掖肋,表示開啟了指針壓縮(jdk1.8默認(rèn)開啟)。此時(shí)普通類型指針將被壓縮為4字節(jié)赏参。
下面通過SingleProducerSequencer舉一個(gè)實(shí)際的例子志笼。
SingleProducerSequencer屬性
使用HSDB Inspector查看實(shí)例。
查看對象內(nèi)存內(nèi)容:
hsdb> mem 0x00000000828026f8 20
0x00000000828026f8: 0x0000000000000009 // mark word 存儲(chǔ)對象運(yùn)行時(shí)數(shù)據(jù)把篓,如哈希碼纫溃、GC分代年齡、鎖狀態(tài)標(biāo)志韧掩、線程持有鎖紊浩、偏向線程ID、偏向時(shí)間戳
0x0000000082802700: 0x000000082000de38 // 高4位(82802704~82802707):int bufferSize 8 ,低4位(82802700~8280273):2000de38疗锐。由于開啟了指針壓縮坊谁,低4位表示klass pointer,由于使用的JDK1.8滑臊,klass metadata保存在Metadataspace中口芍。
0x0000000082802708: 0x828028e082809e98 // 高4位:ref cursor,低4位: ref waitStrategy
0x0000000082802710: 0x000000008284b390 // ref gatingSequences ObjArray
0x0000000082802718: 0x0000000000000000 // 包括當(dāng)前行的以下7行 SingleProducerSequencerPad中定義的p1~p7
0x0000000082802720: 0x0000000000000000
0x0000000082802728: 0x0000000000000000
0x0000000082802730: 0x0000000000000000
0x0000000082802738: 0x0000000000000000
0x0000000082802740: 0x0000000000000000
0x0000000082802748: 0x0000000000000000
0x0000000082802750: 0x0000000000000001 // nextValue 1
0x0000000082802758: 0xffffffffffffffff // cachedValue -1
0x0000000082802760: 0x0000000000000000 // SingleProducerSequencer定義的p1~p7
0x0000000082802768: 0x0000000000000000
0x0000000082802770: 0x0000000000000000
0x0000000082802778: 0x0000000000000000
0x0000000082802780: 0x0000000000000000
0x0000000082802788: 0x0000000000000000
0x0000000082802790: 0x0000000000000000
計(jì)算此對象的Shallow Heap size 和 Retained Heap size:
可以發(fā)現(xiàn)此對象一共占用20*8=160B內(nèi)存雇卷,此值即Shallow Heap size鬓椭。也可以手工計(jì)算:mark_word[8] + klass_pointer[4] + 2 * ref[4] + ObjArray_ref[8] + 16 * long[8] + int[4] = 160B
而保留內(nèi)存大小Retained Heap size = Shallow Heap size + (當(dāng)前對象的引用對象排除GC Root引用對象)的Shallow Heap size。
這里涉及到的引用為:cursor 0x00000000828028e0 关划,waitStrategy 0x0000000082809e98 小染,gatingSequences 0x000000008284b390。
分別使用revptrs命令查找反向引用贮折,發(fā)現(xiàn)只有g(shù)atingSequences為此對象唯一引用裤翩,故計(jì)算gatingSequences(com.lmax.disruptor.Sequence[1] ) Shallow Heap size = 12 + 4 + 1 * 4 + 4 = 24B。這里由于開啟了壓縮指針脱货,引用指針占用4B岛都,此時(shí)占用20B,需要填充4B補(bǔ)滿24B振峻。故對象的Retained Heap size為160+24=184臼疫。
hsdb> mem 0x000000008284b390 3
0x000000008284b390: 0x0000000000000009
0x000000008284b398: 0x000000012000e08d
0x000000008284b3a0: 0x000000008284abc0
數(shù)組對象的Shallow Heap size=引用對象頭大小12字節(jié)+存儲(chǔ)數(shù)組長度的空間大小4字節(jié)+數(shù)組的長度*數(shù)組中對象的Shallow Heap size+padding大小
最后還有個(gè)問題,我們知道從Java8開始扣孟,Metaspace替代之前的PermGen存儲(chǔ)元信息烫堤。使用Java7的HSDB是可以通過universe命令查看到PermGen信息的,而Java8就查不到Metaspace信息。
Heap Parameters:
ParallelScavengeHeap [ PSYoungGen [
eden = [0x00000000d6300000,0x00000000d66755d0,0x00000000d8300000] ,
from = [0x00000000d8300000,0x00000000d8300000,0x00000000d8800000] ,
to = [0x00000000d8800000,0x00000000d8800000,0x00000000d8d00000] ]
PSOldGen [ [0x0000000082800000,0x00000000829d79c0,0x0000000084a00000] ] ]
Disruptor生產(chǎn)者相關(guān)源碼就分享到這鸽斟,后續(xù)將對消費(fèi)者一探究竟拔创。
參考資料:
- Java對象內(nèi)存布局(推薦,寫的很棒) http://www.reibang.com/p/91e398d5d17c
- JVM——深入分析對象的內(nèi)存布局 http://www.cnblogs.com/zhengbin/p/6490953.html
- 借HSDB來探索HotSpot VM的運(yùn)行時(shí)數(shù)據(jù) http://rednaxelafx.iteye.com/blog/1847971
- markOop.hpp https://github.com/dmlloyd/openjdk/blob/jdk8u/jdk8u/hotspot/src/share/vm/oops/markOop.hpp
- Shallow and retained sizes http://toolkit.globus.org/toolkit/testing/tools/docs/help/sizes.html
- AtomicLong.lazySet是如何工作的富蓄? http://ifeve.com/how-does-atomiclong-lazyset-work/
- 《深入理解Java虛擬機(jī)》2.3.2 對象的內(nèi)存布局