一、簡介
LMAX是一家外匯黃金交易所丹莲,Disruptor是由LMAX公司開發(fā)的可信消息傳遞架構(gòu)的一部分
以便用非常快速的方法來在多組件之間傳遞數(shù)據(jù)尸诽。
核心思想是理解并適應(yīng)硬件工作方式來達到最優(yōu)的效果甥材。
github地址:https://github.com/LMAX-Exchange/disruptor
LMAX架構(gòu):https://martinfowler.com/articles/lmax.html
二、架構(gòu)圖
三性含、成員
sequencer:序列號分配
sequence:序號洲赵,自增不減
MultiProducerSequencer 多生產(chǎn)者序列分配器
SingleProducerSequencer 單生產(chǎn)者序列分配器
ProcessingSequenceBarrier 管理消費者和生產(chǎn)者的依賴關(guān)系,
Ring Buffer:負責存儲和更新事件的數(shù)據(jù)
Sequence Barrier:由Sequencer生成商蕴,它包含此Sequencer發(fā)布的Sequence指針以及依賴的其它消費者的Sequence叠萍。 它包含為消費者檢查是否有可用的事件的代碼邏輯绪商。
Wait Strategy: 消費者等待事件的策略, 這些事件由生產(chǎn)者放入部宿。
Event:傳遞的事件,完全有用戶定義
EventProcessor:處理事件的主要循環(huán),包含一個Sequence绵患。有一個具體的實現(xiàn)類BatchEventProcessor.
EventHandler: 用戶實現(xiàn)的接口,代表一個消費者
Producer:生產(chǎn)者落蝙,先獲得占位,然后提交事件移迫。
四管行、示例
maven依賴
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
消息體:
import lombok.Data;
/**
* Created by yangzaining on 2020-04-06.
*/
@Data
public class LongEvent {
private Long id;
}
消息工廠
import com.lmax.disruptor.EventFactory;
import lombok.Data;
/**
* Created by yangzaining on 2020-04-06.
*/
@Data
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
消費者
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
/**
* Created by yangzaining on 2020-04-06.
*/
@Slf4j
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("event = {},sequence = {}, endOfBatch = {}", event.getId(), sequence, endOfBatch);
}
}
生產(chǎn)者
package demo;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
/**
* Created by yangzaining on 2020-04-06.
*/
public class LongEventPusher {
private RingBuffer<LongEvent> ringBuffer;
LongEventPusher(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void push(Long id) {
long sequence = ringBuffer.next();
try {
LongEvent event = ringBuffer.get(sequence);
event.setId(id);
} finally {
ringBuffer.publish(sequence);
}
}
}
public static void main(String[] args) throws InterruptedException {
LongEventFactory factory = new LongEventFactory();
int bufferSize = 1024;
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(new LongEventHandler(), new LongEventHandler());
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventPusher pusher = new LongEventPusher(ringBuffer);//java 8可以用pushEvent
for (long l = 0; l < 100; l++) {
// long finalL = l;
// disruptor.publishEvent((eventWrapper, sequence) -> eventWrapper.setId(finalL));
pusher.push(l);
}
Thread.sleep(10000);
}
/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
waitStrategy.signalAllWhenBlocking();
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
注:以上全憑自己對Disruptor的理解雨效,有不對的地方歡迎指正