前言
在上一篇文章《Golang實現(xiàn)簡單爬蟲框架(3)——簡單并發(fā)版》中我們實現(xiàn)了一個最簡單并發(fā)爬蟲怎静,調度器為每一個Request
創(chuàng)建一個goroutine
仓犬,每個goroutine
往Worker
隊列中分發(fā)任務谜叹,發(fā)完就結束毫玖。所有的Worker
都在搶一個channel
中的任務织堂。但是這樣做還是有些許不足之處瓦灶,比如控制力弱:所有的Worker在搶同一個channel
中的任務,我們沒有辦法控制給哪一個worker任務贫导。
其實我們可以自己做一個任務分發(fā)的機制抛猫,我們來決定分發(fā)給哪一個Worker
注意:本次并發(fā)是在上一篇文章簡單并發(fā)實現(xiàn)的基礎上修改,所以沒有貼出全部代碼脱盲,只是貼出部分修改部分邑滨,要查看完整項目代碼,可以查看上篇文章钱反,或者從github下載項目源代碼查看
1、項目架構
在上一篇文章實現(xiàn)簡單并發(fā)的基礎上匣距,我們修改下Scheduler
的任務分發(fā)機制
- 當
Scheduler
接收到一個Request
后面哥,不能直接發(fā)給Worker
,也不能為每個Request
創(chuàng)建一個goroutine
毅待,所以這里使用一個Request隊列 - 同時我們想對
Worker
實現(xiàn)一個更多的控制尚卫,可以決定把任務分發(fā)給哪一個Worker
,所以這里我們還需要一個Worker
隊列 - 當有了
Request
和Worker
尸红,我們就可以把選擇的Request發(fā)送給選擇的Worker
2吱涉、隊列實現(xiàn)任務調度器
在scheduler目錄下創(chuàng)建queued.go文件
package scheduler
import "crawler/engine"
// 使用隊列來調度任務
type QueuedScheduler struct {
requestChan chan engine.Request // Request channel
// Worker channel, 其中每一個Worker是一個 chan engine.Request 類型
workerChan chan chan engine.Request
}
// 提交請求任務到 requestChannel
func (s *QueuedScheduler) Submit(request engine.Request) {
s.requestChan <- request
}
func (s *QueuedScheduler) ConfigMasterWorkerChan(chan engine.Request) {
panic("implement me")
}
// 告訴外界有一個 worker 可以接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
s.workerChan <- w
}
func (s *QueuedScheduler) Run() {
// 生成channel
s.workerChan = make(chan chan engine.Request)
s.requestChan = make(chan engine.Request)
go func() {
// 創(chuàng)建請求隊列和工作隊列
var requestQ []engine.Request
var workerQ []chan engine.Request
for {
var activeWorker chan engine.Request
var activeRequest engine.Request
// 當requestQ和workerQ同時有數(shù)據(jù)時
if len(requestQ) > 0 && len(workerQ) > 0 {
activeWorker = workerQ[0]
activeRequest = requestQ[0]
}
select {
case r := <-s.requestChan: // 當 requestChan 收到數(shù)據(jù)
requestQ = append(requestQ, r)
case w := <-s.workerChan: // 當 workerChan 收到數(shù)據(jù)
workerQ = append(workerQ, w)
case activeWorker <- activeRequest: // 當請求隊列和認讀隊列都不為空時,給任務隊列分配任務
requestQ = requestQ[1:]
workerQ = workerQ[1:]
}
}
}()
}
3外里、爬蟲引擎
修改后的concurrent.go文件如下
package engine
import (
"log"
)
// 并發(fā)引擎
type ConcurrendEngine struct {
Scheduler Scheduler
WorkerCount int
}
// 任務調度器
type Scheduler interface {
Submit(request Request) // 提交任務
ConfigMasterWorkerChan(chan Request)
WorkerReady(w chan Request)
Run()
}
func (e *ConcurrendEngine) Run(seeds ...Request) {
out := make(chan ParseResult)
e.Scheduler.Run()
// 創(chuàng)建 goruntine
for i := 0; i < e.WorkerCount; i++ {
createWorker(out, e.Scheduler)
}
// engine把請求任務提交給 Scheduler
for _, request := range seeds {
e.Scheduler.Submit(request)
}
itemCount := 0
for {
// 接受 Worker 的解析結果
result := <-out
for _, item := range result.Items {
log.Printf("Got item: #%d: %v\n", itemCount, item)
itemCount++
}
// 然后把 Worker 解析出的 Request 送給 Scheduler
for _, request := range result.Requests {
e.Scheduler.Submit(request)
}
}
}
func createWorker(out chan ParseResult, s Scheduler) {
// 為每一個Worker創(chuàng)建一個channel
in := make(chan Request)
go func() {
for {
s.WorkerReady(in) // 告訴調度器任務空閑
request := <-in
result, err := worker(request)
if err != nil {
continue
}
out <- result
}
}()
}
4怎爵、main函數(shù)
package main
import (
"crawler/engine"
"crawler/scheduler"
"crawler/zhenai/parser"
)
func main() {
e := engine.ConcurrendEngine{
Scheduler: &scheduler.QueuedScheduler{},// 這里調用并發(fā)調度器
WorkerCount: 50,
}
e.Run(engine.Request{
Url: "http://www.zhenai.com/zhenghun",
ParseFunc: parser.ParseCityList,
})
}
運行結果如下:
5、總結
在這篇文章中我們使用隊列實現(xiàn)對并發(fā)任務的調度盅蝗,從而實現(xiàn)了對Worker的控制鳖链。我們現(xiàn)在并發(fā)有兩種實現(xiàn)方式,但是他們的調度方法是不同的墩莫,為了代碼的統(tǒng)一芙委,所以在下一篇文章中的內容有:
- 對項目做一個同構
- 添加數(shù)據(jù)的存儲模塊。
如果想獲取Google工程師深度講解go語言視頻資源的狂秦,可以在評論區(qū)留下郵箱灌侣。
項目的源代碼已經(jīng)托管到Github上,對于各個版本都有記錄裂问,歡迎大家查看侧啼,記得給個star牛柒,在此先謝謝大家
如果覺得博客不錯,勞煩大人給個贊慨菱,