原文鏈接
并發(fā)版爬蟲架構(gòu)
將爬蟲分為兩部分:
一酪劫、隊(duì)列調(diào)度器:提供下載請求給Process
二舀凛、Process
:包括下載請求扼仲、解析下載的內(nèi)容远寸、返回新請求列表給隊(duì)列調(diào)度器、輸出下載內(nèi)容屠凶。
具體實(shí)現(xiàn):
- 在主線程中使用一個(gè)隊(duì)列調(diào)度器來儲存待訪問的請求驰后,若隊(duì)列調(diào)度器為空并且沒有正在運(yùn)行的
Process
線程,則認(rèn)為爬蟲結(jié)束矗愧。 - 主線程控制
Process
線程并發(fā)的數(shù)量灶芝,執(zhí)行處理在隊(duì)列調(diào)度器中得到的請求。下載請求的內(nèi)容唉韭,交由頁面處理器處理夜涕,下載失敗則重新添加該鏈接到隊(duì)列調(diào)度器中。 - 判斷頁面處理器返回的請求鏈接是否訪問過属愤,若未訪問則加入到隊(duì)列調(diào)度器女器。將解析得到的內(nèi)容交由輸出線程處理。
爬蟲引擎
package spider
import (
"downloader"
"github.com/PuerkitoBio/goquery"
"log"
"pageprocess"
"pipeline"
"scheduler"
"strconv"
"time"
)
// threadnum - 線程數(shù)量
// scheduler - 調(diào)度器
// downloader - 下載器
// pageprocess - 頁面處理
// pipeline - 輸出
type Spider struct {
threadnum uint8
scheduler scheduler.Scheduler
downloader downloader.DownLoader
pageprocess pageprocess.PageProcess
pipeline pipeline.PipeLine
}
// NewSpider 創(chuàng)建一個(gè)爬蟲引擎
func NewSpider(threadnum int,path string) *Spider{
return &Spider{
scheduler:scheduler.NewQueueSCheduler(),
downloader:downloader.NewHttpDownLoader(),
pageprocess:pageprocess.NewPageProcess(),
pipeline:pipeline.NewFilePipeLine(path),
threadnum:uint8(threadnum),
}
}
// Run 引擎運(yùn)行
func (s *Spider) Run(){
// Process并發(fā)數(shù)量
rm := NewResourceManagerChan(s.threadnum)
log.Println("[Spider] 爬蟲運(yùn)行 - 處理線程數(shù):" + strconv.Itoa(rm.Cap()))
for{
url,ok := s.scheduler.Pop()
// 爬取隊(duì)列為空 并且 沒有Process線程在處理 認(rèn)為爬蟲結(jié)束
if ok == false && rm.Has() == 0{
log.Println("[Spider] 爬蟲運(yùn)行結(jié)束")
break
}else if ok == false{ // Process線程正在處理住诸,可能還會有新的請求加入調(diào)度
log.Println("[Spider] 爬取隊(duì)列為空 - 等待處理")
time.Sleep(500 * time.Millisecond)
continue
}
// 控制Process線程并發(fā)數(shù)量
rm.GetOne()
go func(url string) {
defer rm.FreeOne()
s.Process(url)
}(url)
}
}
// 添加請求鏈接
func (s *Spider) AddUrl(url string) *Spider{
s.scheduler.Push(url)
return s
}
func (s *Spider) AddUrls(urls []string) *Spider{
for _,url := range urls{
s.scheduler.Push(url)
}
return s
}
// 處理請求鏈接
func (s *Spider) Process(url string){
// 下載鏈接
resp := s.downloader.DownLoad(url)
if resp == nil{
/*下載失敗重新加入調(diào)度隊(duì)列中*/
if !s.downloader.Visited(url){
s.scheduler.Push(url)
}
return
}
// 頁面處理 - 使用goquery包簡單處理
doc,err := goquery.NewDocumentFromReader(resp.Body)
if err != nil{
log.Println("[Process] 解析錯(cuò)誤")
s.scheduler.Push(url)
return
}
// 將新請求鏈接加入到調(diào)度器中
links := s.pageprocess.Process(doc)
for _,url := range links{
if !s.downloader.Visited(url){
s.scheduler.Push(url)
}
}
// 輸出文檔
go s.pipeline.Process(doc)
}
// 控制線程并發(fā)數(shù)
package spider
type ResourceManager struct {
tc chan uint8
}
func NewResourceManagerChan(num uint8) *ResourceManager{
tc := make(chan uint8,num)
return &ResourceManager{tc:tc}
}
func (r *ResourceManager) GetOne(){
r.tc <- 1
}
func (r *ResourceManager) FreeOne(){
<- r.tc
}
func (r *ResourceManager) Cap() int{
return cap(r.tc)
}
func (r *ResourceManager) Has() int{
return len(r.tc)
}
func (r *ResourceManager) Left() int{
return cap(r.tc) - len(r.tc)
}
隊(duì)列調(diào)度器
隊(duì)列調(diào)度器實(shí)現(xiàn)獲取以及儲存請求驾胆。
請求的重復(fù)性交給下載器來判斷(考慮只有下載成功的請求才不需要訪問)。
簡化的請求為string
類型的url
鏈接贱呐。
package scheduler
import (
"container/list"
"crypto/md5"
"sync"
)
type QueueScheduler struct {
queue *list.List
locker *sync.Mutex
listkey map[[md5.Size]byte] *list.Element
}
func NewQueueSCheduler() *QueueScheduler{
queue := list.New()
locker := new(sync.Mutex)
listkey := make(map[[md5.Size]byte] *list.Element)
return &QueueScheduler{
queue:queue,
locker:locker,
listkey:listkey}
}
// Pop - 從隊(duì)列中獲取一個(gè)鏈接
func (s *QueueScheduler) Pop() (string,bool){
s.locker.Lock()
if s.queue.Len() <= 0{
s.locker.Unlock()
return "",false
}
e := s.queue.Front()
ret := e.Value.(string)
// 清除listkey中該元素,加入到訪問隊(duì)列中
key := md5.Sum([]byte(ret))
delete(s.listkey,key)
s.queue.Remove(e)
s.locker.Unlock()
return ret,true
}
// Push - 將鏈接放入隊(duì)列中
func (s *QueueScheduler) Push(url string){
s.locker.Lock()
key := md5.Sum([]byte(url))
// 鏈接已存在
if _,ok := s.listkey[key]; ok{
s.locker.Unlock()
return
}
e := s.queue.PushBack(url)
s.listkey[key] = e
s.locker.Unlock()
}
下載器
下載器提供接口下載請求丧诺,并返回下載得到的內(nèi)容。
下載器提供接口判斷請求是否已經(jīng)被處理過吼句。
若下載失敗則標(biāo)記當(dāng)前請求訪問失敗锅必,反之標(biāo)記當(dāng)前請求訪問成功,使用map
儲存惕艳。
簡化的下載器僅使用的http
包中的Get
方法。
package downloader
import (
"crypto/md5"
"log"
"net/http"
"sync"
)
type HttpDownLoader struct {
locker *sync.Mutex
downloaded map[[md5.Size]byte] bool
}
func NewHttpDownLoader() *HttpDownLoader{
locker := new(sync.Mutex)
downloaded := make(map[[md5.Size]byte]bool)
return &HttpDownLoader{
locker:locker,
downloaded:downloaded,
}
}
// 下載鏈接
func (h *HttpDownLoader) DownLoad(url string) *http.Response{
key := md5.Sum([]byte(url))
resp,err := http.Get(url)
h.locker.Lock()
// 已經(jīng)被訪問過了驹愚,不需要訪問远搪。
if ok,has := h.downloaded[key]; has && ok{
h.locker.Unlock()
return nil
}
// 訪問失敗
if err != nil || resp.StatusCode != http.StatusOK{
log.Println("[DownLoader] 下載鏈接失敗:" + url)
h.downloaded[key] = false
h.locker.Unlock()
return nil
}
h.downloaded[key] = true
h.locker.Unlock()
log.Println("[DownLoader] 下載鏈接成功:" + url)
return resp
}
// 鏈接是否被訪問
func (h *HttpDownLoader) Visited(url string) bool{
key := md5.Sum([]byte(url))
var ret bool
h.locker.Lock()
if ok,has := h.downloaded[key]; has && ok{
ret = true
}else{
ret = false
}
h.locker.Unlock()
return ret
}
頁面處理
頁面處理需要返回鏈接請求集合,這里簡化為[]string
類型逢捺。
頁面處理需要返回文檔谁鳍,這里直接簡化為goquery
包中的document
。
package pageprocess
import (
"github.com/PuerkitoBio/goquery"
)
type PageProcess struct {
}
func NewPageProcess() PageProcess{
return PageProcess{}
}
// 返回鏈接函數(shù)
func (p *PageProcess) Process(d *goquery.Document) []string{
var links []string
// 獲取鏈接的處理代碼
return links
}
輸出
package pipeline
import (
"github.com/PuerkitoBio/goquery"
"log"
"os"
)
type FilePipeLine struct {
dir string
}
func NewFilePipeLine(dir string) *FilePipeLine{
return &FilePipeLine{dir:dir}
}
func (p *FilePipeLine) Process(doc *goquery.Document){
// 文件寫入實(shí)現(xiàn)
}