控制程序的生命周期

這是一篇使用并發(fā)和通道來實現(xiàn)控制程序生命周期的并發(fā)模式示例互捌,該示例演示了如何控制程序在規(guī)定時間段內(nèi)的執(zhí)行遗契,并可以手動中斷來終止程序的運行辐棒。

功能

展示如何通過通道來監(jiān)視程序的執(zhí)行時間病曾,如果程序執(zhí)行時間過長牍蜂,也可以終止程序

使用場景

當(dāng)需要調(diào)度后臺處理任務(wù)的時候,這種模式會很有用泰涂。該程序可能會作為 cron 作業(yè)執(zhí)行鲫竞,或者在基于定時任務(wù)的云環(huán)境 (如 iron.io) 里執(zhí)行。

實現(xiàn)思路

  1. 創(chuàng)建一個執(zhí)行者 runner, 給 runner 設(shè)置一個超時時間 timeout 和任務(wù)切片 tasks逼蒙,然后遍歷執(zhí)行 tasks 所有任務(wù)从绘。

  2. 設(shè)置一個存儲中斷信號的字段 interrupt,通過 interrupt 來判斷是否已經(jīng)中斷程序是牢。

  3. 設(shè)置一個記錄每個任務(wù)執(zhí)行錯誤結(jié)果的字段 complete僵井,監(jiān)聽 complete, 判斷是那種錯誤類型,然后做相應(yīng)的處理驳棱。

  4. 執(zhí)行所有任務(wù)批什,并監(jiān)聽不同錯誤碼,執(zhí)行不同的業(yè)務(wù)邏輯社搅。

實現(xiàn)詳情

首先我們需要聲明一個 runner 結(jié)構(gòu)

type runner struct {
  tasks []func(int)
  timeout <-chan time.Time
  interrupt chan os.Signal
  complete chan error
}

runner 中包括任務(wù)切片 tasks, tasks 是一個存儲 func(int) 類型的切片驻债,后面會遍歷 tasks 來進(jìn)行處理任務(wù)乳规。

timeout 字段是一個存放超時時間的只讀的通道,通過該字段來判斷任務(wù)執(zhí)行是否超時合呐。

interrupt 字段是存放 os.Signal 類型的通道暮的,接收到來自終端的中斷信號會存放在該字段中。

complete 字段是存放任務(wù)執(zhí)行的錯誤結(jié)果淌实,如果沒有錯誤則是 nil冻辩。

有了 runner 執(zhí)行者這個結(jié)構(gòu)后,我們可以聲明一個 New 工廠函數(shù)來創(chuàng)建 runner 類型的對象拆祈,并初始化需要的字段微猖。

// 工廠函數(shù)創(chuàng)建 runner
func New(timeout time.Duration) *runner {
  return &runner{
    timeout: time.After(timeout),
    interrupt: make(chan os.Signal),
    complete: make(chan error),
  }
}

創(chuàng)建 runner 的時候,我們需要傳入一個 time.Duration 類型的參數(shù)缘屹,然后內(nèi)部調(diào)用 time.After() 這個函數(shù)來返回一個time.Time 類型的只讀通道凛剥。interrupt 和 complete 字段正常初始化即可。tasks 默認(rèn)是空切片(表示還沒有任何任務(wù))轻姿。

有了一個 runner 執(zhí)行者對象后犁珠,在執(zhí)行任務(wù)之前我們需要給 runner 的增加任務(wù),那我們需要寫一個給 runner 增加任務(wù)的方法互亮。

// 增加任務(wù)
// 可變參數(shù) ...func(int) 表示參數(shù)可以是多個參數(shù)
func (r *runner) Add(tasks ...func(int)) {
  // 使用 tasks... 解構(gòu) tasks
  r.tasks = append(r.tasks, tasks...)
}

增加任務(wù)方法 Add 接收一個參數(shù)類型為 func(int) 的可變參數(shù)犁享,可變參數(shù)意味著參數(shù)的數(shù)量是可變的,可以是單個豹休,也可以是多個炊昆。

當(dāng)調(diào)用 Add 方法后,就會把傳入的參數(shù)賦值給 runner 類型對象的 tasks 字段威根,此時 runner 類型對象就有任務(wù)了凤巨。

因為任務(wù)執(zhí)行過程會有多個錯誤值,比如超時錯誤和中斷錯誤洛搀,所以我們先定義兩個錯誤變量敢茁,以備后面使用。

// 錯誤類型
var (
  ErrTimeout = errors.New("超時錯誤")
  ErrInterrupt = errors.New("程序中斷錯誤")
)

接下來就是任務(wù)的執(zhí)行留美。

// 執(zhí)行任務(wù)
func (r *runner) Run() error {
  for id, task := range r.tasks {
    // 判斷是否已經(jīng)中斷程序
    if r.isInterrupt() {
      return ErrInterrupt
    }
    task(id)
  }
  return nil
}

執(zhí)行任務(wù)就是遍歷runner 對象中的 tasks 的所有任務(wù)彰檬,然后執(zhí)行每一個任務(wù)即可,但是在執(zhí)行任務(wù)之前谎砾,需要判斷是否已經(jīng)中斷了程序逢倍。如果已經(jīng)中斷了程序,則直接返回中斷錯誤 ErrInterrupt景图。

以下是判斷程序中斷的方法

// 判斷是否中斷程序
func (r *runner) isInterrupt() bool {
  select {
  case <- r.interrupt:
    signal.Stop(r.interrupt)
    return true
  default:
    return false
  }
}

該方法是 runner 類型的一個方法较雕,方法內(nèi)使用了 select 多路復(fù)用來進(jìn)行監(jiān)聽

interrupt 通道是否有中斷信號,如果監(jiān)聽到有中斷信號症歇,則任務(wù)是用戶中斷了程序郎笆,此時會調(diào)用 signal.Stop() 方法 中斷程序谭梗,然后返回 true ,表示程序已經(jīng)被中斷宛蚓。

接下來激捏,我們的實現(xiàn)一個方法來整合整個任務(wù)執(zhí)行的流程,包括任務(wù)的執(zhí)行凄吏,中斷和超時的監(jiān)聽远舅。

// 執(zhí)行所有任務(wù),并監(jiān)聽通道事件
func (r *runner) Start() error  {
  // 收到的所有中斷信號
  signal.Notify(r.interrupt, os.Interrupt)
?
  go func() {
    r.complete <- r.Run()
  }()
?
  select {
  case err := <- r.complete:
    return err
  case <- r.timeout:
    return ErrTimeout
  }
}

Start 方法中痕钢,會接收所有的中斷信號图柏,放在 interrupt 通道中,然后調(diào)用 Run 方法來執(zhí)行所有任務(wù)任连,并把執(zhí)行的結(jié)果存放到 complete 通道中蚤吹,最后通過 select 多路復(fù)用方式監(jiān)聽 complete 通道和 timeout 通道中的消息,一旦有錯誤就返回錯誤碼随抠。

執(zhí)行者調(diào)用 Start 方法后拿到錯誤碼裁着,執(zhí)行自己的業(yè)務(wù)邏輯,如果沒有錯誤碼返回則表示所有任務(wù)在規(guī)定的超時時間內(nèi)成功執(zhí)行了所有任務(wù)拱她。

目前所有任務(wù)的執(zhí)行二驰,錯誤碼監(jiān)聽等工作已經(jīng)全部完成。

接下來我們創(chuàng)建一個 runner 對象來驗證一下程序秉沼。

func main() {
  timeout := 2 * time.Second
  runner := New(timeout)
  runner.Add(CreateTask(), CreateTask(), CreateTask())
?
  if err := runner.Start(); err != nil {
    switch err {
    case ErrInterrupt:
      fmt.Println(ErrInterrupt)
    case ErrTimeout:
      fmt.Println(ErrTimeout)
    }
  }
  fmt.Println("程序結(jié)束")
}

因為 Add 方法參數(shù)要求是一個傳入 int 類型的函數(shù)桶雀,所以為了方便創(chuàng)建任務(wù),我們聲明一個使用了閉包的 CreateTask 函數(shù)來返回任務(wù)函數(shù)唬复。

// 創(chuàng)建任務(wù)
func CreateTask() func(int)  {
  return func(id int) {
    fmt.Println("正在執(zhí)行 Task ", id)
    // 模擬任務(wù)執(zhí)行
    time.Sleep(time.Duration(id) * time.Second)
  }
}

到目前為止所有的代碼實現(xiàn)已經(jīng)全部編寫完成

以下是完整的示例代碼

package main
?
import (
  "errors"
  "fmt"
  "os"
  "os/signal"
  "time"
)
?
type runner struct {
  tasks []func(int)
  timeout <-chan time.Time
  interrupt chan os.Signal
  complete chan error
}
?
// 工廠函數(shù)創(chuàng)建 runner
func New(timeout time.Duration) *runner
 {
  return &runner{
    timeout: time.After(timeout),
    interrupt: make(chan os.Signal),
    complete: make(chan error),
  }
}
?
// 錯誤類型
var (
  ErrTimeout = errors.New("超時錯誤")
  ErrInterrupt = errors.New("程序中斷錯誤")
)
?
// 執(zhí)行任務(wù)
func (r *runner) Run() error {
  for id, task := range r.tasks {
    // 判斷是否已經(jīng)中斷程序
    if r.isInterrupt() {
      return ErrInterrupt
    }
    task(id)
  }
  return nil
}
?
// 判斷是否中斷程序
func (r *runner) isInterrupt() bool {
  select {
  case <- r.interrupt:
    signal.Stop(r.interrupt)
    return true
  default:
    return false
  }
}
?
// 增加任務(wù)
// 可變參數(shù) ...func(int) 表示參數(shù)可以是多個參數(shù)
func (r *runner) Add(tasks ...func(int)) {
  // 使用 tasks... 解構(gòu) tasks
  r.tasks = append(r.tasks, tasks...)
}
?
// 執(zhí)行所有任務(wù)矗积,并監(jiān)聽通道事件
func (r *runner) Start() error  {
  // 收到的所有中斷信號
  signal.Notify(r.interrupt, os.Interrupt)
?
  go func() {
    r.complete <- r.Run()
  }()
?
  select {
  case err := <- r.complete:
    return err
  case <- r.timeout:
    return ErrTimeout
  }
}
?
// 創(chuàng)建任務(wù)
func CreateTask() func(int)  {
  return func(id int) {
    fmt.Println("正在執(zhí)行 Task ", id)
    time.Sleep(time.Duration(id) * time.Second)
  }
}
?
?
func main() {
  timeout := 2 * time.Second
  runner := New(timeout)
  runner.Add(CreateTask(), CreateTask(), CreateTask())
?
  if err := runner.Start(); err != nil {
    switch err {
    case ErrInterrupt:
      fmt.Println(ErrInterrupt)
    case ErrTimeout:
      fmt.Println(ErrTimeout)
    }
  }
  fmt.Println("程序結(jié)束")
}

一起精進(jìn)Go技術(shù), 關(guān)注公眾號:陸貴成

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末盅抚,一起剝皮案震驚了整個濱河市漠魏,隨后出現(xiàn)的幾起案子倔矾,更是在濱河造成了極大的恐慌妄均,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哪自,死亡現(xiàn)場離奇詭異丰包,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)壤巷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進(jìn)店門邑彪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人胧华,你說我怎么就攤上這事寄症≈姹耄” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵有巧,是天一觀的道長释漆。 經(jīng)常有香客問我,道長篮迎,這世上最難降的妖魔是什么男图? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮甜橱,結(jié)果婚禮上逊笆,老公的妹妹穿的比我還像新娘。我一直安慰自己岂傲,他們只是感情好难裆,可當(dāng)我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著镊掖,像睡著了一般差牛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上堰乔,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天偏化,我揣著相機(jī)與錄音,去河邊找鬼镐侯。 笑死侦讨,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的苟翻。 我是一名探鬼主播韵卤,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼崇猫!你這毒婦竟也來了沈条?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤诅炉,失蹤者是張志新(化名)和其女友劉穎蜡歹,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體涕烧,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡月而,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了议纯。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片父款。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出憨攒,到底是詐尸還是另有隱情世杀,我是刑警寧澤,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布肝集,位于F島的核電站玫坛,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏包晰。R本人自食惡果不足惜湿镀,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望伐憾。 院中可真熱鬧勉痴,春花似錦、人聲如沸树肃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽胸嘴。三九已至雏掠,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間劣像,已是汗流浹背乡话。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留耳奕,地道東北人绑青。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像屋群,于是被迫代替她去往敵國和親闸婴。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容