Disruptor據(jù)說是最快的并發(fā)消息處理器。我今天學(xué)習(xí)了下童擎,有了一個初步的了解媳维。
Disruptor可以當(dāng)做隊列使用,不過可以支持多消費者铸抑,既一條生產(chǎn)者的消息可以提供給多個消費者同時使用垂券。
gethub 安裝 Install-Package Disruptor -Version 3.3.7
我們添加一個實體類,這個類的主要是生產(chǎn)者和消費者的信息載體
public sealed class CmdEntry
{
public string Cmd { get; set; }
}
我們增加一個類實現(xiàn)IEventHandler接口,這個類的作用相當(dāng)于消費者觸發(fā)口菇爪,也可以當(dāng)作消費者算芯。
public class ExecuteCmdHandler : IEventHandler<CmdEntry>
{
public void OnNext(CmdEntry data, long sequence, bool endOfBatch)
{
Console.WriteLine("this is CmdHandler0 Cmd = {0} (index:{1})", data.Cmd, sequence);
}
}
然后我們添加一個窗體界面。并生成生產(chǎn)者信息凳宙。
public partial class MainWindow : Window
{
RingBuffer<CmdEntry> _ringBuffer;
public MainWindow()
{
InitializeComponent();
var disruptor = new Disruptor<CmdEntry>(() => new CmdEntry(), 512, TaskScheduler.Default);
disruptor.HandleEventsWith(new ExecuteCmdHandler());
//disruptor.HandleEventsWith(new ExecuteCmdHandler1());
_ringBuffer = disruptor.Start();
Thread tr = new Thread(Product);
tr.Start();
}
private void Product(object o)
{
int i = 0;
while (i<500)
{
long index = _ringBuffer.Next();
var entity = _ringBuffer[index];
entity.Cmd = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff");
_ringBuffer.Publish(index);
Thread.Sleep(1);
i = i + 1;
}
}
}
運行
如果我們有多個消費者熙揍,只需要在實現(xiàn)IEventHandler接口
public class ExecuteCmdHandler1 : IEventHandler<CmdEntry>
{
public void OnNext(CmdEntry data, long sequence, bool endOfBatch)
{
Console.WriteLine("this is CmdHandler1 Cmd = {0} (index:{1})", data.Cmd, sequence);
}
}
然后再通過Disruptor對象的HandleEventsWith方法添加進(jìn)去
運行,從結(jié)果可以看出氏涩,兩個消費者是并行處理届囚。