Unity 已可使用 Thread稿湿、Task 等處理多線程任務,但缺少成熟的多線程任務隊列工具押赊,所以在此實現(xiàn)一個饺藤,代碼已上傳 Git 項目 GRUnityTools,可直接下載源碼或通過 UPM 使用
本文原地址:Unity實踐—多線程任務隊列實現(xiàn)
實現(xiàn)目標
-
串行與并發(fā)隊列
隊列是首要實現(xiàn)目標流礁,且需要串行與并發(fā)兩種隊列涕俗,以覆蓋不同需求
-
同步與異步執(zhí)行
因任務隊列過多可能阻塞主線程,所以除同步執(zhí)行外還需要多線程異步操作
-
主線程同步
因為有多線程神帅,但 Unity 部分操作只能在主線程執(zhí)行再姑,所以還需要線程同步到主線程
實現(xiàn)方式
-
Task
Task 為當前 .Net 提供的實用性最高的多線程接口,可實現(xiàn)任務的監(jiān)控與操縱
-
TaskScheduler
Task 專用調度器找御,可更便捷地實現(xiàn) Task 隊列調度
-
Loom
Loom 為網絡上廣為流傳的 Unity 中調用主線程的工具類元镀,目前找不到源碼最原始地址,代碼拷貝自知乎
實現(xiàn)過程
方案選擇
最初即決定使用 Task 作為隊列基本單位霎桅,但完全沒有考慮 TaskScheduler
栖疑。原計劃手動實現(xiàn)一個調度器,負責保存?zhèn)魅氲?Task 放入隊列滔驶,可設置同步異步遇革,根據(jù)設置實現(xiàn)對隊列的不同操作。后來再研究微軟官方文檔時發(fā)現(xiàn)在其 Task 文檔的示例中有一個 LimitedConcurrencyLevelTaskScheduler
的演示代碼瓜浸,直接通過 TaskScheduler
實現(xiàn)了可控并發(fā)數(shù)量的調度器澳淑,且當設置并發(fā)數(shù)為1時隊列中的任務會逐一按順序執(zhí)行即產生了串行隊列效果
TaskScheduler
有兩種使用方式
方式一:為 TaskFactory
配置 TaskScheduler,通過 TaksFactory
使用配置的調度器啟動 Task
//創(chuàng)建并發(fā)數(shù)32的調度器
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(32);
//方式1
TaskFactory factory = new TaskFactory(scheduler);
factory.StartNew(()=>{
//執(zhí)行任務
});
方式二:直接使用 Task.Start(TaskFactory)
方法
//創(chuàng)建并發(fā)數(shù)1的調度器(此時即為串行隊列效果)
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
//聲明一個 Task 對象
Task task = new Task(()=>{
//任務
});
//啟動 Task 指定調度器
task.Start(scheduler);
編寫源碼
創(chuàng)建名為 TaskQueue
的類插佛,添加變量
//根據(jù)需求設置默認并發(fā)數(shù)
private const int DefaultConcurrentCount = 32;
//線程鎖
private static object _lock = new object();
//默認靜態(tài)串行隊列對象
private static TaskQueue _defaultSerial;
//默認靜態(tài)并發(fā)隊列對象
private static TaskQueue _defaultConcurrent;
//持有的調度器
private LimitedConcurrencyLevelTaskScheduler _scheduler;
//提供默認串行隊列
public static TaskQueue DefaultSerailQueue
{
get
{
if (_defaultSerial == null)
{
lock (_lock)
{
if (_defaultSerial == null)
{
_defaultSerial = new TaskQueue(1);
}
}
}
return _defaultSerial;
}
}
//提供默認并發(fā)隊列
public static TaskQueue DefaultConcurrentQueue
{
get
{
if (_defaultConcurrent == null)
{
lock (_lock)
{
if (_defaultConcurrent == null)
{
_defaultConcurrent = new TaskQueue(DefaultConcurrentCount);
}
}
}
return _defaultConcurrent;
}
}
提供快捷構造方法
//默認構造方法杠巡,因 Loom 為 UnityEngine.Monobehaviour對象,所以必須執(zhí)行初始化方法將其加入場景中
public TaskQueue(int concurrentCount)
{
_scheduler = new LimitedConcurrencyLevelTaskScheduler(concurrentCount);
Loom.Initialize();
}
//創(chuàng)建串行隊列
public static TaskQueue CreateSerialQueue()
{
return new TaskQueue(1);
}
//創(chuàng)建并發(fā)隊列
public static TaskQueue CreateConcurrentQueue()
{
return new TaskQueue(DefaultConcurrentCount);
}
下面是各種同步雇寇、異步氢拥、主線程執(zhí)行方法蚌铜,方法會將執(zhí)行的 Task 返回,以便在實際使用中需要對 Task 有其他操作
需注意 RunSyncOnMainThread
和 RunAsyncOnMainThread
為獨立的執(zhí)行系統(tǒng)與隊列無關嫩海,若在主線程執(zhí)行方法直接在主線程同步執(zhí)行即可
//異步執(zhí)行冬殃,因Task本身會開啟新線程所以直接調用即可
public Task RunAsync(Action action)
{
Task t = new Task(action);
t.Start(_scheduler);
return t;
}
//同步執(zhí)行,使用 Task 提供的 RunSynchronously 方法
public Task RunSync(Action action)
{
Task t = new Task(action);
t.RunSynchronously(_scheduler);
return t;
}
//同步執(zhí)行主線程方法
//為避免主線程調用該方法所以需要判斷當前線程叁怪,若為主線程則直接執(zhí)行审葬,防止死鎖
//為保證線程同步,此處使用信號量奕谭,僅在主線程方法執(zhí)行完成后才會釋放信號
public static void RunSyncOnMainThread(Action action)
{
if (Thread.CurrentThread.ManagedThreadId == 1)
{
action();
}
else
{
Semaphore sem = new Semaphore(0, 1);
Loom.QueueOnMainThread((o => {
action();
sem.Release();
}), null);
sem.WaitOne();
}
}
//因 Loom 本身即為不會立即執(zhí)行方法涣觉,所以直接調用即可
public static void RunAsyncOnMainThread(Action action)
{
Loom.QueueOnMainThread((o => { action(); }), null);
}
擴展延遲執(zhí)行方法,因延遲本身為異步操作血柳,所以只提供異步執(zhí)行方式
// 此處使用async官册、await 關鍵字實現(xiàn)延遲操作, delay 為秒难捌,Task.Delay 參數(shù)為毫秒
public Task RunAsync(Action action, float delay)
{
Task t = Task.Run(async () =>
{
await Task.Delay((int) (delay * 1000));
return RunAsync(action);
});
return t;
}
實現(xiàn)效果
到此一個多線程任務隊列工具就完成了膝宁,一般的需求基本可以滿足,后續(xù)還可提供更多擴展功能根吁,如傳參员淫、取消任務等
另外我個人想盡力將這套工具脫離 UnityEngine.Monobehaviour,但目前還沒找到除 Loom 外其他 Unity 獲取主線程的方法满粗,當然 Loom 本身仍然是一個很巧妙的工具
若想了解 LimitedConcurrencyLevelTaskScheduler
和 Loom
可繼續(xù)想下看
其他
LimitedConcurrencyLevelTaskScheduler
TaskScheduler 為抽象類映皆,想自定義任務調度需繼承該類捅彻,并復寫部分內部調度方法
LimitedConcurrencyLevelTaskScheduler 步淹,以下簡稱為 LCLTS缭裆,為微軟官方文檔提供的示例代碼澈驼,用于調度任務缝其,控制并發(fā)數(shù)
LCLTS 工作流程
- 將 Task 入隊放入鏈表
- 判斷當前已執(zhí)行任務數(shù)量内边,若未達到最大值則通過
ThreadPool
分配工作線程執(zhí)行嘴高,并計數(shù)+1 - 標記已分匹配的線程并死循環(huán)執(zhí)行任務隊列阳惹,將已執(zhí)行的任務出隊
- 任務隊列為空時退出循環(huán),并移除標記
- 若有任務想插隊到線程執(zhí)行颠印,先檢查當前線程標記线罕,若無標記則無法執(zhí)行插隊操作,該操作為避免任務占用隊列外繁忙線程
- 若插隊成功則檢查該 Task 是否已在隊列中询件,若存在則出隊執(zhí)行宛琅,若不存在則直接執(zhí)行
源碼解釋
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
//ThreadStatic 線程變量特性,表明是當前線程是否正在處理任務
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
// 任務隊列红伦,使用鏈表比 List 和 Array 更方便執(zhí)行插隊出隊操作(隊列中不會出現(xiàn)空位)
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // 該隊列由 lock(_tasks) 鎖定
// 最大并發(fā)數(shù)
private readonly int _maxDegreeOfParallelism;
// 當前已分配入隊的任務數(shù)量
private int _delegatesQueuedOrRunning = 0;
// 帶并發(fā)數(shù)的構造方法
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
// 將 Task 放入調度隊列
protected sealed override void QueueTask(Task task)
{
//將任務放入列表,檢查當前執(zhí)行數(shù)是否達到最大值箕戳,若未達到則分配線程執(zhí)行陵吸,并計數(shù)+1
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
// 使用 ThreadPool 將 Task 分配到工作線程
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
//標記當前線程正在執(zhí)行任務澳厢,當有 Task 想插入此線程執(zhí)行時會檢查該狀態(tài)
_currentThreadIsProcessingItems = true;
try
{
// 死循環(huán)處理所有隊列中 Task
while (true)
{
Task item;
lock (_tasks)
{
// 任務隊列執(zhí)行完后退出循環(huán)剩拢,并將占用標記置為 false
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// 若還有 Task 則獲取第一個,并出隊
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// 執(zhí)行 Task
base.TryExecuteTask(item);
}
}
// 線程占用標記置為 false
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
// 嘗試在當前線程執(zhí)行指定任務
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// 若當前線程沒有在執(zhí)行任務則無法執(zhí)行插隊操作
if (!_currentThreadIsProcessingItems) return false;
// 若該任務已在隊列中,則出隊
if (taskWasPreviouslyQueued)
// 嘗試執(zhí)行 Task
if (TryDequeue(task))
return base.TryExecuteTask(task);
else
return false;
else
return base.TryExecuteTask(task);
}
// 嘗試將已調度的 Task 移出調度隊列
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
// 獲取最大并發(fā)數(shù)
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
// 獲取已調度任務隊列迭代器
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
// Monitor.TryEnter 作用為線程鎖,其語法糖為 lock (_tasks)
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks;
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
Loom
Loom 通過繼承 UnityEngine.MonoBehaviour需曾,使用 Unity 主線程生命周期 Update
在主線程執(zhí)行方法呆万,同時 Loom 也支持簡單的多線程異步執(zhí)行
Loom 結構和流程
Loom 包含兩個隊列有延遲方法隊列和無延遲方法隊列,兩條隊列方法都可執(zhí)行傳參方法
- 將
Action
和param
以及延遲時間打包入結構體放入延遲或無延遲隊列 - 若為延遲任務逃顶,則使用
Time.time
獲取添加任務的時間加上延遲時間得到預定執(zhí)行時間打包入延遲任務結構體并入隊 - 待一個
Update
周期執(zhí)行,清空執(zhí)行隊列舊任務盈蛮,取出無延遲隊列所有對象抖誉,放入執(zhí)行隊列旁理,清空無延遲隊列孽文,遍歷執(zhí)行執(zhí)行隊列任務 - 同一個
Update
周期,清空延遲執(zhí)行隊列舊任務减牺,取出預計執(zhí)行時間小于等于當前時間的任務,放入延遲執(zhí)行隊列,將取出的任務移出延遲隊列,遍歷執(zhí)行延遲執(zhí)行隊列任務
Loom 的使用
用戶可將 Loom 腳本掛載在已有對象上憨琳,也可直接代碼調用方法,Loom 會自動在場景中添加一個不會銷毀的 Loom 單例對象
代碼中使用 QueueOnMainThread
將延遲和無延遲方法加入主線程隊列遍略, RunAsync
執(zhí)行異步方法
public class Loom :MonoBehaviour
{
public static int maxThreads = 8;
static int numThreads;
private static Loom _current;
//private int _count;
public static Loom Current
{
get
{
Initialize();
return _current;
}
}
void Awake()
{
_current = this;
initialized = true;
}
static bool initialized;
[RuntimeInitializeOnLoadMethod]
public static void Initialize()
{
if (!initialized)
{
if (!Application.isPlaying)
return;
initialized = true;
var g = new GameObject("Loom");
_current = g.AddComponent<Loom>();
#if !ARTIST_BUILD
UnityEngine.Object.DontDestroyOnLoad(g);
#endif
}
}
public struct NoDelayedQueueItem
{
public Action<object> action;
public object param;
}
private List<NoDelayedQueueItem> _actions = new List<NoDelayedQueueItem>();
public struct DelayedQueueItem
{
public float time;
public Action<object> action;
public object param;
}
private List<DelayedQueueItem> _delayed = new List<DelayedQueueItem>();
List<DelayedQueueItem> _currentDelayed = new List<DelayedQueueItem>();
public static void QueueOnMainThread(Action<object> taction, object tparam)
{
QueueOnMainThread(taction, tparam, 0f);
}
public static void QueueOnMainThread(Action<object> taction, object tparam, float time)
{
if (time != 0)
{
lock (Current._delayed)
{
Current._delayed.Add(new DelayedQueueItem { time = Time.time + time, action = taction, param = tparam });
}
}
else
{
lock (Current._actions)
{
Current._actions.Add(new NoDelayedQueueItem { action = taction, param = tparam });
}
}
}
public static Thread RunAsync(Action a)
{
Initialize();
while (numThreads >= maxThreads)
{
Thread.Sleep(100);
}
Interlocked.Increment(ref numThreads);
ThreadPool.QueueUserWorkItem(RunAction, a);
return null;
}
private static void RunAction(object action)
{
try
{
((Action)action)();
}
catch
{
}
finally
{
Interlocked.Decrement(ref numThreads);
}
}
void OnDisable()
{
if (_current == this)
{
_current = null;
}
}
// Use this for initialization
void Start()
{
}
List<NoDelayedQueueItem> _currentActions = new List<NoDelayedQueueItem>();
// Update is called once per frame
void Update()
{
if (_actions.Count > 0)
{
lock (_actions)
{
_currentActions.Clear();
_currentActions.AddRange(_actions);
_actions.Clear();
}
for (int i = 0; i < _currentActions.Count; i++)
{
_currentActions[i].action(_currentActions[i].param);
}
}
if (_delayed.Count > 0)
{
lock (_delayed)
{
_currentDelayed.Clear();
_currentDelayed.AddRange(_delayed.Where(d => d.time <= Time.time));
for (int i = 0; i < _currentDelayed.Count; i++)
{
_delayed.Remove(_currentDelayed[i]);
}
}
for (int i = 0; i < _currentDelayed.Count; i++)
{
_currentDelayed[i].action(_currentDelayed[i].param);
}
}
}
}