Unity實踐—多線程任務隊列實現(xiàn)

Unity 已可使用 Thread稿湿、Task 等處理多線程任務,但缺少成熟的多線程任務隊列工具押赊,所以在此實現(xiàn)一個饺藤,代碼已上傳 Git 項目 GRUnityTools,可直接下載源碼或通過 UPM 使用

本文原地址:Unity實踐—多線程任務隊列實現(xiàn)

實現(xiàn)目標

  1. 串行與并發(fā)隊列

    隊列是首要實現(xiàn)目標流礁,且需要串行與并發(fā)兩種隊列涕俗,以覆蓋不同需求

  2. 同步與異步執(zhí)行

    因任務隊列過多可能阻塞主線程,所以除同步執(zhí)行外還需要多線程異步操作

  3. 主線程同步

    因為有多線程神帅,但 Unity 部分操作只能在主線程執(zhí)行再姑,所以還需要線程同步到主線程

實現(xiàn)方式

  1. Task

    Task 為當前 .Net 提供的實用性最高的多線程接口,可實現(xiàn)任務的監(jiān)控與操縱

  2. TaskScheduler

    Task 專用調度器找御,可更便捷地實現(xiàn) Task 隊列調度

  3. Loom

    Loom 為網絡上廣為流傳的 Unity 中調用主線程的工具類元镀,目前找不到源碼最原始地址,代碼拷貝自知乎

實現(xiàn)過程

方案選擇

最初即決定使用 Task 作為隊列基本單位霎桅,但完全沒有考慮 TaskScheduler栖疑。原計劃手動實現(xiàn)一個調度器,負責保存?zhèn)魅氲?Task 放入隊列滔驶,可設置同步異步遇革,根據(jù)設置實現(xiàn)對隊列的不同操作。后來再研究微軟官方文檔時發(fā)現(xiàn)在其 Task 文檔的示例中有一個 LimitedConcurrencyLevelTaskScheduler 的演示代碼瓜浸,直接通過 TaskScheduler 實現(xiàn)了可控并發(fā)數(shù)量的調度器澳淑,且當設置并發(fā)數(shù)為1時隊列中的任務會逐一按順序執(zhí)行即產生了串行隊列效果

TaskScheduler 有兩種使用方式

方式一:為 TaskFactory 配置 TaskScheduler,通過 TaksFactory 使用配置的調度器啟動 Task

//創(chuàng)建并發(fā)數(shù)32的調度器
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(32); 
//方式1 
TaskFactory factory = new TaskFactory(scheduler);
factory.StartNew(()=>{
  //執(zhí)行任務
});

方式二:直接使用 Task.Start(TaskFactory) 方法

//創(chuàng)建并發(fā)數(shù)1的調度器(此時即為串行隊列效果)
LimitedConcurrencyLevelTaskScheduler scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
//聲明一個 Task 對象
Task task = new Task(()=>{
  //任務
});
//啟動 Task 指定調度器
task.Start(scheduler);

編寫源碼

創(chuàng)建名為 TaskQueue 的類插佛,添加變量

//根據(jù)需求設置默認并發(fā)數(shù)
private const int DefaultConcurrentCount = 32;
//線程鎖
private static object _lock = new object();
//默認靜態(tài)串行隊列對象
private static TaskQueue _defaultSerial;
//默認靜態(tài)并發(fā)隊列對象
private static TaskQueue _defaultConcurrent;
//持有的調度器
private LimitedConcurrencyLevelTaskScheduler _scheduler;  

//提供默認串行隊列
public static TaskQueue DefaultSerailQueue
{
    get
    {
        if (_defaultSerial == null)
        {
            lock (_lock)
            {
                if (_defaultSerial == null)
                {
                    _defaultSerial = new TaskQueue(1);
                }
            }
        }
        return _defaultSerial;
    }
}

//提供默認并發(fā)隊列
public static TaskQueue DefaultConcurrentQueue
{
    get
    {
        if (_defaultConcurrent == null)
        {
            lock (_lock)
            {
                if (_defaultConcurrent == null)
                {
                    _defaultConcurrent = new TaskQueue(DefaultConcurrentCount);
                }
            }
        }
        return _defaultConcurrent;
    }
}

提供快捷構造方法

//默認構造方法杠巡,因 Loom 為 UnityEngine.Monobehaviour對象,所以必須執(zhí)行初始化方法將其加入場景中
public TaskQueue(int concurrentCount)
{
    _scheduler = new LimitedConcurrencyLevelTaskScheduler(concurrentCount);
    Loom.Initialize();
}
//創(chuàng)建串行隊列
public static TaskQueue CreateSerialQueue()
{
    return new TaskQueue(1);
}
//創(chuàng)建并發(fā)隊列
public static TaskQueue CreateConcurrentQueue()
{
    return new TaskQueue(DefaultConcurrentCount);
}

下面是各種同步雇寇、異步氢拥、主線程執(zhí)行方法蚌铜,方法會將執(zhí)行的 Task 返回,以便在實際使用中需要對 Task 有其他操作

需注意 RunSyncOnMainThreadRunAsyncOnMainThread 為獨立的執(zhí)行系統(tǒng)與隊列無關嫩海,若在主線程執(zhí)行方法直接在主線程同步執(zhí)行即可

//異步執(zhí)行冬殃,因Task本身會開啟新線程所以直接調用即可
public Task RunAsync(Action action)
{
    Task t = new Task(action);
    t.Start(_scheduler);
    return t;
}

//同步執(zhí)行,使用 Task 提供的 RunSynchronously 方法
public Task RunSync(Action action)
{
    Task t = new Task(action);
    t.RunSynchronously(_scheduler);
    return t;
}

//同步執(zhí)行主線程方法
//為避免主線程調用該方法所以需要判斷當前線程叁怪,若為主線程則直接執(zhí)行审葬,防止死鎖  
//為保證線程同步,此處使用信號量奕谭,僅在主線程方法執(zhí)行完成后才會釋放信號
public static void RunSyncOnMainThread(Action action)
{
    if (Thread.CurrentThread.ManagedThreadId == 1)
    {
        action();
    }
    else
    {
        Semaphore sem = new Semaphore(0, 1);
        Loom.QueueOnMainThread((o => { 
            action();
            sem.Release();
        }), null);
        sem.WaitOne();
    }
}

//因 Loom 本身即為不會立即執(zhí)行方法涣觉,所以直接調用即可
public static void RunAsyncOnMainThread(Action action)
{
    Loom.QueueOnMainThread((o => { action(); }), null);
}

擴展延遲執(zhí)行方法,因延遲本身為異步操作血柳,所以只提供異步執(zhí)行方式

// 此處使用async官册、await 關鍵字實現(xiàn)延遲操作, delay 為秒难捌,Task.Delay 參數(shù)為毫秒
public Task RunAsync(Action action, float delay)
{
    Task t = Task.Run(async () =>
    {
        await Task.Delay((int) (delay * 1000));
        return RunAsync(action);
    });
    return t;
}

實現(xiàn)效果

并發(fā)隊列異步執(zhí)行
并發(fā)隊列同步執(zhí)行
串行隊列異步執(zhí)行
串行隊列同步執(zhí)行
并發(fā)隊列延遲執(zhí)行
子線程異步執(zhí)行主線程
子線程同步執(zhí)行主線程

到此一個多線程任務隊列工具就完成了膝宁,一般的需求基本可以滿足,后續(xù)還可提供更多擴展功能根吁,如傳參员淫、取消任務等

另外我個人想盡力將這套工具脫離 UnityEngine.Monobehaviour,但目前還沒找到除 Loom 外其他 Unity 獲取主線程的方法满粗,當然 Loom 本身仍然是一個很巧妙的工具

若想了解 LimitedConcurrencyLevelTaskSchedulerLoom 可繼續(xù)想下看

其他

LimitedConcurrencyLevelTaskScheduler

TaskScheduler 為抽象類映皆,想自定義任務調度需繼承該類捅彻,并復寫部分內部調度方法

LimitedConcurrencyLevelTaskScheduler 步淹,以下簡稱為 LCLTS缭裆,為微軟官方文檔提供的示例代碼澈驼,用于調度任務缝其,控制并發(fā)數(shù)

LCLTS 工作流程
  1. 將 Task 入隊放入鏈表
  2. 判斷當前已執(zhí)行任務數(shù)量内边,若未達到最大值則通過 ThreadPool 分配工作線程執(zhí)行嘴高,并計數(shù)+1
  3. 標記已分匹配的線程并死循環(huán)執(zhí)行任務隊列阳惹,將已執(zhí)行的任務出隊
  4. 任務隊列為空時退出循環(huán),并移除標記
  5. 若有任務想插隊到線程執(zhí)行颠印,先檢查當前線程標記线罕,若無標記則無法執(zhí)行插隊操作,該操作為避免任務占用隊列外繁忙線程
  6. 若插隊成功則檢查該 Task 是否已在隊列中询件,若存在則出隊執(zhí)行宛琅,若不存在則直接執(zhí)行
源碼解釋
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
   //ThreadStatic 線程變量特性,表明是當前線程是否正在處理任務
   [ThreadStatic]
   private static bool _currentThreadIsProcessingItems;

  // 任務隊列红伦,使用鏈表比 List 和 Array 更方便執(zhí)行插隊出隊操作(隊列中不會出現(xiàn)空位)
   private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // 該隊列由 lock(_tasks) 鎖定

   // 最大并發(fā)數(shù)
   private readonly int _maxDegreeOfParallelism;

   // 當前已分配入隊的任務數(shù)量 
   private int _delegatesQueuedOrRunning = 0;

   // 帶并發(fā)數(shù)的構造方法
   public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
   {
       if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
       _maxDegreeOfParallelism = maxDegreeOfParallelism;
   }

   // 將 Task 放入調度隊列
   protected sealed override void QueueTask(Task task)
   {
      //將任務放入列表,檢查當前執(zhí)行數(shù)是否達到最大值箕戳,若未達到則分配線程執(zhí)行陵吸,并計數(shù)+1
       lock (_tasks)
       {
           _tasks.AddLast(task);
           if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
           {
               ++_delegatesQueuedOrRunning;
               NotifyThreadPoolOfPendingWork();
           }
       }
   }

   // 使用 ThreadPool 將 Task 分配到工作線程
   private void NotifyThreadPoolOfPendingWork()
   {
       ThreadPool.UnsafeQueueUserWorkItem(_ =>
       {
             //標記當前線程正在執(zhí)行任務澳厢,當有 Task 想插入此線程執(zhí)行時會檢查該狀態(tài)
           _currentThreadIsProcessingItems = true;
           try
           {
               // 死循環(huán)處理所有隊列中 Task
               while (true)
               {
                   Task item;
                   lock (_tasks)
                   {
                       // 任務隊列執(zhí)行完后退出循環(huán)剩拢,并將占用標記置為 false
                       if (_tasks.Count == 0)
                       {
                           --_delegatesQueuedOrRunning;
                           break;
                       }

                       // 若還有 Task 則獲取第一個,并出隊
                       item = _tasks.First.Value;
                       _tasks.RemoveFirst();
                   }

                   // 執(zhí)行 Task
                   base.TryExecuteTask(item);
               }
           }
           // 線程占用標記置為 false
           finally { _currentThreadIsProcessingItems = false; }
       }, null);
   }

   // 嘗試在當前線程執(zhí)行指定任務
   protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
   {
       // 若當前線程沒有在執(zhí)行任務則無法執(zhí)行插隊操作
       if (!_currentThreadIsProcessingItems) return false;

       // 若該任務已在隊列中,則出隊
       if (taskWasPreviouslyQueued) 
          // 嘗試執(zhí)行 Task
          if (TryDequeue(task)) 
            return base.TryExecuteTask(task);
          else
             return false; 
       else 
          return base.TryExecuteTask(task);
   }

   // 嘗試將已調度的 Task 移出調度隊列
   protected sealed override bool TryDequeue(Task task)
   {
       lock (_tasks) return _tasks.Remove(task);
   }

   // 獲取最大并發(fā)數(shù)
   public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

   // 獲取已調度任務隊列迭代器
   protected sealed override IEnumerable<Task> GetScheduledTasks()
   {
       bool lockTaken = false;
       try
       {
             // Monitor.TryEnter 作用為線程鎖,其語法糖為 lock (_tasks)
           Monitor.TryEnter(_tasks, ref lockTaken);
           if (lockTaken) return _tasks;
           else throw new NotSupportedException();
       }
       finally
       {
           if (lockTaken) Monitor.Exit(_tasks);
       }
   }
}

Loom

Loom 通過繼承 UnityEngine.MonoBehaviour需曾,使用 Unity 主線程生命周期 Update 在主線程執(zhí)行方法呆万,同時 Loom 也支持簡單的多線程異步執(zhí)行

Loom 結構和流程

Loom 包含兩個隊列有延遲方法隊列和無延遲方法隊列,兩條隊列方法都可執(zhí)行傳參方法

  1. Actionparam 以及延遲時間打包入結構體放入延遲或無延遲隊列
  2. 若為延遲任務逃顶,則使用 Time.time 獲取添加任務的時間加上延遲時間得到預定執(zhí)行時間打包入延遲任務結構體并入隊
  3. 待一個 Update 周期執(zhí)行,清空執(zhí)行隊列舊任務盈蛮,取出無延遲隊列所有對象抖誉,放入執(zhí)行隊列旁理,清空無延遲隊列孽文,遍歷執(zhí)行執(zhí)行隊列任務
  4. 同一個 Update 周期,清空延遲執(zhí)行隊列舊任務减牺,取出預計執(zhí)行時間小于等于當前時間的任務,放入延遲執(zhí)行隊列,將取出的任務移出延遲隊列,遍歷執(zhí)行延遲執(zhí)行隊列任務
Loom 的使用

用戶可將 Loom 腳本掛載在已有對象上憨琳,也可直接代碼調用方法,Loom 會自動在場景中添加一個不會銷毀的 Loom 單例對象

代碼中使用 QueueOnMainThread 將延遲和無延遲方法加入主線程隊列遍略, RunAsync 執(zhí)行異步方法

public class Loom :MonoBehaviour
{
    public static int maxThreads = 8;
    static int numThreads;

    private static Loom _current;
    //private int _count;
    public static Loom Current
    {
        get
        {
            Initialize();
            return _current;
        }
    }

    void Awake()
    {
        _current = this;
        initialized = true;
    }

    static bool initialized;

    [RuntimeInitializeOnLoadMethod]
    public static void Initialize()
    {
        if (!initialized)
        {

            if (!Application.isPlaying)
                return;
            initialized = true;
            var g = new GameObject("Loom");
            _current = g.AddComponent<Loom>();
#if !ARTIST_BUILD
            UnityEngine.Object.DontDestroyOnLoad(g);
#endif
        }
    }
    public struct NoDelayedQueueItem
    {
        public Action<object> action;
        public object param;
    }

    private List<NoDelayedQueueItem> _actions = new List<NoDelayedQueueItem>();
    public struct DelayedQueueItem
    {
        public float time;
        public Action<object> action;
        public object param;
    }
    private List<DelayedQueueItem> _delayed = new List<DelayedQueueItem>();

    List<DelayedQueueItem> _currentDelayed = new List<DelayedQueueItem>();

    public static void QueueOnMainThread(Action<object> taction, object tparam)
    {
        QueueOnMainThread(taction, tparam, 0f);
    }
    public static void QueueOnMainThread(Action<object> taction, object tparam, float time)
    {
        if (time != 0)
        {
            lock (Current._delayed)
            {
                Current._delayed.Add(new DelayedQueueItem { time = Time.time + time, action = taction, param = tparam });
            }
        }
        else
        {
            lock (Current._actions)
            {
                Current._actions.Add(new NoDelayedQueueItem { action = taction, param = tparam });
            }
        }
    }

    public static Thread RunAsync(Action a)
    {
        Initialize();
        while (numThreads >= maxThreads)
        {
            Thread.Sleep(100);
        }
        Interlocked.Increment(ref numThreads);
        ThreadPool.QueueUserWorkItem(RunAction, a);
        return null;
    }

    private static void RunAction(object action)
    {
        try
        {
            ((Action)action)();
        }
        catch
        {
        }
        finally
        {
            Interlocked.Decrement(ref numThreads);
        }

    }


    void OnDisable()
    {
        if (_current == this)
        {

            _current = null;
        }
    }



    // Use this for initialization
    void Start()
    {

    }

    List<NoDelayedQueueItem> _currentActions = new List<NoDelayedQueueItem>();

    // Update is called once per frame
    void Update()
    {
        if (_actions.Count > 0)
        {
            lock (_actions)
            {
                _currentActions.Clear();
                _currentActions.AddRange(_actions);
                _actions.Clear();
            }
            for (int i = 0; i < _currentActions.Count; i++)
            {
                _currentActions[i].action(_currentActions[i].param);
            }
        }

        if (_delayed.Count > 0)
        {
            lock (_delayed)
            {
                _currentDelayed.Clear();
                _currentDelayed.AddRange(_delayed.Where(d => d.time <= Time.time));
                for (int i = 0; i < _currentDelayed.Count; i++)
                {
                    _delayed.Remove(_currentDelayed[i]);
                }
            }

            for (int i = 0; i < _currentDelayed.Count; i++)
            {
                _currentDelayed[i].action(_currentDelayed[i].param);
            }
        }
    }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末履因,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子霞篡,更是在濱河造成了極大的恐慌,老刑警劉巖余掖,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仅醇,死亡現(xiàn)場離奇詭異粉洼,居然都是意外死亡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門淌山,熙熙樓的掌柜王于貴愁眉苦臉地迎上來顺少,“玉大人梅猿,你說我怎么就攤上這事。” “怎么了?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵暇昂,是天一觀的道長。 經常有香客問我名段,道長嗅定,這世上最難降的妖魔是什么碎乃? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮梗掰,結果婚禮上嵌言,老公的妹妹穿的比我還像新娘。我一直安慰自己及穗,他們只是感情好摧茴,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著埂陆,像睡著了一般苛白。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上焚虱,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天购裙,我揣著相機與錄音,去河邊找鬼著摔。 笑死缓窜,一個胖子當著我的面吹牛,可吹牛的內容都是我干的谍咆。 我是一名探鬼主播禾锤,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼摹察!你這毒婦竟也來了恩掷?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤供嚎,失蹤者是張志新(化名)和其女友劉穎黄娘,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體克滴,經...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡逼争,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了劝赔。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片誓焦。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖着帽,靈堂內的尸體忽然破棺而出杂伟,到底是詐尸還是另有隱情,我是刑警寧澤仍翰,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布赫粥,位于F島的核電站,受9級特大地震影響予借,放射性物質發(fā)生泄漏越平。R本人自食惡果不足惜频蛔,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望秦叛。 院中可真熱鬧帽驯,春花似錦、人聲如沸书闸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽奸焙。三九已至烫扼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間牌借,已是汗流浹背度气。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留膨报,地道東北人磷籍。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像现柠,于是被迫代替她去往敵國和親院领。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內容