這是一篇使用并發(fā)和通道來實現(xiàn)控制程序生命周期的并發(fā)模式示例互捌,該示例演示了如何控制程序在規(guī)定時間段內(nèi)的執(zhí)行遗契,并可以手動中斷來終止程序的運行辐棒。
功能
展示如何通過通道來監(jiān)視程序的執(zhí)行時間病曾,如果程序執(zhí)行時間過長牍蜂,也可以終止程序
使用場景
當(dāng)需要調(diào)度后臺處理任務(wù)的時候,這種模式會很有用泰涂。該程序可能會作為 cron 作業(yè)執(zhí)行鲫竞,或者在基于定時任務(wù)的云環(huán)境 (如 iron.io) 里執(zhí)行。
實現(xiàn)思路
創(chuàng)建一個執(zhí)行者 runner, 給 runner 設(shè)置一個超時時間 timeout 和任務(wù)切片 tasks逼蒙,然后遍歷執(zhí)行 tasks 所有任務(wù)从绘。
設(shè)置一個存儲中斷信號的字段 interrupt,通過 interrupt 來判斷是否已經(jīng)中斷程序是牢。
設(shè)置一個記錄每個任務(wù)執(zhí)行錯誤結(jié)果的字段 complete僵井,監(jiān)聽 complete, 判斷是那種錯誤類型,然后做相應(yīng)的處理驳棱。
執(zhí)行所有任務(wù)批什,并監(jiān)聽不同錯誤碼,執(zhí)行不同的業(yè)務(wù)邏輯社搅。
實現(xiàn)詳情
首先我們需要聲明一個 runner 結(jié)構(gòu)
type runner struct {
tasks []func(int)
timeout <-chan time.Time
interrupt chan os.Signal
complete chan error
}
runner 中包括任務(wù)切片 tasks, tasks 是一個存儲 func(int) 類型的切片驻债,后面會遍歷 tasks 來進(jìn)行處理任務(wù)乳规。
timeout 字段是一個存放超時時間的只讀的通道,通過該字段來判斷任務(wù)執(zhí)行是否超時合呐。
interrupt 字段是存放 os.Signal 類型的通道暮的,接收到來自終端的中斷信號會存放在該字段中。
complete 字段是存放任務(wù)執(zhí)行的錯誤結(jié)果淌实,如果沒有錯誤則是 nil冻辩。
有了 runner 執(zhí)行者這個結(jié)構(gòu)后,我們可以聲明一個 New 工廠函數(shù)來創(chuàng)建 runner 類型的對象拆祈,并初始化需要的字段微猖。
// 工廠函數(shù)創(chuàng)建 runner
func New(timeout time.Duration) *runner {
return &runner{
timeout: time.After(timeout),
interrupt: make(chan os.Signal),
complete: make(chan error),
}
}
創(chuàng)建 runner 的時候,我們需要傳入一個 time.Duration 類型的參數(shù)缘屹,然后內(nèi)部調(diào)用 time.After() 這個函數(shù)來返回一個time.Time 類型的只讀通道凛剥。interrupt 和 complete 字段正常初始化即可。tasks 默認(rèn)是空切片(表示還沒有任何任務(wù))轻姿。
有了一個 runner 執(zhí)行者對象后犁珠,在執(zhí)行任務(wù)之前我們需要給 runner 的增加任務(wù),那我們需要寫一個給 runner 增加任務(wù)的方法互亮。
// 增加任務(wù)
// 可變參數(shù) ...func(int) 表示參數(shù)可以是多個參數(shù)
func (r *runner) Add(tasks ...func(int)) {
// 使用 tasks... 解構(gòu) tasks
r.tasks = append(r.tasks, tasks...)
}
增加任務(wù)方法 Add 接收一個參數(shù)類型為 func(int) 的可變參數(shù)犁享,可變參數(shù)意味著參數(shù)的數(shù)量是可變的,可以是單個豹休,也可以是多個炊昆。
當(dāng)調(diào)用 Add 方法后,就會把傳入的參數(shù)賦值給 runner 類型對象的 tasks 字段威根,此時 runner 類型對象就有任務(wù)了凤巨。
因為任務(wù)執(zhí)行過程會有多個錯誤值,比如超時錯誤和中斷錯誤洛搀,所以我們先定義兩個錯誤變量敢茁,以備后面使用。
// 錯誤類型
var (
ErrTimeout = errors.New("超時錯誤")
ErrInterrupt = errors.New("程序中斷錯誤")
)
接下來就是任務(wù)的執(zhí)行留美。
// 執(zhí)行任務(wù)
func (r *runner) Run() error {
for id, task := range r.tasks {
// 判斷是否已經(jīng)中斷程序
if r.isInterrupt() {
return ErrInterrupt
}
task(id)
}
return nil
}
執(zhí)行任務(wù)就是遍歷runner 對象中的 tasks 的所有任務(wù)彰檬,然后執(zhí)行每一個任務(wù)即可,但是在執(zhí)行任務(wù)之前谎砾,需要判斷是否已經(jīng)中斷了程序逢倍。如果已經(jīng)中斷了程序,則直接返回中斷錯誤 ErrInterrupt景图。
以下是判斷程序中斷的方法
// 判斷是否中斷程序
func (r *runner) isInterrupt() bool {
select {
case <- r.interrupt:
signal.Stop(r.interrupt)
return true
default:
return false
}
}
該方法是 runner 類型的一個方法较雕,方法內(nèi)使用了 select 多路復(fù)用來進(jìn)行監(jiān)聽
interrupt 通道是否有中斷信號,如果監(jiān)聽到有中斷信號症歇,則任務(wù)是用戶中斷了程序郎笆,此時會調(diào)用 signal.Stop() 方法 中斷程序谭梗,然后返回 true ,表示程序已經(jīng)被中斷宛蚓。
接下來激捏,我們的實現(xiàn)一個方法來整合整個任務(wù)執(zhí)行的流程,包括任務(wù)的執(zhí)行凄吏,中斷和超時的監(jiān)聽远舅。
// 執(zhí)行所有任務(wù),并監(jiān)聽通道事件
func (r *runner) Start() error {
// 收到的所有中斷信號
signal.Notify(r.interrupt, os.Interrupt)
?
go func() {
r.complete <- r.Run()
}()
?
select {
case err := <- r.complete:
return err
case <- r.timeout:
return ErrTimeout
}
}
Start 方法中痕钢,會接收所有的中斷信號图柏,放在 interrupt 通道中,然后調(diào)用 Run 方法來執(zhí)行所有任務(wù)任连,并把執(zhí)行的結(jié)果存放到 complete 通道中蚤吹,最后通過 select 多路復(fù)用方式監(jiān)聽 complete 通道和 timeout 通道中的消息,一旦有錯誤就返回錯誤碼随抠。
執(zhí)行者調(diào)用 Start 方法后拿到錯誤碼裁着,執(zhí)行自己的業(yè)務(wù)邏輯,如果沒有錯誤碼返回則表示所有任務(wù)在規(guī)定的超時時間內(nèi)成功執(zhí)行了所有任務(wù)拱她。
目前所有任務(wù)的執(zhí)行二驰,錯誤碼監(jiān)聽等工作已經(jīng)全部完成。
接下來我們創(chuàng)建一個 runner 對象來驗證一下程序秉沼。
func main() {
timeout := 2 * time.Second
runner := New(timeout)
runner.Add(CreateTask(), CreateTask(), CreateTask())
?
if err := runner.Start(); err != nil {
switch err {
case ErrInterrupt:
fmt.Println(ErrInterrupt)
case ErrTimeout:
fmt.Println(ErrTimeout)
}
}
fmt.Println("程序結(jié)束")
}
因為 Add 方法參數(shù)要求是一個傳入 int 類型的函數(shù)桶雀,所以為了方便創(chuàng)建任務(wù),我們聲明一個使用了閉包的 CreateTask 函數(shù)來返回任務(wù)函數(shù)唬复。
// 創(chuàng)建任務(wù)
func CreateTask() func(int) {
return func(id int) {
fmt.Println("正在執(zhí)行 Task ", id)
// 模擬任務(wù)執(zhí)行
time.Sleep(time.Duration(id) * time.Second)
}
}
到目前為止所有的代碼實現(xiàn)已經(jīng)全部編寫完成
以下是完整的示例代碼
package main
?
import (
"errors"
"fmt"
"os"
"os/signal"
"time"
)
?
type runner struct {
tasks []func(int)
timeout <-chan time.Time
interrupt chan os.Signal
complete chan error
}
?
// 工廠函數(shù)創(chuàng)建 runner
func New(timeout time.Duration) *runner
{
return &runner{
timeout: time.After(timeout),
interrupt: make(chan os.Signal),
complete: make(chan error),
}
}
?
// 錯誤類型
var (
ErrTimeout = errors.New("超時錯誤")
ErrInterrupt = errors.New("程序中斷錯誤")
)
?
// 執(zhí)行任務(wù)
func (r *runner) Run() error {
for id, task := range r.tasks {
// 判斷是否已經(jīng)中斷程序
if r.isInterrupt() {
return ErrInterrupt
}
task(id)
}
return nil
}
?
// 判斷是否中斷程序
func (r *runner) isInterrupt() bool {
select {
case <- r.interrupt:
signal.Stop(r.interrupt)
return true
default:
return false
}
}
?
// 增加任務(wù)
// 可變參數(shù) ...func(int) 表示參數(shù)可以是多個參數(shù)
func (r *runner) Add(tasks ...func(int)) {
// 使用 tasks... 解構(gòu) tasks
r.tasks = append(r.tasks, tasks...)
}
?
// 執(zhí)行所有任務(wù)矗积,并監(jiān)聽通道事件
func (r *runner) Start() error {
// 收到的所有中斷信號
signal.Notify(r.interrupt, os.Interrupt)
?
go func() {
r.complete <- r.Run()
}()
?
select {
case err := <- r.complete:
return err
case <- r.timeout:
return ErrTimeout
}
}
?
// 創(chuàng)建任務(wù)
func CreateTask() func(int) {
return func(id int) {
fmt.Println("正在執(zhí)行 Task ", id)
time.Sleep(time.Duration(id) * time.Second)
}
}
?
?
func main() {
timeout := 2 * time.Second
runner := New(timeout)
runner.Add(CreateTask(), CreateTask(), CreateTask())
?
if err := runner.Start(); err != nil {
switch err {
case ErrInterrupt:
fmt.Println(ErrInterrupt)
case ErrTimeout:
fmt.Println(ErrTimeout)
}
}
fmt.Println("程序結(jié)束")
}
一起精進(jìn)Go技術(shù), 關(guān)注公眾號:陸貴成