Java 通用爬蟲框架中多線程的使用

spider.jpg

一. 前言

NetDiscovery 是本人開發(fā)的一款基于 Vert.x凿掂、RxJava 2 等框架實現(xiàn)的通用爬蟲框架。它包含了豐富的特性纹蝴。

二. 多線程的使用

NetDiscovery 雖然借助了 RxJava 2 來實現(xiàn)線程的切換庄萎,仍然有大量使用多線程的場景。本文列舉一些爬蟲框架常見的多線程使用場景塘安。

2.1 爬蟲的暫停糠涛、恢復(fù)

暫停和恢復(fù)是最常見的爬蟲使用場景,這里借助 CountDownLatch 類實現(xiàn)兼犯。

CountDownLatch是一個同步工具類忍捡,它允許一個或多個線程一直等待集漾,直到其他線程的操作執(zhí)行完后再執(zhí)行砸脊。

暫停方法會初始化一個 CountDownLatch 類 pauseCountDown具篇,并設(shè)置它的計數(shù)值為1。

恢復(fù)方法會執(zhí)行 pauseCountDown 的 countDown() 凌埂,正好它的計數(shù)到達零驱显。

    /**
     * 爬蟲暫停,當前正在抓取的請求會繼續(xù)抓取完成瞳抓,之后的請求會等到resume的調(diào)用才繼續(xù)抓取
     */
    public void pause() {
        this.pauseCountDown = new CountDownLatch(1);
        this.pause = true;
        stat.compareAndSet(SPIDER_STATUS_RUNNING, SPIDER_STATUS_PAUSE);
    }

    /**
     * 爬蟲重新開始
     */
    public void resume() {

        if (stat.get() == SPIDER_STATUS_PAUSE
                && this.pauseCountDown!=null) {

            this.pauseCountDown.countDown();
            this.pause = false;
            stat.compareAndSet(SPIDER_STATUS_PAUSE, SPIDER_STATUS_RUNNING);
        }
    }

從消息隊列中取出爬蟲的 Request 時埃疫,會先判斷是否需要暫停爬蟲的行為,如果需要暫停則執(zhí)行 pauseCountDown 的 await()孩哑。await() 會使線程一直受阻塞栓霜,也就是暫停爬蟲的行為,直到 CountDownLatch 的計數(shù)為0横蜒,此時正好能夠恢復(fù)爬蟲運行的狀態(tài)胳蛮。

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            //暫停抓取
            if (pause && pauseCountDown!=null) {
                try {
                    this.pauseCountDown.await();
                } catch (InterruptedException e) {
                    log.error("can't pause : ", e);
                }

                initialDelay();
            }
            // 從消息隊列中取出request
           final Request request = queue.poll(name);
           ......
      }

2.2 多緯度控制爬取速度

下圖反映了單個爬蟲的流程。

basic_principle.png

如果爬蟲爬取速度太快一定會被對方系統(tǒng)識別愁铺,NetDiscovery 可以通過限速來實現(xiàn)基本的反反爬蟲鹰霍。

NetDiscovery 內(nèi)部支持多個緯度實現(xiàn)爬蟲限速。這些緯度也基本上對應(yīng)了單個爬蟲的流程茵乱。

2.2.1 Request

首先茂洒,爬蟲封裝的請求 Request 支持暫停。從消息隊列取出 Request 之后瓶竭,會校驗該 Request 是否需要暫停督勺。

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            //暫停抓取
            ......

            // 從消息隊列中取出request
            final Request request = queue.poll(name);

            if (request == null) {

                waitNewRequest();
            } else {

                if (request.getSleepTime() > 0) {

                    try {
                        Thread.sleep(request.getSleepTime());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                ......
            }
        }

2.2.2 Download

爬蟲下載時,下載器會創(chuàng)建 RxJava 的 Maybe 對象斤贰。Download 的限速借助于 RxJava 的 compose智哀、Transformer 來實現(xiàn)。

下面的代碼展示了 DownloaderDelayTransformer:

import cn.netdiscovery.core.domain.Request;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;

import java.util.concurrent.TimeUnit;

/**
 * Created by tony on 2019-04-26.
 */
public class DownloaderDelayTransformer implements MaybeTransformer {

    private Request request;

    public DownloaderDelayTransformer(Request request) {
        this.request = request;
    }

    @Override
    public MaybeSource apply(Maybe upstream) {

        return request.getDownloadDelay() > 0 ? upstream.delay(request.getDownloadDelay(), TimeUnit.MILLISECONDS) : upstream;
    }
}

下載器只要借助 compose 荧恍、DownloaderDelayTransformer瓷叫,就可以實現(xiàn) Download 的限速。

以 UrlConnectionDownloader 為例:

        Maybe.create(new MaybeOnSubscribe<InputStream>() {

                @Override
                public void subscribe(MaybeEmitter<InputStream> emitter) throws Exception {

                    emitter.onSuccess(httpUrlConnection.getInputStream());
                }
            })
             .compose(new DownloaderDelayTransformer(request))
             .map(new Function<InputStream, Response>() {

                @Override
                public Response apply(InputStream inputStream) throws Exception {

                    ......
                    return response;
                }
            });

2.2.3 Domain

Domain 的限速參考了 Scrapy 框架的實現(xiàn)送巡,將每個域名以及它對應(yīng)的最近訪問時間存到 ConcurrentHashMap 中摹菠。每次請求時,可以設(shè)置 Request 的 domainDelay 屬性骗爆,從而實現(xiàn)單個 Request 對某個 Domain 的限速次氨。

import cn.netdiscovery.core.domain.Request;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by tony on 2019-05-06.
 */
public class Throttle {

    private Map<String,Long> domains = new ConcurrentHashMap<String,Long>();

    private static class Holder {
        private static final Throttle instance = new Throttle();
    }

    private Throttle() {
    }

    public static final Throttle getInsatance() {
        return Throttle.Holder.instance;
    }

    public void wait(Request request) {

        String domain = request.getUrlParser().getHost();
        Long lastAccessed = domains.get(domain);

        if (lastAccessed!=null && lastAccessed>0) {
            long sleepSecs = request.getDomainDelay() - (System.currentTimeMillis() - lastAccessed);
            if (sleepSecs > 0) {
                try {
                    Thread.sleep(sleepSecs);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        domains.put(domain,System.currentTimeMillis());
    }
}

待 Request 從消息隊列中取出時,會先判斷 Request 是否需要暫停之后摘投,然后再判斷一下 Domain 的訪問是否需要暫停煮寡。

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            //暫停抓取
            ......

            // 從消息隊列中取出request
            final Request request = queue.poll(name);

            if (request == null) {

                waitNewRequest();
            } else {

                if (request.getSleepTime() > 0) {

                    try {
                        Thread.sleep(request.getSleepTime());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                Throttle.getInsatance().wait(request);
 
                ......
            }
        }

2.2.4 Pipeline

爬蟲處理 Request 的流程大體是這樣的:調(diào)用網(wǎng)絡(luò)請求 (包括重試機制) -> 將 response 存放到 page -> 解析 page -> 順序執(zhí)行 pipelines -> 完成一次 Request 請求虹蓄。

                // request正在處理
                downloader.download(request)
                        .retryWhen(new RetryWithDelay(maxRetries, retryDelayMillis, request)) // 對網(wǎng)絡(luò)請求的重試機制
                        .map(new Function<Response, Page>() {

                            @Override
                            public Page apply(Response response) throws Exception {
                                // 將 response 存放到 page
                                ......                            
                                return page;
                            }
                        })
                        .map(new Function<Page, Page>() {

                            @Override
                            public Page apply(Page page) throws Exception {

                                if (parser != null) {

                                    parser.process(page);
                                }

                                return page;
                            }
                        })
                        .map(new Function<Page, Page>() {

                            @Override
                            public Page apply(Page page) throws Exception {

                                if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {

                                    pipelines.stream()
                                            .forEach(pipeline -> {
                                                pipeline.process(page.getResultItems());
                                            });
                                }

                                return page;
                            }
                        })
                        .observeOn(Schedulers.io())
                        .subscribe(new Consumer<Page>() {

                            @Override
                            public void accept(Page page) throws Exception {

                                log.info(page.getUrl());

                                if (request.getAfterRequest() != null) {

                                    request.getAfterRequest().process(page);
                                }

                                signalNewRequest();
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {

                                log.error(throwable.getMessage(), throwable);
                            }
                        });

Pipeline 的限速實質(zhì)借助了 RxJava 的 delay 和 block 操作符實現(xiàn)。

map(new Function<Page, Page>() {

        @Override
        public Page apply(Page page) throws Exception {

               if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {

                   pipelines.stream()
                          .forEach(pipeline -> {

                                if (pipeline.getPipelineDelay()>0) {

                                        // Pipeline Delay
                                        Observable.just("pipeline delay").delay(pipeline.getPipelineDelay(),TimeUnit.MILLISECONDS).blockingFirst();
                                 }

                                pipeline.process(page.getResultItems());
                          });
               }

                return page;
       }
})

另外幸撕,NetDiscovery 支持通過配置 application.yaml 或 application.properties 文件薇组,來配置爬蟲。當然也支持配置限速的參數(shù)杈帐,同時支持使用隨機的數(shù)值來配置相應(yīng)的限速參數(shù)体箕。

2.3 非阻塞的爬蟲運行

早期的版本,爬蟲運行之后無法再添加新的 Request挑童。因為爬蟲消費完隊列中的 Request 之后累铅,默認退出程序了。

新版本借助于 Condition站叼,即使某個爬蟲正在運行仍然可以添加 Request 到它到消息隊列中娃兽。

Condition 的作用是對鎖進行更精確的控制。它用來替代傳統(tǒng)的 Object 的wait()尽楔、notify() 實現(xiàn)線程間的協(xié)作投储,相比使用 Object 的 wait()、notify()阔馋,使用Condition 的 await()玛荞、signal() 這種方式實現(xiàn)線程間協(xié)作更加安全和高效。

在 Spider 中需要定義好 ReentrantLock 以及 Condition呕寝。

然后再定義 waitNewRequest() 勋眯、signalNewRequest() 方法,它們的作用分別是掛起當前的爬蟲線程等待新的 Request 下梢、喚醒爬蟲線程消費消息隊列中的 Request客蹋。

    private ReentrantLock newRequestLock = new ReentrantLock();
    private Condition newRequestCondition = newRequestLock.newCondition();
  
    ......

    private void waitNewRequest() {
        newRequestLock.lock();

        try {
            newRequestCondition.await(sleepTime, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("waitNewRequest - interrupted, error {}", e);
        } finally {
            newRequestLock.unlock();
        }
    }

    public void signalNewRequest() {
        newRequestLock.lock();

        try {
            newRequestCondition.signalAll();
        } finally {
            newRequestLock.unlock();
        }
    }

可以看到,如果從消息隊列中取不出 Request孽江,則會運行 waitNewRequest()讶坯。

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            //暫停抓取
            if (pause && pauseCountDown!=null) {
                try {
                    this.pauseCountDown.await();
                } catch (InterruptedException e) {
                    log.error("can't pause : ", e);
                }

                initialDelay();
            }

            // 從消息隊列中取出request
            final Request request = queue.poll(name);

            if (request == null) {

                waitNewRequest();
            } else {
                ......
            }
     }

然后,在 Queue 接口中包含了一個 default 方法 pushToRunninSpider() ,它內(nèi)部除了將 request push 到 queue 中岗屏,還有調(diào)用了 spider.signalNewRequest()辆琅。

    /**
     * 把Request請求添加到正在運行爬蟲的Queue中,無需阻塞爬蟲的運行
     *
     * @param request request
     */
    default void pushToRunninSpider(Request request, Spider spider) {

        push(request);
        spider.signalNewRequest();
    }

最后这刷,即使爬蟲已經(jīng)運行婉烟,也可以在任意時刻將 Request 添加到該爬蟲對應(yīng)到Queue 中。

        Spider spider = Spider.create(new DisruptorQueue())
                .name("tony")
                .url("http://www.163.com");

        CompletableFuture.runAsync(()->{
            spider.run();
        });

        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        spider.getQueue().pushToRunninSpider(new Request("https://www.baidu.com", "tony"),spider);

        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        spider.getQueue().pushToRunninSpider(new Request("http://www.reibang.com", "tony"),spider);

        System.out.println("end....");

總結(jié)

爬蟲框架 github 地址:https://github.com/fengzhizi715/NetDiscovery

本文總結(jié)了通用爬蟲框架在某些特定場景中如何使用多線程崭歧。未來,NetDiscovery 還會增加更為通用的功能撞牢。

該系列的相關(guān)文章:
Disruptor 實踐:整合到現(xiàn)有的爬蟲框架
從API到DSL —— 使用 Kotlin 特性為爬蟲框架進一步封裝
使用Kotlin Coroutines簡單改造原有的爬蟲框架
為爬蟲框架構(gòu)建Selenium模塊率碾、DSL模塊(Kotlin實現(xiàn))
基于Vert.x和RxJava 2構(gòu)建通用的爬蟲框架

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末叔营,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子所宰,更是在濱河造成了極大的恐慌绒尊,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仔粥,死亡現(xiàn)場離奇詭異婴谱,居然都是意外死亡,警方通過查閱死者的電腦和手機躯泰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進店門谭羔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人麦向,你說我怎么就攤上這事瘟裸。” “怎么了诵竭?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵话告,是天一觀的道長。 經(jīng)常有香客問我卵慰,道長沙郭,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任裳朋,我火速辦了婚禮病线,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘再扭。我一直安慰自己氧苍,他們只是感情好,可當我...
    茶點故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布泛范。 她就那樣靜靜地躺著让虐,像睡著了一般。 火紅的嫁衣襯著肌膚如雪罢荡。 梳的紋絲不亂的頭發(fā)上赡突,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天,我揣著相機與錄音区赵,去河邊找鬼惭缰。 笑死,一個胖子當著我的面吹牛笼才,可吹牛的內(nèi)容都是我干的漱受。 我是一名探鬼主播,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼骡送,長吁一口氣:“原來是場噩夢啊……” “哼昂羡!你這毒婦竟也來了絮记?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤虐先,失蹤者是張志新(化名)和其女友劉穎怨愤,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛹批,經(jīng)...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡撰洗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了腐芍。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片差导。...
    茶點故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖甸赃,靈堂內(nèi)的尸體忽然破棺而出柿汛,到底是詐尸還是另有隱情,我是刑警寧澤埠对,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布络断,位于F島的核電站,受9級特大地震影響项玛,放射性物質(zhì)發(fā)生泄漏貌笨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一襟沮、第九天 我趴在偏房一處隱蔽的房頂上張望锥惋。 院中可真熱鬧,春花似錦开伏、人聲如沸膀跌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽捅伤。三九已至,卻和暖如春巫玻,著一層夾襖步出監(jiān)牢的瞬間丛忆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工仍秤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留熄诡,地道東北人。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓诗力,卻偏偏與公主長得像凰浮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,060評論 2 355