一. 前言
NetDiscovery 是本人開發(fā)的一款基于 Vert.x凿掂、RxJava 2 等框架實現(xiàn)的通用爬蟲框架。它包含了豐富的特性纹蝴。
二. 多線程的使用
NetDiscovery 雖然借助了 RxJava 2 來實現(xiàn)線程的切換庄萎,仍然有大量使用多線程的場景。本文列舉一些爬蟲框架常見的多線程使用場景塘安。
2.1 爬蟲的暫停糠涛、恢復(fù)
暫停和恢復(fù)是最常見的爬蟲使用場景,這里借助 CountDownLatch 類實現(xiàn)兼犯。
CountDownLatch是一個同步工具類忍捡,它允許一個或多個線程一直等待集漾,直到其他線程的操作執(zhí)行完后再執(zhí)行砸脊。
暫停方法會初始化一個 CountDownLatch 類 pauseCountDown具篇,并設(shè)置它的計數(shù)值為1。
恢復(fù)方法會執(zhí)行 pauseCountDown 的 countDown() 凌埂,正好它的計數(shù)到達零驱显。
/**
* 爬蟲暫停,當前正在抓取的請求會繼續(xù)抓取完成瞳抓,之后的請求會等到resume的調(diào)用才繼續(xù)抓取
*/
public void pause() {
this.pauseCountDown = new CountDownLatch(1);
this.pause = true;
stat.compareAndSet(SPIDER_STATUS_RUNNING, SPIDER_STATUS_PAUSE);
}
/**
* 爬蟲重新開始
*/
public void resume() {
if (stat.get() == SPIDER_STATUS_PAUSE
&& this.pauseCountDown!=null) {
this.pauseCountDown.countDown();
this.pause = false;
stat.compareAndSet(SPIDER_STATUS_PAUSE, SPIDER_STATUS_RUNNING);
}
}
從消息隊列中取出爬蟲的 Request 時埃疫,會先判斷是否需要暫停爬蟲的行為,如果需要暫停則執(zhí)行 pauseCountDown 的 await()孩哑。await() 會使線程一直受阻塞栓霜,也就是暫停爬蟲的行為,直到 CountDownLatch 的計數(shù)為0横蜒,此時正好能夠恢復(fù)爬蟲運行的狀態(tài)胳蛮。
while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {
//暫停抓取
if (pause && pauseCountDown!=null) {
try {
this.pauseCountDown.await();
} catch (InterruptedException e) {
log.error("can't pause : ", e);
}
initialDelay();
}
// 從消息隊列中取出request
final Request request = queue.poll(name);
......
}
2.2 多緯度控制爬取速度
下圖反映了單個爬蟲的流程。
如果爬蟲爬取速度太快一定會被對方系統(tǒng)識別愁铺,NetDiscovery 可以通過限速來實現(xiàn)基本的反反爬蟲鹰霍。
在 NetDiscovery 內(nèi)部支持多個緯度實現(xiàn)爬蟲限速。這些緯度也基本上對應(yīng)了單個爬蟲的流程茵乱。
2.2.1 Request
首先茂洒,爬蟲封裝的請求 Request 支持暫停。從消息隊列取出 Request 之后瓶竭,會校驗該 Request 是否需要暫停督勺。
while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {
//暫停抓取
......
// 從消息隊列中取出request
final Request request = queue.poll(name);
if (request == null) {
waitNewRequest();
} else {
if (request.getSleepTime() > 0) {
try {
Thread.sleep(request.getSleepTime());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
......
}
}
2.2.2 Download
爬蟲下載時,下載器會創(chuàng)建 RxJava 的 Maybe 對象斤贰。Download 的限速借助于 RxJava 的 compose智哀、Transformer 來實現(xiàn)。
下面的代碼展示了 DownloaderDelayTransformer:
import cn.netdiscovery.core.domain.Request;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;
import java.util.concurrent.TimeUnit;
/**
* Created by tony on 2019-04-26.
*/
public class DownloaderDelayTransformer implements MaybeTransformer {
private Request request;
public DownloaderDelayTransformer(Request request) {
this.request = request;
}
@Override
public MaybeSource apply(Maybe upstream) {
return request.getDownloadDelay() > 0 ? upstream.delay(request.getDownloadDelay(), TimeUnit.MILLISECONDS) : upstream;
}
}
下載器只要借助 compose 荧恍、DownloaderDelayTransformer瓷叫,就可以實現(xiàn) Download 的限速。
以 UrlConnectionDownloader 為例:
Maybe.create(new MaybeOnSubscribe<InputStream>() {
@Override
public void subscribe(MaybeEmitter<InputStream> emitter) throws Exception {
emitter.onSuccess(httpUrlConnection.getInputStream());
}
})
.compose(new DownloaderDelayTransformer(request))
.map(new Function<InputStream, Response>() {
@Override
public Response apply(InputStream inputStream) throws Exception {
......
return response;
}
});
2.2.3 Domain
Domain 的限速參考了 Scrapy 框架的實現(xiàn)送巡,將每個域名以及它對應(yīng)的最近訪問時間存到 ConcurrentHashMap 中摹菠。每次請求時,可以設(shè)置 Request 的 domainDelay 屬性骗爆,從而實現(xiàn)單個 Request 對某個 Domain 的限速次氨。
import cn.netdiscovery.core.domain.Request;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by tony on 2019-05-06.
*/
public class Throttle {
private Map<String,Long> domains = new ConcurrentHashMap<String,Long>();
private static class Holder {
private static final Throttle instance = new Throttle();
}
private Throttle() {
}
public static final Throttle getInsatance() {
return Throttle.Holder.instance;
}
public void wait(Request request) {
String domain = request.getUrlParser().getHost();
Long lastAccessed = domains.get(domain);
if (lastAccessed!=null && lastAccessed>0) {
long sleepSecs = request.getDomainDelay() - (System.currentTimeMillis() - lastAccessed);
if (sleepSecs > 0) {
try {
Thread.sleep(sleepSecs);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
domains.put(domain,System.currentTimeMillis());
}
}
待 Request 從消息隊列中取出時,會先判斷 Request 是否需要暫停之后摘投,然后再判斷一下 Domain 的訪問是否需要暫停煮寡。
while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {
//暫停抓取
......
// 從消息隊列中取出request
final Request request = queue.poll(name);
if (request == null) {
waitNewRequest();
} else {
if (request.getSleepTime() > 0) {
try {
Thread.sleep(request.getSleepTime());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Throttle.getInsatance().wait(request);
......
}
}
2.2.4 Pipeline
爬蟲處理 Request 的流程大體是這樣的:調(diào)用網(wǎng)絡(luò)請求 (包括重試機制) -> 將 response 存放到 page -> 解析 page -> 順序執(zhí)行 pipelines -> 完成一次 Request 請求虹蓄。
// request正在處理
downloader.download(request)
.retryWhen(new RetryWithDelay(maxRetries, retryDelayMillis, request)) // 對網(wǎng)絡(luò)請求的重試機制
.map(new Function<Response, Page>() {
@Override
public Page apply(Response response) throws Exception {
// 將 response 存放到 page
......
return page;
}
})
.map(new Function<Page, Page>() {
@Override
public Page apply(Page page) throws Exception {
if (parser != null) {
parser.process(page);
}
return page;
}
})
.map(new Function<Page, Page>() {
@Override
public Page apply(Page page) throws Exception {
if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {
pipelines.stream()
.forEach(pipeline -> {
pipeline.process(page.getResultItems());
});
}
return page;
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Page>() {
@Override
public void accept(Page page) throws Exception {
log.info(page.getUrl());
if (request.getAfterRequest() != null) {
request.getAfterRequest().process(page);
}
signalNewRequest();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
log.error(throwable.getMessage(), throwable);
}
});
Pipeline 的限速實質(zhì)借助了 RxJava 的 delay 和 block 操作符實現(xiàn)。
map(new Function<Page, Page>() {
@Override
public Page apply(Page page) throws Exception {
if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {
pipelines.stream()
.forEach(pipeline -> {
if (pipeline.getPipelineDelay()>0) {
// Pipeline Delay
Observable.just("pipeline delay").delay(pipeline.getPipelineDelay(),TimeUnit.MILLISECONDS).blockingFirst();
}
pipeline.process(page.getResultItems());
});
}
return page;
}
})
另外幸撕,NetDiscovery 支持通過配置 application.yaml 或 application.properties 文件薇组,來配置爬蟲。當然也支持配置限速的參數(shù)杈帐,同時支持使用隨機的數(shù)值來配置相應(yīng)的限速參數(shù)体箕。
2.3 非阻塞的爬蟲運行
早期的版本,爬蟲運行之后無法再添加新的 Request挑童。因為爬蟲消費完隊列中的 Request 之后累铅,默認退出程序了。
新版本借助于 Condition站叼,即使某個爬蟲正在運行仍然可以添加 Request 到它到消息隊列中娃兽。
Condition 的作用是對鎖進行更精確的控制。它用來替代傳統(tǒng)的 Object 的wait()尽楔、notify() 實現(xiàn)線程間的協(xié)作投储,相比使用 Object 的 wait()、notify()阔馋,使用Condition 的 await()玛荞、signal() 這種方式實現(xiàn)線程間協(xié)作更加安全和高效。
在 Spider 中需要定義好 ReentrantLock 以及 Condition呕寝。
然后再定義 waitNewRequest() 勋眯、signalNewRequest() 方法,它們的作用分別是掛起當前的爬蟲線程等待新的 Request 下梢、喚醒爬蟲線程消費消息隊列中的 Request客蹋。
private ReentrantLock newRequestLock = new ReentrantLock();
private Condition newRequestCondition = newRequestLock.newCondition();
......
private void waitNewRequest() {
newRequestLock.lock();
try {
newRequestCondition.await(sleepTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("waitNewRequest - interrupted, error {}", e);
} finally {
newRequestLock.unlock();
}
}
public void signalNewRequest() {
newRequestLock.lock();
try {
newRequestCondition.signalAll();
} finally {
newRequestLock.unlock();
}
}
可以看到,如果從消息隊列中取不出 Request孽江,則會運行 waitNewRequest()讶坯。
while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {
//暫停抓取
if (pause && pauseCountDown!=null) {
try {
this.pauseCountDown.await();
} catch (InterruptedException e) {
log.error("can't pause : ", e);
}
initialDelay();
}
// 從消息隊列中取出request
final Request request = queue.poll(name);
if (request == null) {
waitNewRequest();
} else {
......
}
}
然后,在 Queue 接口中包含了一個 default 方法 pushToRunninSpider() ,它內(nèi)部除了將 request push 到 queue 中岗屏,還有調(diào)用了 spider.signalNewRequest()辆琅。
/**
* 把Request請求添加到正在運行爬蟲的Queue中,無需阻塞爬蟲的運行
*
* @param request request
*/
default void pushToRunninSpider(Request request, Spider spider) {
push(request);
spider.signalNewRequest();
}
最后这刷,即使爬蟲已經(jīng)運行婉烟,也可以在任意時刻將 Request 添加到該爬蟲對應(yīng)到Queue 中。
Spider spider = Spider.create(new DisruptorQueue())
.name("tony")
.url("http://www.163.com");
CompletableFuture.runAsync(()->{
spider.run();
});
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
spider.getQueue().pushToRunninSpider(new Request("https://www.baidu.com", "tony"),spider);
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
spider.getQueue().pushToRunninSpider(new Request("http://www.reibang.com", "tony"),spider);
System.out.println("end....");
總結(jié)
爬蟲框架 github 地址:https://github.com/fengzhizi715/NetDiscovery
本文總結(jié)了通用爬蟲框架在某些特定場景中如何使用多線程崭歧。未來,NetDiscovery 還會增加更為通用的功能撞牢。
該系列的相關(guān)文章:
Disruptor 實踐:整合到現(xiàn)有的爬蟲框架
從API到DSL —— 使用 Kotlin 特性為爬蟲框架進一步封裝
使用Kotlin Coroutines簡單改造原有的爬蟲框架
為爬蟲框架構(gòu)建Selenium模塊率碾、DSL模塊(Kotlin實現(xiàn))
基于Vert.x和RxJava 2構(gòu)建通用的爬蟲框架