基于DAG實(shí)現(xiàn)的任務(wù)編排框架&平臺(tái)

最近在做的工作比較需要一個(gè)支持任務(wù)編排工作流的框架或者平臺(tái),這里記錄下實(shí)現(xiàn)上的一些思路拇涤。

任務(wù)編排工作流

任務(wù)編排是什么意思呢趾访,顧名思義就是可以把"任務(wù)"這個(gè)原子單位按照自己的方式進(jìn)行編排吼驶,任務(wù)之間可能互相依賴。復(fù)雜一點(diǎn)的編排之后就能形成一個(gè) workflow 工作流了少办。我們希望這個(gè)工作流按照我們編排的方式去執(zhí)行每個(gè)原子 task 任務(wù)苞慢。如下圖所示,我們希望先并發(fā)運(yùn)行 Task A 和 Task C英妓,Task A 執(zhí)行完后串行運(yùn)行 Task B挽放,在并發(fā)等待 Task B 和 C 都結(jié)束后運(yùn)行 Task D,這樣就完成了一個(gè)典型的任務(wù)編排工作流蔓纠。


image

DAG 有向無環(huán)圖

首先我們了解圖這個(gè)數(shù)據(jù)結(jié)構(gòu)辑畦,每個(gè)元素稱為頂點(diǎn) vertex,頂點(diǎn)之間的連線稱為邊 edge腿倚。像我們畫的這種帶箭頭關(guān)系的稱為有向圖纯出,箭頭關(guān)系之間能形成一個(gè)環(huán)的成為有環(huán)圖,反之稱為無環(huán)圖敷燎。顯然運(yùn)用在我們?nèi)蝿?wù)編排工作流上暂筝,最合適的是 DAG 有向無環(huán)圖。

我們?cè)诖a里怎么存儲(chǔ)圖呢硬贯,有兩種數(shù)據(jù)結(jié)構(gòu):鄰接矩陣和鄰接表焕襟。

下圖表示一個(gè)有向圖的鄰接矩陣,例如 x->y 的邊饭豹,只需將 Array[x][y]標(biāo)識(shí)為 1 即可鸵赖。

image

此外我們也可以使用鄰接表來存儲(chǔ),這種存儲(chǔ)方式較好地彌補(bǔ)了鄰接矩陣?yán)速M(fèi)空間的缺點(diǎn)拄衰,但相對(duì)來說鄰接矩陣能更快地判斷連通性它褪。

image

一般在代碼實(shí)現(xiàn)上,我們會(huì)選擇鄰接矩陣肾砂,這樣我們?cè)谂袛鄡牲c(diǎn)之間是否有邊更方便點(diǎn)。

一個(gè)任務(wù)編排框架

了解了 DAG 的基本知識(shí)后我們可以來簡單實(shí)現(xiàn)一下宏悦。首先是存儲(chǔ)結(jié)構(gòu)镐确,我們的 Dag 表示一整個(gè)圖,Node 表示各個(gè)頂點(diǎn)饼煞,每個(gè)頂點(diǎn)有其 parents 和 children:

//Dag
public final class DefaultDag<T, R> implements Dag<T, R> {

    private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
    ...
}

//Node
public final class Node<T, R> {
    /**
     * incoming dependencies for this node
     */
    private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();
    /**
     * outgoing dependencies for this node
     */
    private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();
    ...
}

畫兩個(gè)頂點(diǎn)源葫,以及為這兩個(gè)頂點(diǎn)連邊操作如下:

    public void addDependency(final T evalFirstNode, final T evalLaterNode) {
        Node<T, R> firstNode = createNode(evalFirstNode);
        Node<T, R> afterNode = createNode(evalLaterNode);

        addEdges(firstNode, afterNode);
    }

   private Node<T, R> createNode(final T value) {
        Node<T, R> node = new Node<T, R>(value);
        return node;
    }

    private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {
        if (!firstNode.equals(afterNode)) {
            firstNode.getChildren().add(afterNode);
            afterNode.getParents().add(firstNode);
        }
    }

到現(xiàn)在我們其實(shí)已經(jīng)把基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)寫好了,但我們作為一個(gè)任務(wù)編排框架最終是需要線程去執(zhí)行的砖瞧,我們把它和線程池一起給包裝一下息堂。

//任務(wù)編排線程池
public class DefaultDexecutor <T, R> {

    //執(zhí)行線程,和2種重試線程
    private final ExecutorService<T, R> executionEngine;
    private final ExecutorService immediatelyRetryExecutor;
    private final ScheduledExecutorService scheduledRetryExecutor;
    //執(zhí)行狀態(tài)
    private final ExecutorState<T, R> state;
    ...
}
//執(zhí)行狀態(tài)
public class DefaultExecutorState<T, R> {
    //底層圖數(shù)據(jù)結(jié)構(gòu)
    private final Dag<T, R> graph;
    //已完成
    private final Collection<Node<T, R>> processedNodes;
    //未完成
    private final Collection<Node<T, R>> unProcessedNodes;
    //錯(cuò)誤task
    private final Collection<ExecutionResult<T, R>> erroredTasks;
    //執(zhí)行結(jié)果
    private final Collection<ExecutionResult<T, R>> executionResults;
}

可以看到我們的線程包括執(zhí)行線程池,2 種重試線程池荣堰。我們使用 ExecutorState 來保存一些整個(gè)任務(wù)工作流執(zhí)行過程中的一些狀態(tài)記錄床未,包括已完成和未完成的 task,每個(gè) task 執(zhí)行的結(jié)果等振坚。同時(shí)它也依賴我們底層的圖數(shù)據(jù)結(jié)構(gòu) DAG薇搁。

接下來我們要做的事其實(shí)很簡單,就是 BFS 這整個(gè) DAG 數(shù)據(jù)結(jié)構(gòu)渡八,然后提交到線程池中去執(zhí)行就可以了啃洋,過程中注意一些節(jié)點(diǎn)狀態(tài)的保持,結(jié)果的保存即可屎鳍。

image

還是以上圖為例宏娄,值得說的一點(diǎn)是在 Task D 這個(gè)點(diǎn)需要有一個(gè)并發(fā)等待的操作,即 Task D 需要依賴 Task B 和 Task C 執(zhí)行結(jié)束后再往下執(zhí)行逮壁。這里有很多辦法孵坚,我選擇了共享變量的方式來完成并發(fā)等待。遍歷工作流中被遞歸的方法的偽代碼如下:

private void doProcessNodes(final Set<Node<T, R>> nodes) {
        for (Node<T, R> node : nodes) {
        //共享變量 并發(fā)等待
        if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {
            Task<T, R> task = newTask(node);
            this.executionEngine.submit(task);
            ...
            ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
            if (executionResult.isSuccess()) {
                state.markProcessingDone(processedNode);
            }
            //繼續(xù)執(zhí)行孩子節(jié)點(diǎn)
            doExecute(processedNode.getChildren());
            ...
        }
    }
}

這樣我們基本完成了這個(gè)任務(wù)編排框架的工作貌踏,現(xiàn)在我們可以如下來進(jìn)行示例圖中的任務(wù)編排以及執(zhí)行:

DefaultExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("A", "B");
executor.addDependency("B", "D");
executor.addDependency("C", "D");
executor.execute();

任務(wù)編排平臺(tái)化

好了現(xiàn)在我們已經(jīng)有一款任務(wù)編排框架了十饥,但很多時(shí)候我們想要可視化、平臺(tái)化祖乳,讓使用者更加無腦逗堵。

框架與平臺(tái)最大的區(qū)別在哪里?是可拖拽的可視化輸入么眷昆?我覺得這個(gè)的復(fù)雜度更多在前端蜒秤。而對(duì)于后端平臺(tái)來講,與框架最大的區(qū)別是數(shù)據(jù)的持久化亚斋。

對(duì)于 DAG 的頂點(diǎn)來說作媚,我們需要將每個(gè)節(jié)點(diǎn) Task 的信息給持久化到關(guān)系數(shù)據(jù)庫中,包括 Task 的狀態(tài)帅刊、輸出結(jié)果等纸泡。而對(duì)于 DAG 的邊來說,我們也得用數(shù)據(jù)庫來存儲(chǔ)各 Task 之間的方向關(guān)系赖瞒。此外女揭,在遍歷執(zhí)行 DAG 的整個(gè)過程中的中間狀態(tài)數(shù)據(jù),我們也得搬運(yùn)到數(shù)據(jù)庫中栏饮。

首先我們可以設(shè)計(jì)一個(gè) workflow 表吧兔,來表示一個(gè)工作流。接著我們?cè)O(shè)計(jì)一個(gè) task 表袍嬉,來表示一個(gè)執(zhí)行單元境蔼。task 表主要字段如下灶平,這里主要是 task_parents 的設(shè)計(jì),它是一個(gè) string箍土,存儲(chǔ) parents 的 taskId逢享,多個(gè)由分隔符分隔。

task_id
workflow_id
task_name
task_status
result
task_parents
image

依賴是上圖這個(gè)例子涮帘,對(duì)比框架來說拼苍,我們首先得將其存儲(chǔ)到數(shù)據(jù)庫中去,最終可能得到如下數(shù)據(jù):

task_id  workflow_id  task_name  task_status  result  task_parents
  1          1           A           0                    -1
  2          1           B           0                    1
  3          1           C           0                    -1
  4          1           D           0                    2,3

可以看到调缨,這樣也能很好地存儲(chǔ) DAG 數(shù)據(jù)疮鲫,和框架中代碼的輸入方式差別并不是很大。

接下來我們要做的是遍歷執(zhí)行整個(gè) workflow弦叶,這邊和框架的差別也不大俊犯。首先我們可以利用select * from task where workflow_id = 1 and task_parents = -1來獲取初始化節(jié)點(diǎn) Task A 和 Task C,將其提交到我們的線程池中伤哺。

接著對(duì)應(yīng)框架代碼中的doExecute(processedNode.getChildren());燕侠,我們使用select * from task where task_parents like %3%,就可以得到 Task C 的孩子節(jié)點(diǎn) Task D立莉,這里使用了模糊查詢是因?yàn)槲覀兊?task_parents 可能是由多個(gè)父親的 taskId 與分隔號(hào)組合而成的字符串绢彤。查詢到孩子節(jié)點(diǎn)后,繼續(xù)提交到線程池即可蜓耻。

別忘了我們?cè)?Task D 這邊還有一個(gè)并發(fā)等待的操作茫舶,對(duì)應(yīng)框架代碼中的if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents()))。這邊我們只要判斷select count(1) from task where task_id in (2,3) and status != 1的個(gè)數(shù)為 0 即可刹淌,即保證 parents task 全部成功饶氏。

另外值得注意的是 task 的重試。在框架中有勾,失敗 task 的重試可以是立即使用當(dāng)前線程重試或者放到一個(gè)定時(shí)線程池中去重試疹启。而在平臺(tái)上,我們的重試基本上來自于用戶在界面上的點(diǎn)擊蔼卡,即主線程喊崖。

至此,我們已經(jīng)將任務(wù)編排框架的功能基本平臺(tái)化了雇逞。作為一個(gè)任務(wù)編排平臺(tái)荤懂,可拖拽編排的可視化輸入、整個(gè)工作流狀態(tài)的可視化展示喝峦、任務(wù)的可人工重試都是其優(yōu)點(diǎn)势誊。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末呜达,一起剝皮案震驚了整個(gè)濱河市谣蠢,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖眉踱,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挤忙,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡谈喳,警方通過查閱死者的電腦和手機(jī)册烈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來婿禽,“玉大人赏僧,你說我怎么就攤上這事∨で悖” “怎么了淀零?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長膛壹。 經(jīng)常有香客問我驾中,道長,這世上最難降的妖魔是什么模聋? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任肩民,我火速辦了婚禮,結(jié)果婚禮上链方,老公的妹妹穿的比我還像新娘持痰。我一直安慰自己,他們只是感情好侄柔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布共啃。 她就那樣靜靜地躺著,像睡著了一般暂题。 火紅的嫁衣襯著肌膚如雪移剪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天薪者,我揣著相機(jī)與錄音纵苛,去河邊找鬼。 笑死言津,一個(gè)胖子當(dāng)著我的面吹牛攻人,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播悬槽,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼怀吻,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了初婆?” 一聲冷哼從身側(cè)響起蓬坡,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤猿棉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后屑咳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體萨赁,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年兆龙,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了杖爽。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡紫皇,死狀恐怖慰安,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情聪铺,我是刑警寧澤泻帮,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站计寇,受9級(jí)特大地震影響锣杂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜番宁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一元莫、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蝶押,春花似錦踱蠢、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至赶盔,卻和暖如春企锌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背于未。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來泰國打工撕攒, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人烘浦。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓抖坪,卻偏偏與公主長得像,于是被迫代替她去往敵國和親闷叉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子擦俐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容