參考 http://www.reibang.com/p/21de03ac682c
使用兩級(jí)channel袁梗,一個(gè)用來(lái)存放任務(wù)隊(duì)列眠寿,另一個(gè)用來(lái)控制處理任務(wù)隊(duì)列中任務(wù)的線程的數(shù)量
- 任務(wù)類型
定義一個(gè)任務(wù)類型誓琼,本質(zhì)是一個(gè)函數(shù),函數(shù)里面完成我們的業(yè)務(wù)邏輯等的處理
type task func(curTime time.Time)
- worker線程
woker是就是執(zhí)行作業(yè)(task)的線程
type Worker {
// 這里線程池的二級(jí)channel可以這么理解披泪,線程池中多個(gè)channel是用來(lái)存放woker線程踪蹬,用來(lái)控制線程的數(shù)量,而每個(gè)線程的結(jié)構(gòu)體又是一個(gè)channel恶迈,這個(gè)channel的類型是task涩金,用來(lái)等待任務(wù)的發(fā)生
workPool chan chan task //線程池,即woker所屬的線程池
taskChannel chan task //任務(wù)通道
quit chan bool //退出通道
}
// 新建一個(gè)woker線程
func newWorker(workPool chan chan task) *woker {
return &woker{
wokerPool:workPool, //表示work所在的線程池
taskChannel:make(chan task),
quit: make(chan bool),
}
}
// 給線程定義一個(gè)start方法暇仲,表示監(jiān)聽(tīng)到有任務(wù)來(lái)了開(kāi)始干活
func(this *worker) start() {
go func() {
// 表示線程池中的某一個(gè)woker開(kāi)始處理任務(wù)了步做,這里線程池如果滿了就不再接收新任務(wù)了,會(huì)在這里阻塞
// 這里就是一級(jí)channel 用來(lái)限制線程池中worker線程的使用
this.workerPool <- this.taskChannel
// 二級(jí)channel奈附,這里開(kāi)始處理任務(wù)全度,channel中沒(méi)有任務(wù)就阻塞,直到該線程被停止
select {
case taskObj := <-this.taskChannel:
taskObj(time.Now())
case <-this.quit:
return
}
}()
}
// 線程停止工作
func(this *Worker) stop() {
this.quit<-true
}
- 任務(wù)分發(fā)器
任務(wù)分發(fā)器可以把任務(wù)隊(duì)列中的任務(wù)逐個(gè)分發(fā)給線程池中的線程去處理
type Dispatch struct {
workPool chan chan task
maxNum int 32 // 線程池中線程數(shù)的最大數(shù)量
taskQueue chan task // 任務(wù)通道
}
// 新建一個(gè)任務(wù)分發(fā)器
func NewDispatcher(maxWorkerNum int32) *Dispatcher {
return &Dispatcher {
workPool:make(chan chan task, maxWorkerNum),
maxNum:mxWorkerNum,
taskQueue:make(chan task),
}
}
// 添加任務(wù)
func (this *Dispatcher) addTask(t task) {
this.taskQueue <- t
}
// 分配任務(wù)
func (this *Dispatcher) dispatcher() {
for {
select {
// 從任務(wù)隊(duì)列中取出一個(gè)任務(wù)
case taskObj := <- this.taskQueue:
go func(t task) {
// 從線程池中取出一個(gè)線程
workerChannel := <-this.workerPool
workerChannel <- taskObj
}(taskObj)
}
}
}
// 啟動(dòng)任務(wù)分配器,任務(wù)分配球開(kāi)始運(yùn)行并分發(fā)任務(wù)
func (this *Dispatcher) Run() {
// 新建工作線程
for i:=0; i<int (this.maxNum); i++ {
workerObj := newWorker(this.workerPool)
//啟動(dòng)線程
workerObj.start()
}
// 分發(fā)任務(wù)
go this.dispatcher()
}
測(cè)試文件
// 新建線程池容量為20的任務(wù)分發(fā)器
dispatcher := NewDispatcher(20)
dispacher.run()
//添加任務(wù)
for i:=0; i<50; i++ {
go func() {
dispatcher.addTask(test)
}
}