一.? pipeline任務(wù)描述
? ? ? ? 在執(zhí)行某序列化任務(wù)的時候,通常可將任務(wù)劃分為多個stage分別進行模塊化,每一個stage可抽象為一個處理方法贼急、一個處理類或一個處理模塊,最簡單一個的流程任務(wù)(無分支)瓤摧,如下圖:
? ? ? ? ? 對于一個完整的處理流程有如下說明:
? ? ? ? ? 1.? 每個處理單元具有業(yè)務(wù)處理原子性特征竿裂,每個處理單元僅處理單一的功能需求
? ? ? ? ? 2.? 每個處理單元可能有多個來自其他單元的輸入數(shù)據(jù)玉吁,也可以向多個其他單元輸出數(shù)據(jù)
? ? ? ? ? 3.? 某單元的輸入可依賴于上一個單元或多個單元的輸出照弥,需等待所有的前置單元都處理完才能執(zhí)行此單元
二.? 可能的執(zhí)行場景
? ? ? ?基于以上說明,可以將pipeline的流程簡單概括為一下幾種可能的執(zhí)行場景:
????樣例01表示最簡單的流線式模塊處理进副,無分支这揣,前一個節(jié)點處理結(jié)束再處理下一個節(jié)點。
????樣例02表示單個節(jié)點可能有0到N個輸入影斑,也可能有1到N個輸出给赞;同時,單節(jié)點的多個輸入可來自不同的節(jié)點的任意輸出或單節(jié)點的多個輸出數(shù)據(jù)流向不同的后置節(jié)點矫户。
? ???樣例03表示pipeline的處理流程節(jié)點不是一棵簡單的樹形結(jié)構(gòu)片迅,可能存在多個根節(jié)點。
三.? 利用JSON對pipeline進行結(jié)構(gòu)描述
? ? ? 如何設(shè)計一種通用的數(shù)據(jù)結(jié)構(gòu)來描述一個pipeline,既可以實現(xiàn)以上樣例的幾種場景皆辽,也可以實現(xiàn)其他更復雜的pipeline場景柑蛇。設(shè)計要點:
? ? ? 1. 對pipline中每個抽象節(jié)點的實例化描述
? ? ? ?2. 對pipeline中每個節(jié)點的模塊參數(shù)的描述(即節(jié)點執(zhí)行需要的配置參數(shù),而非input端口數(shù)據(jù))
? ? ? ?3.?對pipeline中每個節(jié)點的輸入輸入端口的描述
? ? ? ?4.? 對pipeline中每個節(jié)點的對其他節(jié)點輸入輸入端口依賴關(guān)系的描述
????最終的實現(xiàn)方式如下(JSON數(shù)據(jù)):
? ? 其中包括以下幾點:
? ? 1.? pipeline_id 每個pipeline都有一個唯一的ID驱闷,多個pipeline同時獨立運行時會用到
? ? 2.? pipeline_items是一個數(shù)組耻台,里面每一個大括號都是一個特定類型的模塊實例化時需要的信息
? ? 3.? id為某模塊實例化后的ID號,在一個pipeline中此id唯一
? ? 4.? type為此模塊的類型(整個pipeline中涉及到多個類型的模塊)?
? ? 3.? parameters為該模塊需要執(zhí)行時需要的特定參數(shù)空另,每個模塊的parameters定義可能不同
? ? 4.? ?inputs的定義盆耽,inputs中定義了一個模塊所需輸入數(shù)據(jù)的來源,framNodeId和fromOutputId代表某輸入是從哪一個前置模塊的哪一個輸出端口輸出的數(shù)據(jù)
? ? 注:某模塊的輸入端口的描述名稱(input_01扼菠,input_02)需要預(yù)先在模塊對外接口中定義好摄杂,并且一一對應(yīng),否則可能會導致傳遞的數(shù)據(jù)值或者數(shù)據(jù)類型錯亂循榆。
三.? PipelineFramework的實現(xiàn)概要
? ??PipelineFramework程序框架用來加載pipeline JSON析恢,組裝、調(diào)度模塊的順序執(zhí)行冯痢,其實現(xiàn)有以下幾個問題需要解決:
? ?1.? 一個模塊需要執(zhí)行氮昧,如何保證其依賴的所有前置模塊都已經(jīng)執(zhí)行結(jié)束
? ?2.? 如何在內(nèi)存中將每個模塊的每個輸出端口緩存框杜,提供給后置模塊去使用
? ?3.? 如何定義所有模塊統(tǒng)一的接口,然后使PipelineFramework通過統(tǒng)一的方式調(diào)用(類似類的多態(tài)性)
? ??第一個問題的解決:
? ? ?多次全流程模塊掃描袖肥,每一次拿出尚未執(zhí)行并且已經(jīng)滿足所有前置條件的模塊進行運行咪辱,執(zhí)行結(jié)束后將模塊ID和執(zhí)行后的結(jié)果保存到內(nèi)存變量中。當某一次掃描中所有的模塊已經(jīng)被執(zhí)行了則退出掃描椎组,具體代碼如下:
? ??第二個問題的解決:
? ? ?當每個模塊執(zhí)行結(jié)束后的返回結(jié)果是一個map類型的數(shù)據(jù)油狂,key為該模塊輸出端口的ID(如output_01,output_02),value為輸出數(shù)據(jù)(由于python弱類型引用,因此value可以為任意類型),部分代碼如下:
? ??第三個問題的解決:
? ? ? ? ? 1.? 每個模塊都封裝成類寸癌,且包含都兩個方法:__init__方法進行參數(shù)和輸入數(shù)據(jù)的初始化专筷,? module_process進行具體的業(yè)務(wù)處理(如上圖)。
? ? ? ? ? 2.? 通過定義PipelineModuleFactory進行統(tǒng)一的模塊接口獲取和調(diào)用蒸苇。
? ? ? ? ? 3.? 在PipelineFramework中進行統(tǒng)一的調(diào)用執(zhí)行
四.? 后續(xù)的優(yōu)化
? ? ?1. 通過multiprocessing 實現(xiàn)多模塊并行執(zhí)行
? ? ? 2. 模塊運行過程監(jiān)控和重試機制?