[TOC]
計算限制的異步操作(下)
三、Parallel并行循環(huán)
3.1 Parallel說明
靜態(tài)System.Threading.Tasks.Parallel對任務(wù)進行了封裝要糊,其內(nèi)部使用Task對象:
// 并行For循環(huán)
Parallel.For(0, 1000, i => DoWork(i));
// 并行ForEach循環(huán)
Parallel.ForEach(collection, item => DoWork(item));
// 并行執(zhí)行方法
Parallel.Invoke(
() => Method1(),
() => Method2(),
() => Method3());
Parallel的所有方法都會讓調(diào)用線程參與處理身冀,若調(diào)用線程在其它工作線程之前完成工作放椰,它會將自己掛起匈辱,直到所有工作完成。
并行的任何操作拋出未處理的異常球涛,Parallel方法最后都會拋出一個AggregateException異常對象劣针。
Parallel的方法本身也有開銷,委托對象必須分配宾符,而針對每個工作項都要調(diào)用一次這些委托酿秸。為簡單的工作項使用Parallel可能會得不償失灭翔。
3.1魏烫、Parallel方法的使用
Parallel的For、ForEach和Invoke方法都提供了接受一個ParallelOptions對象的重載,ParallelOptions定義如下:
public class ParallelOptions {
public CancellationToken CancellationToken{get;set;} // 默認(rèn)為Token.None
public Int32 MaxDegreeOfParallelism {get;set;} // 可用CPU數(shù)肝箱,默認(rèn)-1
public TaskScheduler TaskScheduler{get;set;} // 默認(rèn)為TaskSchduler.Default
}
For哄褒、ForEach方法有一些重載版本允許傳遞3個委托:
- localInit: 任務(wù)局部初始化委托,要求每個處理工作項的任務(wù)在處理工作項之前調(diào)用煌张;
- body: 主體委托呐赡,處理工作項的委托;
- localFinally:任務(wù)局部終結(jié)委托骏融,任務(wù)處理好派發(fā)給它的所有工作項之后調(diào)用链嘀,即使主體委托代碼引發(fā)一個未處理的異常,也會調(diào)用它档玻。
以下為演示代碼:
static long DirectoryBytes(string path, string searchPattern, SearchOption searchOption)
{
var files = Directory.EnumerateFiles(path, searchPattern, searchOption);
long masterTotal = 0;
ParallelLoopResult result = Parallel.ForEach<String, long>(
files,
() => {// localInit :每個任務(wù)開始之前調(diào)用一次
// 每個任務(wù)開始之前怀泊,總計值都初始化為0
return 0; // 將 taskLocalTotal 初始值設(shè)置為0
},
(file, loopState, index, taskLocalTotal) => { // body:每個工作項調(diào)用一次
// 獲得這個文件的大小,把它添加到這個任務(wù)的累加值上
long fileLength = 0;
FileStream fs = null;
try
{
fs = File.OpenRead(file);
fileLength = fs.Length;
}
catch (IOException) { /*忽略拒絕訪問的任何文件*/ }
finally
{
if (fs != null) fs.Dispose();
}
return taskLocalTotal + fileLength;
},
taskLocalTotal => { // localFinally:每個任務(wù)完成時調(diào)用一次
// 將這個任務(wù)的總計值(taskLocalTotal)加到總的總計值(masterTotal)上
Interlocked.Add(ref masterTotal, taskLocalTotal);
});
return masterTotal;
}
每個任務(wù)都通過 taskLocalTotal 變量為分配給它的文件維護它自己的總計值误趴;
每個任務(wù)在完成之后都通過調(diào)用 Interlocked.Add 方法以一種線程安全的方式更新總的總計值(masterTotal)霹琼。
3.2、ParallelLoopState 和 ParallelLoopResult
3.2.1 ParallelLoopState
上例中的 body 委托主體中存在一個 ParallelLoopState 參數(shù)對象,每個工作任務(wù)都將獲得它自己的ParallelLoopState對象枣申,并可通過該對象與其它任務(wù)進行交互售葡。該參數(shù)的主體定義如下:
public class ParallelLoopState {
public void Stop();
public Boolean IsStopped { get; }
public void Break();
public Int64? LowestBreakIteration { get; }
public Boolean IsExceptional { get; }
public Boolean ShouldExitCurrentIteration { get; }
}
成員解釋如下:
- Stop:循環(huán)停止處理任何更多的任務(wù),未來對IsStopped屬性的查詢會返回true忠藤;
- Break:循環(huán)不再繼續(xù)處理當(dāng)前項之后的項挟伙;
- LowestBreakIteration:返回在處理過程中調(diào)用過Break方法的最低的項,默認(rèn)為null模孩;
- IsExceptional:處理任何一項時像寒,若造成未處理的異常,則返回true瓜贾;
- ShouldExitCurrentIteration:判斷當(dāng)前項是否應(yīng)該提前退出诺祸,若調(diào)用過 Stop、Break祭芦,或被取消筷笨,或發(fā)生未處理異常,則返回true龟劲;
Break說明:加入ForEach要處理100項胃夏,在第五項時調(diào)用了Break,那么循環(huán)會確保前5項處理好之后ForEach才返回昌跌。由于并行循環(huán)是無序的仰禀,第5項之后的項可能在以前就處理完畢了。
3.2.2 ParallelLoopResult
Parallel的For蚕愤、ForEach方法都返回一個ParallelLoopResult實例答恶,可檢查該實例的相關(guān)屬性來了解循環(huán)結(jié)果,該結(jié)構(gòu)的定義如下:
public struct ParallelLoopResult {
public Boolean IsCompleted { get; } // 如果操作提前終止則返回 false
public Int64? LowestBreakIteration { get; }
}
其IsCompleted返回true萍诱,則表明循環(huán)運行完成悬嗓,所有項都得到了正確處理。若返回false:
- LowestBreakIteration為null:處理某個工作項的線程調(diào)用了Stop裕坊;
- LowestBreakIteration返回int64:處理某個工作項的線程調(diào)用了Break包竹,且返回了得到處理的最低一項的索引。
四 定時器
4.1 說明
命名空間:System.Threading.Timer
定義:
public sealed class Timer : MarshalByRefObject, IDisposable {
public Timer(TimerCallback callback,
Object state,
Int32/Uint32/TimeSpan dueTime,
Int32/Uint32/TimeSpan period);
public Boolean Change(Int32/UInt32/Int64/TimeSpan dueTime,
Int32/UInt32/Int64/TimeSpan period);
public Boolean Dispose();
public Boolean Dispose(WaitHandle notifyObject);
}
其中籍凝,TimerCallBack的定義如下
delegate void TimerCallBack(Object state);
Timer構(gòu)造器的參數(shù)說明:
- dueTime:告訴CLR在首次調(diào)用回調(diào)方法前需要等待多少毫秒周瞎,0表示立即調(diào)用, Timeout.Infinite(-1)表示暫不啟動;
- period:每次調(diào)用回調(diào)方法之前要等待多少毫秒饵蒂,Timeout.Infinite(-1)表示線程池線程只調(diào)用回調(diào)方法一次声诸。
在內(nèi)部,線程池為所有的Timer對象只使用了一個線程苹享。該線程知道下一個Timer對象在什么時候到期(計時器還有多久觸發(fā))双絮。下一個Timer對象到期時浴麻,線程就會喚醒,在內(nèi)部調(diào)用ThreadPool.QueueUserWorkItem囤攀,將一個工作項添加到線程池的隊列中來調(diào)用回調(diào)方法软免。
【注意】若回調(diào)方法執(zhí)行時間過長,可能上個回調(diào)還沒完成而計時器再次觸發(fā)焚挠,會造成多個線程池線程同時執(zhí)行你的回調(diào)方法膏萧。
解決方案:
- 構(gòu)造Timer時,將period參數(shù)指定為 Timeout.Infinite蝌衔,使計時器只觸發(fā)一次榛泛;
- 在回調(diào)方法中,調(diào)用Change來指定一個新的 dueTime噩斟,并再次為 period 參數(shù)指定 Timeout.Infinite曹锨。
Change方法的簽名如下:
public Boolean Change(TimeSpan duetime, TimeSpan period);
完整的Demo如下
private static Timer s_timer;
public static void Main(string[] args) {
s_timer = new Timer(Status, null, Timeout.Infinite, Timeout.Infinite);
// 啟動定時器
s_timer.Change(0, Timeout.Infinite);
Console.ReadLine();
}
private static void Status(Object state) {
Console.WriteLine("In Status at {0}", DateTime.Now);
// Do Some Works in here
s_timer.Change(2000, Timeout.Infinite);
}
也可以使用以下方式完成定時器等效功能:
while(true){
// Do some workds in here
await Task.Delay(2000);
}
4.2 FCL提供的多種計時器
- System.Threading 的 Timer 類;
- 在線程池線程上執(zhí)行定時后臺任務(wù)剃允,最好用的計時器沛简;
- System.Windows.Forms 的 Timer 類:
- 將一個計時器和調(diào)用線程關(guān)聯(lián)。當(dāng)計時器觸發(fā)時斥废,Windows將一條計時器消息(WM_TIMER)注入線程的消息隊列椒楣。線程需要執(zhí)行一個消息泵來提取這些消息,并把它們派發(fā)給需要的回調(diào)方法牡肉。
- 所有工作只由一個線程完成捧灰,設(shè)置計時器的線程就是執(zhí)行回調(diào)方法的線程。
- 該 Timer 用于 WinForm 應(yīng)用程序统锤;
- System.Windows.Threading 的 DispatcherTimer 類:
- 原理同 System.Windows.Forms.Timer 類毛俏;
- 該 Timer 用于 Silverlight 和 WPF 應(yīng)用程序;
- Windows.UI.Xaml 的 DispatcherTimer 類:
- 原理同 System.Windows.Forms.Timer 類跪另;
- 該 Timer 用于 Windows Store 應(yīng)用程序拧抖;
- System.Timers 的 Timer 類:
- 微軟最初的 Timer 類,是微軟還沒理清線程處理和計時器的時候添加到FCL中的免绿,winform中的Timer控件。
五擦盾、線程池如何管理線程
5.1 線程池基礎(chǔ)
創(chuàng)建和銷毀線程是一個昂貴的操作嘲驾,要耗費大量時間,太多的線程也會浪費內(nèi)存資源迹卢,調(diào)度線程時會執(zhí)行上下文切換影響性能辽故。每個CLR管理一個線程池,這個線程池供CLR控制的所有AppDomain共享腐碱。若一個進程中加載了多個CLR誊垢,那么每個CLR都有它自己的線程池掉弛。
CLR初始化時,線程池中沒有線程喂走。線程池維護了一個請求隊列殃饿,應(yīng)用每執(zhí)行一個異步操作時,就調(diào)用某個方法芋肠,將一個記錄項(entry)追加到線程池的隊列中乎芳。線程池的代碼從這個隊列中提取記錄項并派發(fā)(dispatch)給一個線程池線程。
線程池線程處理完畢后會回到線程池中進入空閑狀態(tài)帖池,等待新的任務(wù)奈惑。一段時間之后,線程池線程會醒來自己終止自己以釋放資源睡汹。
隨著CLR版本的發(fā)布肴甸,線程池管理工作線程的內(nèi)部實現(xiàn)發(fā)生了很多變化。開發(fā)者使用線程池囚巴,最好將它看做一個黑盒雷滋。
線程的執(zhí)行上下文
每個線程都關(guān)聯(lián)了一個執(zhí)行上下文的數(shù)據(jù)結(jié)構(gòu):
- 安全設(shè)置:包括壓縮棧、Thread的Principal屬性和windows身份文兢;
- 宿主設(shè)置:參見 System.Threading.HostExecutionContextManager晤斩;
- 邏輯調(diào)用上下文數(shù)據(jù):參見System.Runtime.Remoting.Messaging.CallContext的LogicalSetData和LogicalGetData方法;
線程執(zhí)行它的代碼時姆坚,一些操作會受到線程執(zhí)行上下文設(shè)置(尤其是安全設(shè)置)的影響澳泵。理想情況下,每當(dāng)一個線程使用另一個線程執(zhí)行任務(wù)時兼呵,前者的執(zhí)行上下文應(yīng)該流向(復(fù)制)輔助線程兔辅。這樣確保了輔助線程執(zhí)行的任何操作使用的是相同的安全設(shè)置和宿主設(shè)置,以及初始線程的邏輯調(diào)用上下文中存儲的任何數(shù)據(jù)都適用于輔助線程击喂。
由于執(zhí)行上下文中包含大量信息维苔,而收集這些信息,再把它們賦值到輔助線程要耗費不少時間懂昂。若輔助線程中又啟用了更多的輔助線程介时,則必須創(chuàng)建和初始化更多的執(zhí)行上下文。
System.Threading.ExecutionContext類凌彬,允許控制線程的執(zhí)行上下文如何從一個線程“流”向另一個沸柔。可使用該類組織執(zhí)行上下文的流動以提升程序性能铲敛。其常用類型定義如下:
public sealed class ExecutionContext : IDisposable, ISerializable {
[SecurityCritical]
public static AsyncFlowControl SuppressFlow();
public static void RestoreFlow();
public static Boolean IsFlowSuppressed();
...
}
ExecutionContext 可以阻止上下文流動以提升性能褐澎,對于客戶端來說性能提升不了多少,但對服務(wù)器應(yīng)用程序來說伐蒋,性能的提升會非常顯著工三。另外迁酸,由于SuppressFlow()方法使用了[SecurityCritical]特性,某些客戶端應(yīng)用程序俭正,如Silverlight奸鬓,是無法調(diào)用的。
static void Main(string[] args)
{
// 將一些數(shù)據(jù)放到Main線程的邏輯調(diào)用上下文中
CallContext.LogicalSetData("Name", "Jimmie");
// 初始化要由一個線程池線程做的一些工作段审,線程池線程能訪問邏輯調(diào)用上下文數(shù)據(jù)
ThreadPool.QueueUserWorkItem(state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));
// 阻止Main線程的執(zhí)行上下文流動
ExecutionContext.SuppressFlow();
// 初始化要由線程池線程做的工作全蝶,線程池線程不能訪問邏輯調(diào)用上下文數(shù)據(jù)
ThreadPool.QueueUserWorkItem(state => Console.WriteLine("Name={0}", CallContext.LogicalGetData("Name")));
// 恢復(fù)Main線程的執(zhí)行上下文流動
ExecutionContext.RestoreFlow();
Console.ReadLine();
}
/*
輸出如下:
Name=Jimmie
Name=
*/
只有在輔助線程不需要訪問上下文信息時,才應(yīng)該阻止上下文的流動寺枉。若初始線程的執(zhí)行上下文不流向輔助線程抑淫,輔助線程就會使用上一次和它關(guān)聯(lián)的任意執(zhí)行上下文,此時姥闪,輔助線程的代碼不應(yīng)該依賴任何執(zhí)行上下文狀態(tài)的代碼(如用戶的Windows身份)始苇。
5.2 設(shè)置線程池限制
System.Threading.ThreadPool類提供了幾個靜態(tài)方法用于查詢和設(shè)置線程池的線程數(shù):
- GetMaxThreads
- SetMaxThreads
- GetMinThreads
- SetMinThreads
建議不要限制線程的數(shù)量,尤其是設(shè)置線程上限筐喳,這樣可能發(fā)生饑餓或死鎖催式。
如設(shè)置線程池線程最大為1000個,假定隊列中有1000個工作項都因為一個事件而堵塞避归,只有等待第1001個工作項才會解鎖荣月。而1001永遠(yuǎn)不會被創(chuàng)建,此時1000個線程全部堵塞梳毙,進程只能被迫中止哺窄。
每個線程的創(chuàng)建都要為其用戶模式棧和線程環(huán)境塊(TEB)準(zhǔn)備超過1MB的內(nèi)存。32位的進程中账锹,最多能有1360個線程萌业,視圖創(chuàng)建更多的線程會拋出OutOfMemoryException。64位進程提供了8TB的地址空間奸柬,理論上可以創(chuàng)建千百萬個線程生年。
而CLR對線程線程的數(shù)量存在一個默認(rèn)的最大值,現(xiàn)在大約1000左右廓奕。
5.3 如何管理工作者線程
ThreadPool.QueueUserWorkItem 方法和 Timer 類總是將工作項放到全局隊列中抱婉。工作者線程采用一個先入先出(First-in First-out,FIFO)算法將工作項從這個隊列中取出,并處理它們懂从。由于多個工作者線程可能同時從全局隊列中拿走工作項授段,所以所有工作者線程都競爭一個線程同步鎖,以保證兩個或多個線程不會獲取同一個工作項番甩。這個線程同步鎖在某些應(yīng)用程序中可能成為瓶頸,對伸縮性和性能造成某種程度的限制届搁。
以默認(rèn)的 TaskScheduler 調(diào)度 Task 對象的方式(其他 TaskScheduler 派生對象的行為可能和這里描述的不同)缘薛。
- 非工作者線程調(diào)度一個 Task 時窍育,該 Task 被添加到全局隊列。
- 工作者線程調(diào)度一個 Task 時宴胧,由于工作者線程都有自己的本地隊列漱抓,該 Task 被添加到調(diào)用線程的本地隊列。
暫且將非工作者線程理解成UI線程恕齐。
工作者線程準(zhǔn)備好處理工作項時乞娄,它總是先檢查本地隊列來查找一個 Task。存在一個 Task显歧,工作者線程就從本地隊列中移除 Task 并處理工作項仪或。要注意的是,工作者線程采用后入先出(LIFO)算法將任務(wù)從本地隊列取出士骤。由于工作者線程是唯一允許訪問它自己的本地隊列頭的線程范删,所以無需同步鎖,而且在隊列中添加和刪除 Task 的速度非晨郊。快到旦。這個行為的副作用是 Task 按照進入隊列時相反的順序執(zhí)行。
線程池從來不保證排隊中的工作項的處理順序巨缘。
如果工作者線程發(fā)現(xiàn)它的本地隊列變空了添忘,會嘗試從另一個工作者線程的本地隊列“偷”一個 Task。這個 Task 是從本地隊列的尾部“偷”走的若锁,并要求獲取一個線程同步鎖搁骑,這對性能有少許影響。當(dāng)然拴清,希望這種“偷盜”行為很少發(fā)生靶病,從而很少需要線程同步鎖。如果所有本地隊列都變空口予,那么工作者線程就會使用FIFO算法娄周,從全局隊列提取一個工作項(取得他的鎖)。如果全局隊列也為空沪停,工作者線程就會進入睡眠狀態(tài)煤辨,等待事情的發(fā)生。如果睡眠時間過長木张,它會自己醒來并銷毀自身众辨,允許系統(tǒng)回收線程使用的資源(內(nèi)核對象、棧舷礼、TEB等)鹃彻。
線程池會快速創(chuàng)建工作者線程,使工作者線程的數(shù)量等于傳給 ThreadPool 的 SetMinThreads 方法的值妻献。如果從不調(diào)用這個方法(也建議永遠(yuǎn)不要調(diào)用這個方法)蛛株,那么默認(rèn)的值等于你的進程允許使用的CPU數(shù)量团赁,這是由進程的 affinity mask(關(guān)聯(lián)掩碼)決定的。通常谨履,你的進程允許使用機器上所有的CPU欢摄,所以線程池創(chuàng)建的工作者線程數(shù)量很快就會達(dá)到機器的CPU數(shù)。創(chuàng)建了這么多的CPU數(shù)量的線程后笋粟,線程池會監(jiān)視工作項的完成速度怀挠。如果工作項完成的時間太長(具體多長時間沒有正式公布),線程池會創(chuàng)建更多的工作者線程害捕,如果工作項完成的速度開始變快绿淋,工作者線程就會被銷毀。