Go 語言爬蟲從并發(fā)式到分布式

Go 語言作為一門為編寫網(wǎng)絡(luò)應(yīng)用程序而生的編程語言袜匿,在擁有比 Java 更強(qiáng)的并發(fā)性的同時暴构,有擁有比 C 和 C++ 更快的開發(fā)速度(得益于簡潔的語法和豐富的標(biāo)準(zhǔn)庫),非常適合用于開發(fā)爬蟲程序魂贬。筆者基于 Go 語言開發(fā)了一個爬蟲程序跟继,并從單任務(wù)版本改良為并發(fā)式版本,最后演進(jìn)為分布式版本蛙婴。下面就分享出并發(fā)式版本和分布式版的架構(gòu)和設(shè)計思想粗井。


并發(fā)式爬蟲

并發(fā)式版本利用了 Go 語言強(qiáng)大的并發(fā)性能,解決了單任務(wù)版本爬取速度慢的問題街图。并發(fā)式版本由四大部分構(gòu)成浇衬,分別是EngineScheduler餐济、WorkerSaver耘擂。

  1. 爬蟲程序輸入一個由 URL 和 Parser 構(gòu)成的種子 Request
  2. Engine 會將接收的每個 Request 傳遞給維護(hù)著一個 Request 隊列和一個 可用 Worker 隊列的 Scheduler
  3. Scheduler 依據(jù) FIFO 的原則將 Request 任務(wù)分配給 Worker
  4. 每個包含 Fetcher 和 Parser 的 Goroutine 代表一個 Worker,每個 Worker 并發(fā)執(zhí)行任務(wù)絮姆,任務(wù)完成后重新加入 Scheduler 中的可用 Worker 隊列醉冤。Fetcher 首先網(wǎng)絡(luò)請求 Request 中的 URL 地址,并將返回的網(wǎng)絡(luò)響應(yīng)傳遞給 Parser篙悯,Parser 再講網(wǎng)絡(luò)響應(yīng)中需要的信息解析并提取出來蚁阳,再返回給 Engine
  5. Worker 返回給 Engine 的 ParseResult 中既包含我們需要得到的信息 Item,又包括需要進(jìn)一步爬取的 Request 任務(wù)鸽照,Engine 再將任務(wù)提交給 Scheduler 之前需要先進(jìn)行去重處理螺捐,以防重復(fù)爬取
  6. Engine 將需要保存的 Item 通過通道發(fā)送給 Saver
并發(fā)式爬蟲架構(gòu)

Model

// 請求,包括URL和指定的解析函數(shù)
type Request struct {
    Url    string
    Parser Parser
}
// 解析結(jié)果
type ParseResult struct {
    Requests []Request
    Items    []Item
}
// 最終要保存的條目
type Item struct {
    Url     string
    Id      string
    Type    string
    Payload interface{}
}

Engine

type ConcurrentEngine struct {
    MaxWorkerCount int // 工作協(xié)程的數(shù)量
    Scheduler      Scheduler // 任務(wù)調(diào)度器
    ItemChan       chan Item // 與ItemSaver之間的通道
    RequestWorker  Processor // Worker的處理器
}

type Processor func(request Request) (ParseResult, error)

type Scheduler interface {
    Submit(request Request)
    GetWorkerChan() chan Request
    Run()
    Ready
}

type Ready interface {
    WorkerReady(chan Request)
}

func (e *ConcurrentEngine) Run(seed ...Request) {
    out := make(chan ParseResult, 1024) // Worker返回任務(wù)結(jié)果的通道矮燎,緩沖區(qū)大小為1024
    e.Scheduler.Run()

    // 根據(jù)配置的Worker數(shù)量創(chuàng)建Goroutin
    for i := 0; i < e.MaxWorkerCount; i++ {
        e.createWorker(e.Scheduler.GetWorkerChan(), out, e.Scheduler)
    }

    for _, r := range seed {
        // 先去重定血,再提交任務(wù)給Scheduler
        if IsDuplicate(r.Url) {
            continue
        }
        e.Scheduler.Submit(r)
    }
    for {
        result := <-out // 獲取Worker返回的結(jié)果
        for _, item := range result.Items {
            go func() { e.ItemChan <- item }() // 給Saver發(fā)送Item
        }
        for _, r := range result.Requests {
            // 先去重,再提交任務(wù)給Scheduler
            if IsDuplicate(r.Url) {
                continue
            }
            e.Scheduler.Submit(r)
        }
    }

}

/**
 * Worker工廠函數(shù)
 * in Scheduler向Worker發(fā)送任務(wù)的通道
 * out Worker向Engine返回結(jié)果的通道
 * s 實現(xiàn)了Ready接口的Scheduler 
*/
func (e *ConcurrentEngine) createWorker(in chan Request, out chan ParseResult, s Ready) {
    go func() {
        for {
            s.WorkerReady(in) // 完成任務(wù)后漏峰,通知Scheduler當(dāng)前Worker可用
            request := <-in // 從Scheduler獲取任務(wù)
            result, err := e.RequestWorker(request) // 執(zhí)行任務(wù)
            if err != nil {
                continue
            }
            out <- result // 返回任務(wù)執(zhí)行結(jié)果
        }
    }()
}

Scheduler

type QueuedScheduler struct {
    requestChan chan engine.Request // 接收Engine提交任務(wù)的通道
    workerChan  chan chan engine.Request // 派發(fā)任務(wù)給Worker的通道
}

func (s *QueuedScheduler) Submit(request engine.Request) {
    s.requestChan <- request
}

func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
    s.workerChan <- w
}

func (s *QueuedScheduler) GetWorkerChan() chan engine.Request {
    return make(chan engine.Request)
}

func (s *QueuedScheduler) Run() {
    s.requestChan = make(chan engine.Request)
    s.workerChan = make(chan chan engine.Request)

    go func() {
        var requestQ []engine.Request // 任務(wù)隊列
        var workerQ []chan engine.Request // Worker隊列
        for {
            var activeRequest engine.Request
            var activeWorker chan engine.Request
            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeRequest = requestQ[0]
                activeWorker = workerQ[0]
            }
            select {
            case r := <-s.requestChan:
                // 若任務(wù)通道中有新任務(wù)糠悼,加入任務(wù)隊列
                requestQ = append(requestQ, r)
            case w := <-s.workerChan:
                // 若可用Worker通道中有新Worker,加入Worker隊列
                workerQ = append(workerQ, w)
            case activeWorker <- activeRequest:
                // 將任務(wù)隊列中的首個元素派發(fā)給可用Worker隊列中的首個元素
                requestQ = requestQ[1:]
                workerQ = workerQ[1:]
            }
        }
    }()
}

Worker

func Worker(r Request) (ParseResult, error) {
    body, err := fetcher.Fetch(r.Url)
    if err != nil {
        log.Error().Msgf("請求[%s]失斍城恰:%s", r.Url, err)
        return ParseResult{}, err
    }
    return r.Parser.Parse(body, r.Url), nil
}

每個 Worker 運行在一個獨立的 Goroutine 當(dāng)中,讀者需要根據(jù)實際爬取的網(wǎng)站來編寫 Fetcher 和 Parser。

ItemSaver

ItemSaver 的任務(wù)就是將 Engine 通過 Channel 傳送過來的 Item 持久化存儲靖苇,讀者可以根據(jù)自己的需求來實現(xiàn) ItemSaver席噩,將 Item 存儲到 MySQLMongoDB贤壁,ElasticSearch 等數(shù)據(jù)庫悼枢。


分布式爬蟲

并發(fā)式版本雖然解決了單任務(wù)版本爬取效率低下的問題,但是在同一機(jī)器(同一 IP)上并發(fā)請求目標(biāo)網(wǎng)站脾拆,很容易因為短時間內(nèi)網(wǎng)絡(luò)請求流量過大而被目標(biāo)網(wǎng)站封禁馒索。
分布式版本將 WorkerSaver 分離部署到不同的機(jī)器上,不同機(jī)器上的 Worker 使用不同 IP名船,不但能夠解決 IP 封禁的問題绰上,還能進(jìn)一步提升爬取效率。

分布式爬蟲架構(gòu)

JSON-RPC

分布式系統(tǒng)需要通過網(wǎng)絡(luò)交互數(shù)據(jù)渠驼,同步系統(tǒng)中的狀態(tài)蜈块。本系統(tǒng)通過 JSON-RPC 同步各個節(jié)點中的狀態(tài),交互任務(wù)與任務(wù)執(zhí)行結(jié)果迷扇。Go 語言標(biāo)準(zhǔn)庫中的 net/rpc 包支持 JSON-RPC百揭,可以通過交換 JSON 格式的數(shù)據(jù)來進(jìn)行 RPC。

// 創(chuàng)建RPC服務(wù)器
func ServeRpc(host string, service interface{}) error {
    rpc.Register(service)
    listener, err := net.Listen("tcp", host)
    if err != nil {
        return err
    }
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Printf("accept error: %v", err)
            continue
        }
        go jsonrpc.ServeConn(conn)
    }
    return nil
}

// 創(chuàng)建RPC客戶端
func NewClient(host string) (*rpc.Client, error) {
    conn, err := net.Dial("tcp", host)
    if err != nil {
        return nil, err
    }
    return jsonrpc.NewClient(conn), nil
}

序列化與反序列化

系統(tǒng)中的各個節(jié)點在進(jìn)行 JSON-RPC 之前蜓席,必須對需要發(fā)送的對象和函數(shù)序列器一,對接收的對象和函數(shù)反序列化。

type SerializedParser struct {
    Name string
    Args interface{}
}

type Request struct {
    Url    string
    Parser SerializedParser
}

type ParseResult struct {
    Items    []engine.Item
    Requests []Request
}

// 序列化請求對象
func SerializeRequest(r engine.Request) Request {
    name, args := r.Parse.Serialize()
    return Request{
        Url: r.Url,
        Parser: SerializedParser{
            Name: name,
            Args: args,
        },
    }
}

// 序列化結(jié)果對象
func SerializeResult(r engine.ParseResult) (p ParseResult) {
    p.Items = r.Items
    for _, req := range r.Requests {
        p.Requests = append(p.Requests, SerializeRequest(req))
    }
    return p
}

// 反序列解析器
func deserializeParser(p SerializedParser) (engine.Parser, error) {
    switch p.Name {
    case "ParseCity":
        return engine.NewFuncParser(parser.ParseCity, p.Name), nil
    case "ParseCityList":
        return engine.NewFuncParser(parser.ParseCityList, p.Name), nil
    case "ProfileParser":
        if userName, ok := p.Args.(string); ok {
            return parser.NewProfileParser(userName), nil
        } else {
            return nil, errors.New("invalid args for profileParser")
        }
    case "NilParser":
        return engine.NilParse{}, nil
    default:
        return nil, errors.New("unknown parser name")
    }
}

// 反序列化請求
func DeserializeRequest(r Request) (engine.Request, error) {
    parse, err := deserializeParser(r.Parser)
    if err != nil {
        return engine.Request{}, err
    }
    req := engine.Request{
        Url:   r.Url,
        Parse: parse,
    }
    return req, nil
}

// 反序列化結(jié)果
func DeserializeResult(r ParseResult) engine.ParseResult {
    result := engine.ParseResult{
        Items: r.Items,
    }
    for _, req := range r.Requests {
        ereq, err := DeserializeRequest(req)
        if err != nil {
            log.Warn().Msgf("error deserializing request: %v", err)
            continue
        }
        result.Requests = append(result.Requests, ereq)
    }
    return result
}

連接池

分布式系統(tǒng)中維護(hù)著一個 Worker 連接池厨内,Engine 通過連接池將請求任務(wù)派發(fā)到系統(tǒng)中的不同節(jié)點祈秕。

func createClientPool(hosts []string) chan *rpc.Client {
    var clients []*rpc.Client
    for _, h := range hosts {
        client, err := rpcsupport.NewClient(h)
        if err != nil {
            log.Warn().Msgf("error connection to %s : %s", h, err)

        } else {
            clients = append(clients, client)
            log.Warn().Msgf("connected  to %s", h)
        }
    }
    out := make(chan *rpc.Client)
    go func() {
        for {
            for _, c := range clients {
                out <- c
            }
        }
    }()
    return out
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市隘庄,隨后出現(xiàn)的幾起案子踢步,更是在濱河造成了極大的恐慌,老刑警劉巖丑掺,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件获印,死亡現(xiàn)場離奇詭異,居然都是意外死亡街州,警方通過查閱死者的電腦和手機(jī)兼丰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來唆缴,“玉大人鳍征,你說我怎么就攤上這事∶婊眨” “怎么了艳丛?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵匣掸,是天一觀的道長。 經(jīng)常有香客問我氮双,道長碰酝,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任戴差,我火速辦了婚禮送爸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘暖释。我一直安慰自己袭厂,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布球匕。 她就那樣靜靜地躺著,像睡著了一般谐丢。 火紅的嫁衣襯著肌膚如雪乾忱。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天,我揣著相機(jī)與錄音蹄葱,去河邊找鬼。 笑死惯悠,一個胖子當(dāng)著我的面吹牛竣况,可吹牛的內(nèi)容都是我干的丹泉。 我是一名探鬼主播,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼睁宰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起勋陪,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤诅愚,失蹤者是張志新(化名)和其女友劉穎劫映,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體泳赋,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡祖今,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年千诬,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片邪驮。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖盘榨,靈堂內(nèi)的尸體忽然破棺而出草巡,到底是詐尸還是另有隱情,我是刑警寧澤弛饭,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布侣颂,位于F島的核電站枪孩,受9級特大地震影響藻肄,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拒担,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望从撼。 院中可真熱鬧,春花似錦低零、人聲如沸婆翔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至雄妥,卻和暖如春最蕾,著一層夾襖步出監(jiān)牢的瞬間老厌,已是汗流浹背瘟则。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工壹粟, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留趁仙,地道東北人垦页。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓雀费,卻偏偏與公主長得像,于是被迫代替她去往敵國和親薄啥。 傳聞我的和親對象是個殘疾皇子辕羽,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,675評論 2 359