不喜歡看廢話的同學(xué)直接跳到 九 看代碼
一、
有時(shí)候我們會(huì)有這樣的需求:
一個(gè)或多個(gè)線程(Senders)向一個(gè)隊(duì)列(FIFO)中寫入數(shù)據(jù),
另外一個(gè)或多個(gè)線程(Receivers)從這個(gè)隊(duì)列中取數(shù)據(jù),并對(duì)數(shù)據(jù)進(jìn)行處理或加工
這就是異步隊(duì)列
PS:發(fā)送者(sender)/接收者(receiver)有時(shí)也被叫做生產(chǎn)者(producer)/消費(fèi)者(consumer )
二肆糕、
最近在項(xiàng)目中有使用(本地)異步隊(duì)列的需求曲管,在網(wǎng)上搜了一圈,都不是很滿意:(所以說.NET生態(tài)還有待加強(qiáng))
一種是通過事件機(jī)制來觸發(fā)癞松,這種方式寫出的代碼比較“分散”,不易閱讀冕碟,理解和維護(hù)拦惋,這種當(dāng)然不能接受啦,
另一種是通過“阻塞”模式來優(yōu)化代碼的可讀性安寺,代價(jià)就是浪費(fèi)性能厕妖,
拜托,現(xiàn)在都什么年代了挑庶,怎么可能阻塞線程呢言秸?當(dāng)然是使用 C# 5.0 引入的 async/await啦。
因?yàn)樗巡坏接啵灾荒茏约簞?dòng)手了
三举畸、
我們的目標(biāo)當(dāng)然是寫出這樣的代碼:
var x = await queue.Dequeue(cancellationToken);
并且內(nèi)部的實(shí)現(xiàn)必須是非阻塞式的,
基于這個(gè)目標(biāo)我們需要知道一個(gè)知識(shí)點(diǎn)信號(hào)量
四凳枝、
信號(hào)量簡(jiǎn)單來說就是對(duì)一個(gè)資源打上一個(gè)數(shù)字的標(biāo)記抄沮,
這個(gè)數(shù)字(正數(shù))表示了這個(gè)資源可以同時(shí)被多少個(gè)對(duì)象訪問,(負(fù)數(shù))還有多少個(gè)對(duì)象需要訪問他
打個(gè)比方:一支筆岖瑰,他同時(shí)只能被一個(gè)人使用叛买,所以我可以初始給他打上一個(gè)信號(hào)量1
當(dāng)?shù)谝粋€(gè)小朋友來借筆時(shí),首先觀察信號(hào)量1(大于0)蹋订,則表示可以將筆(資源)借(分配)給小朋友(對(duì)象)率挣,并將信號(hào)量-1,此時(shí)信號(hào)量為0
第二個(gè)小朋友來借筆時(shí)露戒,信號(hào)量為0椒功,表示需要等待,并將信號(hào)量-1智什,此時(shí)信號(hào)量為-1(表示有1個(gè)對(duì)象正在等待資源釋放)
如果這時(shí)动漾,第一個(gè)小朋友,將筆(資源)歸還(釋放)撩鹿,則將信號(hào)量+1谦炬,并將筆借給第二個(gè)小朋友,此時(shí)信號(hào)量為0(表示無等待)
如果在第一個(gè)小朋友還沒有將筆歸還之前,第二個(gè)小朋友表示不愿意再等了键思,則信號(hào)量也-1
例子2:
一個(gè)小游泳池础爬,可以同時(shí)允許10個(gè)人一起下水,則初始信號(hào)量為10
第一個(gè)人來吼鳞,信號(hào)量-1看蚜,得到9,大于等于0赔桌,表示可以進(jìn)去玩
第二人人來供炎,信號(hào)量-1,得到8疾党,大于等于0音诫,表示可以進(jìn)去玩
......
第十個(gè)人來,信號(hào)量-1雪位,得到0竭钝,大于等于0,表示可以進(jìn)去玩
第十一個(gè)人來雹洗,信號(hào)量-1香罐,得到-1,小于0时肿,表示需要等待
第十二個(gè)人來庇茫,信號(hào)量-1,得到-2螃成,小于0旦签,表示需要等待
第十三個(gè)人來,信號(hào)量-1寸宏,得到-3顷霹,小于0,表示需要等待
第一個(gè)人走了击吱,信號(hào)量+1,將第十個(gè)人放進(jìn)去遥昧,信號(hào)量等于-2覆醇,有2個(gè)人在等待
第十二個(gè)人走了,信號(hào)量+1炭臭,信號(hào)量等于-1永脓,有1個(gè)人在等待
與信號(hào)量的處理相關(guān)的還有一個(gè)PV操作,了解一下
五鞋仍、
在C#中有專門用于解決信號(hào)量問題的類:Semaphore
和SemaphoreSlim
Semaphore:限制可同時(shí)訪問某一資源或資源池的線程數(shù)常摧。
SemaphoreSlim:對(duì)可同時(shí)訪問資源或資源池的線程數(shù)加以限制的 System.Threading.Semaphore 的輕量替代。
這里我選擇更輕量的SemaphoreSlim
來實(shí)現(xiàn),他的用法也非常簡(jiǎn)單
var s = new SemaphoreSlim(1); // 計(jì)數(shù)器初始值1
await s.WaitAsync(cancellationToken); // 計(jì)數(shù)器-1落午,如果計(jì)數(shù)不足則等待(這個(gè)類的設(shè)計(jì)是計(jì)數(shù)到0就不會(huì)再減少了)
s.Release(); // 計(jì)數(shù)器+1
下面就開始實(shí)現(xiàn)一個(gè)異步隊(duì)列
六谎懦、
先定義一個(gè)異步隊(duì)列的接口
// 異步隊(duì)列接口
public interface IAsyncQueue<T>: IDisposable
{
// 清空隊(duì)列。
Task Clear(CancellationToken token);
// 移除并返回位于隊(duì)列開始處的對(duì)象溃斋。
Task<T> Dequeue(CancellationToken token);
// 將對(duì)象添加到隊(duì)列的結(jié)尾處界拦。
Task Enqueue(T item, CancellationToken token);
}
定義接口的好處是為了方便寫擴(kuò)展方法和以后對(duì)實(shí)現(xiàn)的修改
七、
定義信號(hào)量
從接口中可以看出梗劫,入和出2個(gè)操作都是異步的,所以需要定義2個(gè)信號(hào)量
private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
private readonly SemaphoreSlim _out = new SemaphoreSlim(0);
入操作的信號(hào)量初始值是1,表示允許1個(gè)并發(fā)執(zhí)行
出操作的信號(hào)量初始值是0忿偷,因?yàn)槌霾僮鞯男盘?hào)量是根據(jù)隊(duì)列中的元素個(gè)數(shù)來決定的材蛛,初始隊(duì)列元素個(gè)數(shù)為0
定義一個(gè)內(nèi)部隊(duì)列,用于實(shí)現(xiàn)隊(duì)列的基本操作
private readonly Queue<T> _queue = new Queue<T>();
實(shí)現(xiàn)類定義:
// 異步消息隊(duì)列實(shí)現(xiàn)
sealed class AsyncQueue<T> : IAsyncQueue<T>
{
// 內(nèi)部隊(duì)列實(shí)例
private readonly Queue<T> _queue = new Queue<T>();
// 入操作信號(hào)量
private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
// 出操作信號(hào)量
private readonly SemaphoreSlim _out = new SemaphoreSlim(0);
public Task Clear(CancellationToken token) => throw new NotImplementedException();
public Task<T> Dequeue(CancellationToken token) => throw new NotImplementedException();
public Task Enqueue(T item, CancellationToken token) => throw new NotImplementedException();
public void Dispose() => throw new NotImplementedException();
}
八走哺、
入(Enqueue)操作
public async Task Enqueue(T item, CancellationToken token)
{
await _in.WaitAsync(token); // 入操作信號(hào)量-1蚯嫌,并發(fā)時(shí)等待,只允許一個(gè)線程操作
try
{
_queue.Enqueue(item); // 將對(duì)象放入隊(duì)列
_out.Release(); // “出”操作信號(hào)量+1
}
finally
{
_in.Release(); // 如果Wait操作完成割坠,則必須將信號(hào)量施放
}
}
出(Dequeue)操作
public async Task<T> Dequeue(CancellationToken token)
{
await _out.WaitAsync(token); // 同上齐帚,出操作比較簡(jiǎn)單就不贅述了
return _queue.Dequeue();
}
清空(Clear)操作
public async Task Clear(CancellationToken token)
{
await _in.WaitAsync(token); // 先占中入操作的資源,防止操作中插入新的對(duì)象
try
{
// 循環(huán)調(diào)用出操作的Wait彼哼,將信號(hào)量減為0
// WaitAsync(100)表示每次操作等待100毫秒对妄,為了防止另一個(gè)線程將`_out`的最后一個(gè)資源搶先領(lǐng)取后,清空操作無限期等待
while (await _out.WaitAsync(100) || _out.CurrentCount > 0)
{
}
_queue.Clear();
}
finally
{
_in.Release();
}
}
九敢朱、
完整代碼:
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace blqw
{
sealed class AsyncQueue<T> : IAsyncQueue<T>
{
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
private readonly SemaphoreSlim _out = new SemaphoreSlim(0);
public async Task Clear(CancellationToken token)
{
await _in.WaitAsync(token);
try
{
while (await _out.WaitAsync(100) || _out.CurrentCount > 0)
{
_queue.TryDequeue(out _);
}
}
finally
{
_in.Release();
}
}
public async Task<T> Dequeue(CancellationToken token)
{
await _out.WaitAsync(token);
return _queue.TryDequeue(out var val) ? val : throw new System.InvalidOperationException();
}
public async Task Enqueue(T item, CancellationToken token)
{
await _in.WaitAsync(token);
try
{
_queue.Enqueue(item);
_out.Release();
}
finally
{
_in.Release();
}
}
void DisposeSemaphoreSlim(SemaphoreSlim ss)
{
try
{
ss.Dispose();
}
catch { }
}
public void Dispose()
{
DisposeSemaphoreSlim(_in);
DisposeSemaphoreSlim(_out);
}
}
}
64行
十剪菱、
工廠類
/// <summary>
/// 異步隊(duì)列
/// </summary>
public static class AsyncQueue
{
public static IAsyncQueue<T> Create<T>() => new AsyncQueue<T>();
}
不直接公開 AsyncQueue<T> 是考慮到以后方便替換實(shí)現(xiàn)類
拓展類
public static class AsyncQueueExtensions
{
public static Task Clear<T>(this IAsyncQueue<T> aq) => aq.Clear(CancellationToken.None);
public static Task Clear<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
{
var source = new CancellationTokenSource(millisecondsTimeout);
return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
}
public static Task Clear<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
{
var source = new CancellationTokenSource(timeout);
return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
}
public static Task<T> Dequeue<T>(this IAsyncQueue<T> aq) => aq.Dequeue(CancellationToken.None);
public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
{
using (var source = new CancellationTokenSource(millisecondsTimeout))
{
return await aq.Dequeue(source.Token);
}
}
public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
{
using (var source = new CancellationTokenSource(timeout))
{
return await aq.Dequeue(source.Token);
}
}
public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item) => aq.Enqueue(item, CancellationToken.None);
public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, int millisecondsTimeout)
{
var source = new CancellationTokenSource(millisecondsTimeout);
return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
}
public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, TimeSpan timeout)
{
var source = new CancellationTokenSource(timeout);
return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
}
public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items)
{
if (items != null)
{
foreach (var item in items)
{
await aq.Enqueue(item, CancellationToken.None);
}
}
}
public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, int millisecondsTimeout)
{
if (items != null)
{
using (var source = new CancellationTokenSource(millisecondsTimeout))
{
foreach (var item in items)
{
await aq.Enqueue(item, CancellationToken.None);
}
}
}
}
public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, TimeSpan timeout)
{
if (items != null)
{
using (var source = new CancellationTokenSource(timeout))
{
foreach (var item in items)
{
await aq.Enqueue(item, CancellationToken.None);
}
}
}
}
}
十一、
現(xiàn)在來測(cè)試一下
為了方便觀察測(cè)試結(jié)果拴签,先寫一個(gè)將結(jié)果改為彩色的類孝常,并且是異步的,不影響測(cè)試代碼
static class ColorConsole
{
public static void WriteLine(string value, ConsoleColor? backgroundColor = null, ConsoleColor? foregroundColor = null)
{
Task.Run(() =>
{
lock (typeof(Console))
{
Console.ResetColor();
if (backgroundColor != null)
{
Console.BackgroundColor = backgroundColor.Value;
}
if (foregroundColor != null)
{
Console.ForegroundColor = foregroundColor.Value;
}
Console.WriteLine(value);
}
});
}
}
發(fā)送者:
class Sender
{
private readonly int _index;
private readonly IAsyncQueue<string> _queue;
private readonly ConsoleColor _background;
public Sender(int index, IAsyncQueue<string> queue, ConsoleColor background)
{
_index = index;
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
_background = background;
}
public async Task Send(string message)
{
ColorConsole.WriteLine($"{_index}號(hào)發(fā)送者寫入{message}", backgroundColor: _background);
await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延遲模擬實(shí)際場(chǎng)景
await _queue.Enqueue(message); // 關(guān)鍵代碼
}
}
接收者
class Receiver
{
private readonly int _index;
private readonly IAsyncQueue<string> _queue;
private readonly ConsoleColor _foreground;
public Receiver(int index, IAsyncQueue<string> queue, ConsoleColor foreground)
{
_index = index;
_queue = queue ?? throw new ArgumentNullException(nameof(queue));
_foreground = foreground;
}
public async Task Receive(CancellationToken token)
{
try
{
while (true)
{
var str = await _queue.Dequeue(token); // 關(guān)鍵代碼
ColorConsole.WriteLine($"{_index}號(hào)接收者獲取到:{str}", foregroundColor: _foreground);
await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延遲蚓哩,模擬實(shí)際場(chǎng)景
}
}
catch (OperationCanceledException)
{
ColorConsole.WriteLine($"{_index}號(hào)接收者關(guān)閉", foregroundColor: _foreground);
}
}
}
測(cè)試類
static void Main(string[] args)
{
var queue = AsyncQueue.Create<string>(); // 初始化異步隊(duì)列
var source = new CancellationTokenSource(); // 初始化取消標(biāo)志
var token = source.Token;
var senders = Enumerable.Range(0, 3).Select(index => new Sender(index, queue, (ConsoleColor)(index+13))).ToArray(); // 初始化3個(gè)發(fā)送者
var receivers = Enumerable.Range(0, 10).Select(index => new Receiver(index, queue, (ConsoleColor)(index + 5))).ToArray(); // 初始化10個(gè)接收者
Parallel.ForEach(receivers, async x => await x.Receive(token)); // 并行啟動(dòng)10個(gè)接收者
Thread.Sleep(1000); // 延遲1秒 等待接收者全部啟動(dòng)完成
var message = 0;
// 并行啟動(dòng)3個(gè)發(fā)送者构灸,每個(gè)發(fā)送者發(fā)送10次,發(fā)送內(nèi)容為從1開始自增的整型數(shù)字岸梨,也就是1~30
Parallel.ForEach(senders, async x =>
{
for (var i = 0; i < 10; i++)
{
await x.Send(Interlocked.Increment(ref message).ToString());
}
});
Console.ReadLine();
source.Cancel(); // 停止所有接收者
Console.ReadLine();
}
十二喜颁、
由于整個(gè)過程都是異步的,所以打印結(jié)果并不會(huì)是順序的
十三曹阔、
十四半开、
如果可以幫到你,別忘了幫我點(diǎn)一下喜歡赃份,讓更多的人看到