前言
工作以來(lái)做項(xiàng)目大部分的使用的輪子(第三方庫(kù))都是一些大牛寫(xiě)好開(kāi)源出來(lái)的蟆盐,自己只是拼拼湊湊利用現(xiàn)有的輪子完成工作就算完事了。現(xiàn)在我也來(lái)造個(gè)小輪子吧室谚,不過(guò)這個(gè)輪子是在寫(xiě)測(cè)試程序和分析程序時(shí)提取的毡鉴,并沒(méi)有用在線上項(xiàng)目中。這里記錄下來(lái)秒赤,也看看有沒(méi)有人用得上猪瞬。
簡(jiǎn)介
grqueue是goroutine queue是縮寫(xiě),實(shí)際是一個(gè)利用goroutine實(shí)現(xiàn)的一個(gè)同步隊(duì)列入篮,用于程序中可同步任務(wù)的并發(fā)執(zhí)行陈瘦,可以減少等待時(shí)間。比如批量日志分析潮售,并發(fā)請(qǐng)求等等痊项。
原理及實(shí)現(xiàn)
qrqueue的原理很簡(jiǎn)單,利用channel將需要執(zhí)行的task存起來(lái)酥诽,通過(guò)go routine不斷從channel中取出task執(zhí)行鞍泉,再利用官方sync包的WaitGroup等待執(zhí)行完畢。具體routine的數(shù)量和channel的容量可以由使用者自定義盆均,使用者也可以設(shè)置回調(diào)函數(shù)用于處理每個(gè)task結(jié)束后和所有task結(jié)束時(shí)需要處理的事務(wù)塞弊。簡(jiǎn)單畫(huà)了個(gè)圖
實(shí)現(xiàn)代碼也很簡(jiǎn)單,一百行不到泪姨,具體可以看看代碼和注釋游沿,各位發(fā)現(xiàn)有問(wèn)題或者可改進(jìn)的點(diǎn)歡迎拍磚評(píng)論。
package grqueue
import (
"sync"
)
type GoroutineQueue struct {
Number int //并發(fā)執(zhí)行的任務(wù)個(gè)數(shù)
Total int //總?cè)蝿?wù)數(shù)
tasks chan func() interface{}
task_end_callback func(result interface{})
finish_callback func()
wg sync.WaitGroup
}
func NewGoroutineQueue(number int, total int) *GoroutineQueue {
queue := &GoroutineQueue{
tasks: make(chan func() interface{}, total),
Number: number,
Total: total}
return queue
}
//開(kāi)始執(zhí)行task
func (queue *GoroutineQueue) Start() {
defer close(queue.tasks)
//加鎖肮砾,鎖的數(shù)量是tasks的數(shù)量
queue.wg.Add(len(queue.tasks))
for i := 0; i < queue.Number; i++ {
//分number個(gè)routine執(zhí)行work
go queue.work()
}
//等待routine執(zhí)行完畢
queue.wg.Wait()
//所有task完畢诀黍,若finish回調(diào)函數(shù)存在則執(zhí)行則回調(diào)
if queue.finish_callback != nil {
queue.finish_callback()
}
}
func (queue *GoroutineQueue) work() {
for {
//不斷取出task執(zhí)行,直到chan關(guān)閉
task, ok := <-queue.tasks
if !ok {
break
}
res := task()
//完成一個(gè)task立即回調(diào)
if queue.task_end_callback != nil {
queue.task_end_callback(res)
}
//每執(zhí)行完一個(gè)task仗处,解鎖一次
wg.Done()
}
}
func (queue *GoroutineQueue) AddTask(task func() interface{}) {
queue.tasks <- task
}
func (queue *GoroutineQueue) SetFinishCallback(callback func()) {
queue.finish_callback = callback
}
func (queue *GoroutineQueue) SetTaskEndCallback(callback func(result interface{})) {
queue.task_end_callback = callback
}
使用
代碼已放github眯勾,直接使用直接go get
就可以了
go get github.com/yaodd/grqueue
使用示例在github的README有寫(xiě),這里就不重復(fù)敘述了婆誓。
以上
--------------2018-03-23更新-------------
實(shí)際項(xiàng)目中應(yīng)用發(fā)現(xiàn)wg不應(yīng)該作為全局變量使用吃环,而是應(yīng)該作為GoroutineQueue成員變量使用,已修正洋幻。