延時(shí)任務(wù)經(jīng)常在項(xiàng)目中被用到,比如生成訂單之后15分鐘訂單過(guò)期刀诬,發(fā)送xx事件開(kāi)始前的用戶(hù)提醒短信,周期性一致性檢查等;在go中可以通過(guò)ticker很方便的實(shí)現(xiàn)一個(gè)簡(jiǎn)單的延時(shí)任務(wù)功能涉波;但是如果要實(shí)現(xiàn)分布式的,獨(dú)立的延時(shí)任務(wù)進(jìn)程炭序,沒(méi)有找到適合的模塊啤覆;本文描述了一個(gè)精度為1s的延時(shí)任務(wù)的實(shí)現(xiàn);可以實(shí)現(xiàn)分布式多進(jìn)程部署惭聂,也能夠很方便的實(shí)現(xiàn)負(fù)載均衡窗声;
github地址 https://github.com/heteddy/delaytask-go
通常延時(shí)任務(wù)通過(guò)時(shí)間輪的方式實(shí)現(xiàn),可以參照下面的圖辜纲,時(shí)間輪包含8個(gè)slot笨觅,每個(gè)slot關(guān)聯(lián)一個(gè)鏈表拦耐,定時(shí)器觸發(fā)旋轉(zhuǎn)一個(gè)slot,并執(zhí)行對(duì)應(yīng)的任務(wù)见剩;
kafka中定時(shí)任務(wù)也是采用了時(shí)間輪杀糯,采用了多層輪的概念,如果任務(wù)時(shí)間超過(guò)下層輪的周期苍苞,就依次向上放入上層的輪中固翰,但這種多層的輪需要隨著時(shí)間移動(dòng)對(duì)應(yīng)的task;管理上比較復(fù)雜羹呵。
delaytask-go 采用了在任務(wù)的鏈表節(jié)點(diǎn)中加入round 概念骂际,即運(yùn)行round圈之后再執(zhí)行當(dāng)前的task;定義如下:
type TaskNode struct {
Runner
round uint64 // 當(dāng)運(yùn)行輪數(shù)之后冈欢,再運(yùn)行
// pool 的接口
worker IWorker // 執(zhí)行task的worker pool
}
type runnerInfo struct {
task *TaskNode // runner所在的taskNode
slot int64 // runner在wheel中的插槽
}
type Wheel struct {
ticks time.Duration
// 時(shí)間輪 槽的數(shù)量
count int64
// 索引
index int64
// 槽
slots []*Node // 時(shí)間輪插槽歉铝,每個(gè)包含TaskNode 鏈表和taskNode count
// 保存task和task所在的index
runnerMap map[int64]*runnerInfo
}
實(shí)現(xiàn)延時(shí)任務(wù)有以下幾個(gè)關(guān)鍵點(diǎn)
- 保證任務(wù)的時(shí)間精度,不能出現(xiàn)過(guò)多的延時(shí)
- 內(nèi)存占用凑耻,當(dāng)任務(wù)比較多太示,延時(shí)比較長(zhǎng)的任務(wù),進(jìn)程盡可能少的占用內(nèi)存香浩,
- 分布式部署
- 現(xiàn)場(chǎng)恢復(fù)先匪,當(dāng)執(zhí)行任務(wù)模塊宕機(jī)之后,盡快回復(fù)現(xiàn)場(chǎng)
任務(wù)精度
為了保證定時(shí)任務(wù)的執(zhí)行弃衍,定時(shí)任務(wù)通常切換到獨(dú)立的goroutine 池中執(zhí)行呀非;同時(shí)為了保證添加任務(wù)和定時(shí)器的觸發(fā)的Sequential Process 順序處理;因此把時(shí)間輪也放入一個(gè)單獨(dú)的goroutine中镜盯,添加任務(wù)和定時(shí)器事件都切換到timewheel goroutine中執(zhí)行岸裙,保證了順序性;而且避免了使用鎖速缆;
內(nèi)存占用
如果任務(wù)很少降允,直接放入內(nèi)存;但是如果有百萬(wàn)級(jí)別的任務(wù)艺糜,尤其是延時(shí)比較長(zhǎng)的任務(wù)剧董,全部放入內(nèi)存,本機(jī)內(nèi)存使用率很低破停;delaytask-go根據(jù)timewheel內(nèi)置了一個(gè)threshold(timewheel round time的整數(shù)倍)翅楼,當(dāng)執(zhí)行時(shí)間>這個(gè)threshold的時(shí)候,將任務(wù)序列化放入redis中真慢,通過(guò)sorted set管理waiting task毅臊;timewheel每轉(zhuǎn)一圈(round time)就去redis中取出來(lái)即將要執(zhí)行的任務(wù)
分布式部署
通過(guò)redis pub/sub,客戶(hù)端將任務(wù)序列化之后,發(fā)送到redis中黑界,delaytask-go訂閱任務(wù)管嬉;如果要做到負(fù)載均衡皂林,可以通過(guò)訂閱的channel做到,比如delaytaskchannel-0蚯撩,delaytaskchannel-1础倍,delaytaskchannel-2;客戶(hù)端通過(guò)一定的負(fù)載均衡算法將任務(wù)發(fā)送的指定的channel中胎挎;
現(xiàn)場(chǎng)的恢復(fù)
將進(jìn)入timewheel中的task序列化到 redis ongoing task list著隆;每當(dāng)task執(zhí)行完成之后就從ongoing中刪除對(duì)應(yīng)的task;如果系統(tǒng)崩潰呀癣,重啟時(shí)先從ongoing task中讀取task 如果task未超時(shí),放入timewheel中執(zhí)行弦赖。
測(cè)試
測(cè)試機(jī) macbook:
- 4 * Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
- 16G內(nèi)存
執(zhí)行了50000次 http get 百度的任務(wù)(部分超時(shí)失敗项栏,設(shè)置了1s超時(shí))之后內(nèi)存消耗大約為8M,峰值大約為12M蹬竖,每秒500+個(gè)任務(wù)并發(fā)的測(cè)試結(jié)果:任務(wù)的 計(jì)劃執(zhí)行時(shí)間 ToRunAt和實(shí)際執(zhí)行時(shí)間RunAt誤差小于1s沼沈;
可以根據(jù)任務(wù)的類(lèi)型修改goroutine池中worker的數(shù)量。io任務(wù)可以適當(dāng)增加worker數(shù)量币厕;默認(rèn)為cpu number
cpu的性能測(cè)試火焰圖列另,
cpu 統(tǒng)計(jì),除了系統(tǒng)調(diào)度占用旦装,logrus 和 redis操作占用了很多時(shí)間页衙,去掉log應(yīng)該可以運(yùn)行的更快;