Go 語言作為一門為編寫網(wǎng)絡(luò)應(yīng)用程序而生的編程語言袜匿,在擁有比 Java 更強(qiáng)的并發(fā)性的同時暴构,有擁有比 C 和 C++ 更快的開發(fā)速度(得益于簡潔的語法和豐富的標(biāo)準(zhǔn)庫),非常適合用于開發(fā)爬蟲程序魂贬。筆者基于 Go 語言開發(fā)了一個爬蟲程序跟继,并從單任務(wù)版本改良為并發(fā)式版本,最后演進(jìn)為分布式版本蛙婴。下面就分享出并發(fā)式版本和分布式版的架構(gòu)和設(shè)計思想粗井。
并發(fā)式爬蟲
并發(fā)式版本利用了 Go 語言強(qiáng)大的并發(fā)性能,解決了單任務(wù)版本爬取速度慢的問題街图。并發(fā)式版本由四大部分構(gòu)成浇衬,分別是Engine、Scheduler餐济、Worker和Saver耘擂。
- 爬蟲程序輸入一個由 URL 和 Parser 構(gòu)成的種子 Request
- Engine 會將接收的每個 Request 傳遞給維護(hù)著一個 Request 隊列和一個 可用 Worker 隊列的 Scheduler
- Scheduler 依據(jù) FIFO 的原則將 Request 任務(wù)分配給 Worker
- 每個包含 Fetcher 和 Parser 的 Goroutine 代表一個 Worker,每個 Worker 并發(fā)執(zhí)行任務(wù)絮姆,任務(wù)完成后重新加入 Scheduler 中的可用 Worker 隊列醉冤。Fetcher 首先網(wǎng)絡(luò)請求 Request 中的 URL 地址,并將返回的網(wǎng)絡(luò)響應(yīng)傳遞給 Parser篙悯,Parser 再講網(wǎng)絡(luò)響應(yīng)中需要的信息解析并提取出來蚁阳,再返回給 Engine
- Worker 返回給 Engine 的 ParseResult 中既包含我們需要得到的信息 Item,又包括需要進(jìn)一步爬取的 Request 任務(wù)鸽照,Engine 再將任務(wù)提交給 Scheduler 之前需要先進(jìn)行去重處理螺捐,以防重復(fù)爬取
- Engine 將需要保存的 Item 通過通道發(fā)送給 Saver
Model
// 請求,包括URL和指定的解析函數(shù)
type Request struct {
Url string
Parser Parser
}
// 解析結(jié)果
type ParseResult struct {
Requests []Request
Items []Item
}
// 最終要保存的條目
type Item struct {
Url string
Id string
Type string
Payload interface{}
}
Engine
type ConcurrentEngine struct {
MaxWorkerCount int // 工作協(xié)程的數(shù)量
Scheduler Scheduler // 任務(wù)調(diào)度器
ItemChan chan Item // 與ItemSaver之間的通道
RequestWorker Processor // Worker的處理器
}
type Processor func(request Request) (ParseResult, error)
type Scheduler interface {
Submit(request Request)
GetWorkerChan() chan Request
Run()
Ready
}
type Ready interface {
WorkerReady(chan Request)
}
func (e *ConcurrentEngine) Run(seed ...Request) {
out := make(chan ParseResult, 1024) // Worker返回任務(wù)結(jié)果的通道矮燎,緩沖區(qū)大小為1024
e.Scheduler.Run()
// 根據(jù)配置的Worker數(shù)量創(chuàng)建Goroutin
for i := 0; i < e.MaxWorkerCount; i++ {
e.createWorker(e.Scheduler.GetWorkerChan(), out, e.Scheduler)
}
for _, r := range seed {
// 先去重定血,再提交任務(wù)給Scheduler
if IsDuplicate(r.Url) {
continue
}
e.Scheduler.Submit(r)
}
for {
result := <-out // 獲取Worker返回的結(jié)果
for _, item := range result.Items {
go func() { e.ItemChan <- item }() // 給Saver發(fā)送Item
}
for _, r := range result.Requests {
// 先去重,再提交任務(wù)給Scheduler
if IsDuplicate(r.Url) {
continue
}
e.Scheduler.Submit(r)
}
}
}
/**
* Worker工廠函數(shù)
* in Scheduler向Worker發(fā)送任務(wù)的通道
* out Worker向Engine返回結(jié)果的通道
* s 實現(xiàn)了Ready接口的Scheduler
*/
func (e *ConcurrentEngine) createWorker(in chan Request, out chan ParseResult, s Ready) {
go func() {
for {
s.WorkerReady(in) // 完成任務(wù)后漏峰,通知Scheduler當(dāng)前Worker可用
request := <-in // 從Scheduler獲取任務(wù)
result, err := e.RequestWorker(request) // 執(zhí)行任務(wù)
if err != nil {
continue
}
out <- result // 返回任務(wù)執(zhí)行結(jié)果
}
}()
}
Scheduler
type QueuedScheduler struct {
requestChan chan engine.Request // 接收Engine提交任務(wù)的通道
workerChan chan chan engine.Request // 派發(fā)任務(wù)給Worker的通道
}
func (s *QueuedScheduler) Submit(request engine.Request) {
s.requestChan <- request
}
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
s.workerChan <- w
}
func (s *QueuedScheduler) GetWorkerChan() chan engine.Request {
return make(chan engine.Request)
}
func (s *QueuedScheduler) Run() {
s.requestChan = make(chan engine.Request)
s.workerChan = make(chan chan engine.Request)
go func() {
var requestQ []engine.Request // 任務(wù)隊列
var workerQ []chan engine.Request // Worker隊列
for {
var activeRequest engine.Request
var activeWorker chan engine.Request
if len(requestQ) > 0 && len(workerQ) > 0 {
activeRequest = requestQ[0]
activeWorker = workerQ[0]
}
select {
case r := <-s.requestChan:
// 若任務(wù)通道中有新任務(wù)糠悼,加入任務(wù)隊列
requestQ = append(requestQ, r)
case w := <-s.workerChan:
// 若可用Worker通道中有新Worker,加入Worker隊列
workerQ = append(workerQ, w)
case activeWorker <- activeRequest:
// 將任務(wù)隊列中的首個元素派發(fā)給可用Worker隊列中的首個元素
requestQ = requestQ[1:]
workerQ = workerQ[1:]
}
}
}()
}
Worker
func Worker(r Request) (ParseResult, error) {
body, err := fetcher.Fetch(r.Url)
if err != nil {
log.Error().Msgf("請求[%s]失斍城恰:%s", r.Url, err)
return ParseResult{}, err
}
return r.Parser.Parse(body, r.Url), nil
}
每個 Worker 運行在一個獨立的 Goroutine 當(dāng)中,讀者需要根據(jù)實際爬取的網(wǎng)站來編寫 Fetcher 和 Parser。
ItemSaver
ItemSaver 的任務(wù)就是將 Engine 通過 Channel 傳送過來的 Item 持久化存儲靖苇,讀者可以根據(jù)自己的需求來實現(xiàn) ItemSaver席噩,將 Item 存儲到 MySQL,MongoDB贤壁,ElasticSearch 等數(shù)據(jù)庫悼枢。
分布式爬蟲
并發(fā)式版本雖然解決了單任務(wù)版本爬取效率低下的問題,但是在同一機(jī)器(同一 IP)上并發(fā)請求目標(biāo)網(wǎng)站脾拆,很容易因為短時間內(nèi)網(wǎng)絡(luò)請求流量過大而被目標(biāo)網(wǎng)站封禁馒索。
分布式版本將 Worker 和 Saver 分離部署到不同的機(jī)器上,不同機(jī)器上的 Worker 使用不同 IP名船,不但能夠解決 IP 封禁的問題绰上,還能進(jìn)一步提升爬取效率。
JSON-RPC
分布式系統(tǒng)需要通過網(wǎng)絡(luò)交互數(shù)據(jù)渠驼,同步系統(tǒng)中的狀態(tài)蜈块。本系統(tǒng)通過 JSON-RPC 同步各個節(jié)點中的狀態(tài),交互任務(wù)與任務(wù)執(zhí)行結(jié)果迷扇。Go 語言標(biāo)準(zhǔn)庫中的 net/rpc 包支持 JSON-RPC百揭,可以通過交換 JSON 格式的數(shù)據(jù)來進(jìn)行 RPC。
// 創(chuàng)建RPC服務(wù)器
func ServeRpc(host string, service interface{}) error {
rpc.Register(service)
listener, err := net.Listen("tcp", host)
if err != nil {
return err
}
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("accept error: %v", err)
continue
}
go jsonrpc.ServeConn(conn)
}
return nil
}
// 創(chuàng)建RPC客戶端
func NewClient(host string) (*rpc.Client, error) {
conn, err := net.Dial("tcp", host)
if err != nil {
return nil, err
}
return jsonrpc.NewClient(conn), nil
}
序列化與反序列化
系統(tǒng)中的各個節(jié)點在進(jìn)行 JSON-RPC 之前蜓席,必須對需要發(fā)送的對象和函數(shù)序列器一,對接收的對象和函數(shù)反序列化。
type SerializedParser struct {
Name string
Args interface{}
}
type Request struct {
Url string
Parser SerializedParser
}
type ParseResult struct {
Items []engine.Item
Requests []Request
}
// 序列化請求對象
func SerializeRequest(r engine.Request) Request {
name, args := r.Parse.Serialize()
return Request{
Url: r.Url,
Parser: SerializedParser{
Name: name,
Args: args,
},
}
}
// 序列化結(jié)果對象
func SerializeResult(r engine.ParseResult) (p ParseResult) {
p.Items = r.Items
for _, req := range r.Requests {
p.Requests = append(p.Requests, SerializeRequest(req))
}
return p
}
// 反序列解析器
func deserializeParser(p SerializedParser) (engine.Parser, error) {
switch p.Name {
case "ParseCity":
return engine.NewFuncParser(parser.ParseCity, p.Name), nil
case "ParseCityList":
return engine.NewFuncParser(parser.ParseCityList, p.Name), nil
case "ProfileParser":
if userName, ok := p.Args.(string); ok {
return parser.NewProfileParser(userName), nil
} else {
return nil, errors.New("invalid args for profileParser")
}
case "NilParser":
return engine.NilParse{}, nil
default:
return nil, errors.New("unknown parser name")
}
}
// 反序列化請求
func DeserializeRequest(r Request) (engine.Request, error) {
parse, err := deserializeParser(r.Parser)
if err != nil {
return engine.Request{}, err
}
req := engine.Request{
Url: r.Url,
Parse: parse,
}
return req, nil
}
// 反序列化結(jié)果
func DeserializeResult(r ParseResult) engine.ParseResult {
result := engine.ParseResult{
Items: r.Items,
}
for _, req := range r.Requests {
ereq, err := DeserializeRequest(req)
if err != nil {
log.Warn().Msgf("error deserializing request: %v", err)
continue
}
result.Requests = append(result.Requests, ereq)
}
return result
}
連接池
分布式系統(tǒng)中維護(hù)著一個 Worker 連接池厨内,Engine 通過連接池將請求任務(wù)派發(fā)到系統(tǒng)中的不同節(jié)點祈秕。
func createClientPool(hosts []string) chan *rpc.Client {
var clients []*rpc.Client
for _, h := range hosts {
client, err := rpcsupport.NewClient(h)
if err != nil {
log.Warn().Msgf("error connection to %s : %s", h, err)
} else {
clients = append(clients, client)
log.Warn().Msgf("connected to %s", h)
}
}
out := make(chan *rpc.Client)
go func() {
for {
for _, c := range clients {
out <- c
}
}
}()
return out
}