Golang實現(xiàn)簡單爬蟲框架(4)——隊列實現(xiàn)并發(fā)任務調度

前言

在上一篇文章《Golang實現(xiàn)簡單爬蟲框架(3)——簡單并發(fā)版》中我們實現(xiàn)了一個最簡單并發(fā)爬蟲怎静,調度器為每一個Request創(chuàng)建一個goroutine仓犬,每個goroutineWorker隊列中分發(fā)任務谜叹,發(fā)完就結束毫玖。所有的Worker都在搶一個channel中的任務织堂。但是這樣做還是有些許不足之處瓦灶,比如控制力弱:所有的Worker在搶同一個channel中的任務,我們沒有辦法控制給哪一個worker任務贫导。

其實我們可以自己做一個任務分發(fā)的機制抛猫,我們來決定分發(fā)給哪一個Worker

注意:本次并發(fā)是在上一篇文章簡單并發(fā)實現(xiàn)的基礎上修改,所以沒有貼出全部代碼脱盲,只是貼出部分修改部分邑滨,要查看完整項目代碼,可以查看上篇文章钱反,或者從github下載項目源代碼查看

1、項目架構

在上一篇文章實現(xiàn)簡單并發(fā)的基礎上匣距,我們修改下Scheduler的任務分發(fā)機制

image
  • Scheduler接收到一個Request后面哥,不能直接發(fā)給Worker,也不能為每個Request創(chuàng)建一個goroutine毅待,所以這里使用一個Request隊列
  • 同時我們想對Worker實現(xiàn)一個更多的控制尚卫,可以決定把任務分發(fā)給哪一個Worker,所以這里我們還需要一個Worker隊列
  • 當有了RequestWorker尸红,我們就可以把選擇的Request發(fā)送給選擇的Worker

2吱涉、隊列實現(xiàn)任務調度器

在scheduler目錄下創(chuàng)建queued.go文件

package scheduler

import "crawler/engine"

// 使用隊列來調度任務

type QueuedScheduler struct {
    requestChan chan engine.Request     // Request channel
    // Worker channel, 其中每一個Worker是一個 chan engine.Request 類型
    workerChan  chan chan engine.Request    
}

// 提交請求任務到 requestChannel
func (s *QueuedScheduler) Submit(request engine.Request) {
    s.requestChan <- request
}

func (s *QueuedScheduler) ConfigMasterWorkerChan(chan engine.Request) {
    panic("implement me")
}

// 告訴外界有一個 worker 可以接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
    s.workerChan <- w
}

func (s *QueuedScheduler) Run() {
    // 生成channel
    s.workerChan = make(chan chan engine.Request)
    s.requestChan = make(chan engine.Request)
    go func() {
        // 創(chuàng)建請求隊列和工作隊列
        var requestQ []engine.Request
        var workerQ []chan engine.Request
        for {
            var activeWorker chan engine.Request
            var activeRequest engine.Request
            
            // 當requestQ和workerQ同時有數(shù)據(jù)時
            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeWorker = workerQ[0]
                activeRequest = requestQ[0]
            }
            
            select {
            case r := <-s.requestChan: // 當 requestChan 收到數(shù)據(jù)
                requestQ = append(requestQ, r)
            case w := <-s.workerChan: // 當 workerChan 收到數(shù)據(jù)
                workerQ = append(workerQ, w)
            case activeWorker <- activeRequest: // 當請求隊列和認讀隊列都不為空時,給任務隊列分配任務
                requestQ = requestQ[1:]
                workerQ = workerQ[1:]
            }
        }
    }()
}

3外里、爬蟲引擎

修改后的concurrent.go文件如下

package engine

import (
    "log"
)

// 并發(fā)引擎
type ConcurrendEngine struct {
    Scheduler   Scheduler
    WorkerCount int
}

// 任務調度器
type Scheduler interface {
    Submit(request Request) // 提交任務
    ConfigMasterWorkerChan(chan Request)
    WorkerReady(w chan Request)
    Run()
}

func (e *ConcurrendEngine) Run(seeds ...Request) {

    out := make(chan ParseResult)
    e.Scheduler.Run()

    // 創(chuàng)建 goruntine
    for i := 0; i < e.WorkerCount; i++ {
        createWorker(out, e.Scheduler)
    }

    // engine把請求任務提交給 Scheduler
    for _, request := range seeds {
        e.Scheduler.Submit(request)
    }

    itemCount := 0
    for {
        // 接受 Worker 的解析結果
        result := <-out
        for _, item := range result.Items {
            log.Printf("Got item: #%d: %v\n", itemCount, item)
            itemCount++
        }

        // 然后把 Worker 解析出的 Request 送給 Scheduler
        for _, request := range result.Requests {
            e.Scheduler.Submit(request)
        }
    }
}

func createWorker(out chan ParseResult, s Scheduler) {
    // 為每一個Worker創(chuàng)建一個channel
    in := make(chan Request)
    go func() {
        for {
            s.WorkerReady(in) // 告訴調度器任務空閑
            request := <-in
            result, err := worker(request)
            if err != nil {
                continue
            }
            out <- result
        }
    }()
}

4怎爵、main函數(shù)

package main

import (
    "crawler/engine"
    "crawler/scheduler"
    "crawler/zhenai/parser"
)

func main() {
    e := engine.ConcurrendEngine{
        Scheduler:   &scheduler.QueuedScheduler{},// 這里調用并發(fā)調度器
        WorkerCount: 50,
    }
    e.Run(engine.Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: parser.ParseCityList,
    })
}

運行結果如下:

image

5、總結

在這篇文章中我們使用隊列實現(xiàn)對并發(fā)任務的調度盅蝗,從而實現(xiàn)了對Worker的控制鳖链。我們現(xiàn)在并發(fā)有兩種實現(xiàn)方式,但是他們的調度方法是不同的墩莫,為了代碼的統(tǒng)一芙委,所以在下一篇文章中的內容有:

  • 對項目做一個同構
  • 添加數(shù)據(jù)的存儲模塊。

如果想獲取Google工程師深度講解go語言視頻資源的狂秦,可以在評論區(qū)留下郵箱灌侣。

項目的源代碼已經(jīng)托管到Github上,對于各個版本都有記錄裂问,歡迎大家查看侧啼,記得給個star牛柒,在此先謝謝大家

如果覺得博客不錯,勞煩大人給個贊慨菱,

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末焰络,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子符喝,更是在濱河造成了極大的恐慌闪彼,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件协饲,死亡現(xiàn)場離奇詭異畏腕,居然都是意外死亡,警方通過查閱死者的電腦和手機茉稠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門描馅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人而线,你說我怎么就攤上這事铭污。” “怎么了膀篮?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵嘹狞,是天一觀的道長。 經(jīng)常有香客問我誓竿,道長磅网,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任筷屡,我火速辦了婚禮涧偷,結果婚禮上,老公的妹妹穿的比我還像新娘毙死。我一直安慰自己燎潮,他們只是感情好,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布规哲。 她就那樣靜靜地躺著跟啤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪唉锌。 梳的紋絲不亂的頭發(fā)上隅肥,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天,我揣著相機與錄音袄简,去河邊找鬼腥放。 笑死,一個胖子當著我的面吹牛绿语,可吹牛的內容都是我干的秃症。 我是一名探鬼主播候址,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼种柑!你這毒婦竟也來了岗仑?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤聚请,失蹤者是張志新(化名)和其女友劉穎荠雕,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體驶赏,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡炸卑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了煤傍。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盖文。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蚯姆,靈堂內的尸體忽然破棺而出五续,到底是詐尸還是另有隱情,我是刑警寧澤龄恋,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布返帕,位于F島的核電站,受9級特大地震影響篙挽,放射性物質發(fā)生泄漏。R本人自食惡果不足惜镊靴,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一铣卡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧偏竟,春花似錦煮落、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至殖蚕,卻和暖如春轿衔,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背睦疫。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工害驹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蛤育。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓宛官,卻偏偏與公主長得像葫松,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子底洗,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內容