java爬蟲多線程redis隊列(爬取國美網(wǎng)站的商品信息)

前面那篇爬蟲文章用的是單線程沒有用到其它一些比較提高效率的工具比較遺憾,所以今天做了一個比較全面的爬蟲条舔。首先謝謝 @天不生我萬古長這位小伙伴的留言枫耳,不然還真有點懶了迁杨。因為上班所以也只能利用周末的時間來寫了狐史。其實這次構(gòu)思了很久。本來是想爬淘寶的商品信息,但是遇到了一個坑就是ssl的證書驗證,這里糾結(jié)了半天終于繞過去了余耽。但是由于淘寶的限制比較嚴,ip直接被限制訪問了。我也很無語,如果同樣有小伙伴遇到了https請求的證書驗證通過不了,建議去看一下這一篇博客,感覺寫的不錯。http://blog.csdn.net/u014256984/article/details/73330573 這里主要講的就是通過java代碼獲取證書文件,然后將證書文件放入到jdk下面,具體我就不細說了。說一說今天的重點朴爬。

首先說一下我的目標頁面具滴。國美的搜索頁
國美搜索頁面.png
搜索列表頁.png

技術(shù)點

httpClient Jsoup 這些都是爬蟲最基本的,就不老生常談了。這里我說一說用的新的技術(shù)點,以及新的技術(shù)點遇到了哪些坑。

  • redis 以及redis的隊列應用
    這里用redis主要的作用就是保存需要解析的url 以及已經(jīng)解析過的url兩個隊列囚玫。這里我遇到最多的問題铃在,就是用多線程執(zhí)行的時候出現(xiàn)redis鏈接重置的問題。網(wǎng)上查了一下也沒有一個統(tǒng)一的答案,我也只是根據(jù)控制臺輸出的錯誤信息感覺可能是在多線程執(zhí)行的時候,redis創(chuàng)建了多次連接。為什么會創(chuàng)建多次連接就會出現(xiàn)重置的問題贡必。我的猜測就是因為redis本省是不支持windows的,只是微軟在打了補丁的情況下才支持盒揉。這可能有一點影響羡洛。這方面我也沒有去深究橄仍。我的解決方案就是創(chuàng)建一個redis的單例模式第晰。

  • mongodb
    首先說一下為什么要用mongodb

    1. mongodb是非關(guān)系型數(shù)據(jù)庫纺非。
    2. mongodb相對于關(guān)系型數(shù)據(jù)庫他的效率要高很多很多跳夭。
    3. mongodb存儲數(shù)據(jù)理論上是沒有上限的贩汉,當然這是理論酒贬。
    4. mongodb4.5以后是天生自帶連接池的考蕾。
  • 線程池
    在處理多線程的問題的時候葵姥,如果創(chuàng)建一個線程池管理線程态辛。其實這里的效果是非常好的。但是好是好用,坑卻特別多玫霎,一定要注意對于有些數(shù)據(jù)進行操作的時候要進行枷鎖的操作叉钥,為了保證數(shù)據(jù)的準確性。

說了這么多也感覺有點詞窮了赖临,還是上代碼疑苔。

  • redis的隊列創(chuàng)建
package com.xdl.redisUtil;

import redis.clients.jedis.Jedis;

/**
 * 
* @ClassName: redisqueue
* redis隊列
* @author liangchu
* @date 2018-1-6 上午11:52:44 
*
 */
public class RedisQueue {
    // 這是單例
    private static  Jedis jedis = RedisSingleton.getJedisInstance();
    
    /*public RedisQueue(){
        //連接本地的 Redis 服務    
        jedis = RedisSingleton.getJedisInstance();
        
    }*/
    
    //將未訪問的url加入到toVisit表中(使用的是尾插法)
    public static void addToVisit(String url) {
        jedis.rpush("toVisit", url);
    }

    //將未訪問的url彈出進行解析
    public static String getToVisit() {
        return jedis.lpop("toVisit");
    }

    //將已經(jīng)解析過的url添加到已訪問隊列中
    public static void addVisited(String url) {
        jedis.rpush("visited", url);
    }

    //判斷待訪問url隊列是否為空
    public static boolean toVisitIsEmpty() {
        Long length = jedis.llen("toVisit");

        if (length == 0) {
            return true;
        } else {
            return false;
        }
    }
    
    
}

package com.xdl.redisUtil;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

import com.mongodb.MongoClient;

public class MultithreadCrawler {

    /**
     * @throws Exception 
     * @throws InterruptedException 
     * @Title: main
     * @Description: TODO(這里用一句話描述這個方法的作用)
     * @param @param args    參數(shù)
     * @return void 返回類型
     * @author  liangchu
     * @date 2018-1-6 下午12:19:53 
     * @throws
     */
    public static void main(String[] args) throws  Exception {
        
     //拿到種子鏈接 這里主要從這幾個方面抓取數(shù)據(jù)
        List<String> strings = new ArrayList<String>();
        strings.add("手機");
        strings.add("男裝");
        strings.add("女裝");
        strings.add("電腦");
        strings.add("相機");
        strings.add("食品");
        //將種子鏈接寫進redis數(shù)據(jù)庫的待抓取列表
        for (String url : strings) {
            RedisQueue.addToVisit("http://search.gome.com.cn/search?question="+url+"&searchType=goods&page=1");
        }
        //創(chuàng)建一個收集線程的列表
        List<Thread> threadList = new ArrayList<Thread>();
        //創(chuàng)建線程的個數(shù)
        int threadNum = 5;
        // mongodb連接
        MongoClient mongo = new MongoClient("127.0.0.1", 27017);
        RunThread run = new RunThread();
        run.setThreads(threadNum,mongo);
        //創(chuàng)建5個線程隐圾,并對其進行收集
        for (int i = 0; i < threadNum; i++) {
            Thread thread = new Thread(run);
            thread.start();
            threadList.add(thread);
        }
        //main線程需要等待所有子線程退出
        while (threadList.size() > 0) {
            Thread child = threadList.remove(0);
            child.join();
        }
    }   
}

  • run函數(shù)
package com.xdl.redisUtil;

import java.util.ArrayList;
import java.util.List;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;

public class RunThread extends Thread {
    MongoClient mongo = null;
    //線程計數(shù)器需要對所有線程可見翁授,是共享變量
    int threads = 0;
    //redis隊列的對象减途,也是所有對象共享的變量
    //創(chuàng)建線程鎖
    private static Object lock = new Object();
    public void setThreads(int threads,MongoClient mongo) {
        this.threads = threads;
        this.mongo = mongo;
    }

    @SuppressWarnings("deprecation")
    public void parseToVisitUrltoRedis() throws Exception {
        //用來保存新提取出來的url列表(此變量不應是共享變量,我們把它變?yōu)槊總€線程的私有變量)
        //我們應該知道的是在Java中哪些變量在線程之間是不共享的邻奠,參考資料:
        List<String> urlList = new ArrayList<String>();
        boolean flag = true;
        while (flag) {
            //從爬蟲隊列中取出待抓取的url
            if (!RedisQueue.toVisitIsEmpty()) {
                String url = RedisQueue.getToVisit();
                /**
                 * 對此url進行解析贰镣,提取出新的url列表
                 * 解析出來的url順便就寫進urlList中了
                 *
                 * 在這個過程中不要求保證同步,每個線程都負責解析自己所屬的url楼入,解析完成
                 * 之后將url寫入自己的urlList之中阐肤,當在解析過程中發(fā)生阻塞毫炉,則切換到其他
                 * 線程,保證程序的高并發(fā)性腾供。
                 */
                
             // 創(chuàng)建httpclient實例  
                CloseableHttpClient httpClient = HttpClients.createDefault();  
                // 創(chuàng)建httpget實例  
                HttpGet httpGet = new HttpGet(url); 
                // 執(zhí)行http get 請求  
                CloseableHttpResponse response = null;  
                response = httpClient.execute(httpGet);  
                HttpEntity entity = response.getEntity();// 獲取返回實體  
                // EntityUtils.toString(entity,"utf-8");//獲取網(wǎng)頁內(nèi)容节值,指定編碼  
                String html = EntityUtils.toString(entity, "UTF-8");  
                response.close();  
                httpClient.close();
                Document doc = Jsoup.parse(html);                               
                // 獲取產(chǎn)品列表信息
                Element elementP =  doc.getElementById("product-box");
                // 獲取產(chǎn)品列
                Elements elements = elementP.select("li[class=product-item]")
                        .select("div[class=item-tab-warp]");
               
                // 下一頁的信息就存入redis隊列當中 做下一次分析的url鏈接所用
                // 如果這個沒有數(shù)據(jù)這個線程就退出
                if(elements.size() <=0){
                    flag = false;
                    return ;
                }
                for (Element element : elements) {
                    // 獲取產(chǎn)品價格
                    String price = element.select("div[class=item-tab]").select("div[class=item-price-info]")
                            .select("p[class=item-price]")
                            .select("span[class=price asynPrice]").text();
                    // 獲取產(chǎn)品名稱 和產(chǎn)品鏈接
                    String producthref = element.select("p[class=item-name]")
                            .select("a[class=emcodeItem item-link]").attr("href");
                    String productTitle = element.select("p[class=item-name]")
                            .select("a[class=emcodeItem item-link]").attr("title");
                    // 評價人數(shù)
                    String productStatus = element.select("p[class=item-comment-dispatching]")
                            .select("a[class=comment]").text();
                    // 經(jīng)營品牌
                    String product = element.select("p[class=item-shop]")
                            .text();
                    // 將這些信息存入mogondb中   
                    DB  db =  mongo.getDB("taobao");
                    DBCollection emp = db.getCollection("productinfo");
                    DBObject obj = new BasicDBObject();
                    obj.put("productTitle", productTitle);
                    obj.put("producthref", producthref);
                    obj.put("productStatus", productStatus);
                    obj.put("product", product);
                    obj.put("price", price);
                    emp.insert(obj);
                    // 這里我也糾結(jié)了好久要不要關(guān),如果關(guān)了就會報錯 所以最后就沒關(guān)了如果各位有好的解決方案 記得告訴我O(∩_∩)O
                    //mongo.close(); 
                }
                // 這里是獲取它的下一頁榜聂,然后將下一頁的連接加入到redis隊列當中
               int page = Integer.parseInt(url.substring(url.lastIndexOf("=")+1))+1;
               String redisToVisit = url.substring(0, url.lastIndexOf("=")+1)+page;
               if(page >5){
                   flag = false;
                   return;
               }
                /**
                 * 在此同步塊中主要進行提取出來的url的寫操作搞疗,必須是同步操作,保證一個同
                 * 一時間只有一個線程在對Redis數(shù)據(jù)庫進行寫操作须肆。
                 */
                synchronized(lock){
                    // 加入到redis隊列中
                    RedisQueue.addToVisit(redisToVisit);
                }
            } else {
                //在改變線程計數(shù)器的值的時候必須保證線程的同步性
                synchronized (lock) {
                    //等待線程數(shù)的計數(shù)器的計數(shù)器減1
                    threads--;
                    //如果仍然有其他線程在活動匿乃,則通知此線程進行等待
                    if (threads > 0) {
                        /*調(diào)用線程的wait方法會將此線程掛起,直到有其他線程調(diào)用notify\
                        notifyAll將此線程進行喚醒*/
                        wait();
                        threads++;
                    } else {
                        //如果其他的線程都在等待豌汇,說明待抓取隊列已空幢炸,則通知所有線程進行退出
                        notifyAll();
                        return;
                    }
                }
            }
        }
    }

    public void run() {
        //雖然run方法不能拋出異常,但是可以在run方法中進行try拒贱,catch
        try {
            parseToVisitUrltoRedis();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  • 主函數(shù)
package com.xdl.redisUtil;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

import com.mongodb.MongoClient;

public class MultithreadCrawler {

    /**
     * @throws Exception 
     * @throws InterruptedException 
     * @Title: main
     * @Description: TODO(這里用一句話描述這個方法的作用)
     * @param @param args    參數(shù)
     * @return void 返回類型
     * @author  liangchu
     * @date 2018-1-6 下午12:19:53 
     * @throws
     */
    public static void main(String[] args) throws  Exception {
        
        //拿到種子鏈接 這里主要從 手機 服飾 電器 食品 這幾個大的方面來抓取
        List<String> strings = new ArrayList<String>();
        strings.add("手機");
        strings.add("男裝");
        strings.add("女裝");
        strings.add("電腦");
        strings.add("相機");
        strings.add("食品");
        //將種子鏈接寫進redis數(shù)據(jù)庫的待抓取列表
        for (String url : strings) {
            RedisQueue.addToVisit("http://search.gome.com.cn/search?question="+url+"&searchType=goods&page=1");
        }
        //創(chuàng)建一個收集線程的列表
        List<Thread> threadList = new ArrayList<Thread>();
        //創(chuàng)建線程的個數(shù)
        int threadNum = 1;
        MongoClient mongo = new MongoClient("127.0.0.1", 27017);
        RunThread run = new RunThread();
        run.setThreads(threadNum,mongo);
        //創(chuàng)建5個線程宛徊,并對其進行收集
        for (int i = 0; i < threadNum; i++) {
            Thread thread = new Thread(run);
            thread.start();
            threadList.add(thread);
        }
        //main線程需要等待所有子線程退出
        while (threadList.size() > 0) {
            Thread child = threadList.remove(0);
            child.join();
        }
    }   
}

商品信息列表.png

總結(jié)

不得不說加入了redis隊列和mongodb存儲數(shù)據(jù) 效率簡直要起飛了佛嬉。15s不到就抓了1200條商品信息。因為有了上次的教訓不敢抓得太久闸天,所以只抓取了1200條巷燥。如果有不怕封的小伙伴可以試試,當然后果是自負号枕。O(∩∩)O,終于弄完了整整一天陨享。下次加入quartz定時任務葱淳,這樣獲取股票,天氣抛姑,航班什么的都可以獲取實時的了赞厕。如果有需求的小伙伴可以留言,有時間一定完成定硝。good night!!(*^_^*) 嘻嘻

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末皿桑,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子蔬啡,更是在濱河造成了極大的恐慌诲侮,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,378評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件箱蟆,死亡現(xiàn)場離奇詭異沟绪,居然都是意外死亡,警方通過查閱死者的電腦和手機空猜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評論 3 399
  • 文/潘曉璐 我一進店門绽慈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人辈毯,你說我怎么就攤上這事坝疼。” “怎么了谆沃?”我有些...
    開封第一講書人閱讀 168,983評論 0 362
  • 文/不壞的土叔 我叫張陵钝凶,是天一觀的道長。 經(jīng)常有香客問我管毙,道長腿椎,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,938評論 1 299
  • 正文 為了忘掉前任夭咬,我火速辦了婚禮啃炸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘卓舵。我一直安慰自己南用,他們只是感情好,可當我...
    茶點故事閱讀 68,955評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著裹虫,像睡著了一般肿嘲。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上筑公,一...
    開封第一講書人閱讀 52,549評論 1 312
  • 那天雳窟,我揣著相機與錄音,去河邊找鬼匣屡。 笑死封救,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的捣作。 我是一名探鬼主播誉结,決...
    沈念sama閱讀 41,063評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼券躁!你這毒婦竟也來了惩坑?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,991評論 0 277
  • 序言:老撾萬榮一對情侶失蹤也拜,失蹤者是張志新(化名)和其女友劉穎以舒,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體搪泳,經(jīng)...
    沈念sama閱讀 46,522評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡稀轨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,604評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了岸军。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片奋刽。...
    茶點故事閱讀 40,742評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖艰赞,靈堂內(nèi)的尸體忽然破棺而出佣谐,到底是詐尸還是另有隱情,我是刑警寧澤方妖,帶...
    沈念sama閱讀 36,413評論 5 351
  • 正文 年R本政府宣布狭魂,位于F島的核電站,受9級特大地震影響党觅,放射性物質(zhì)發(fā)生泄漏雌澄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,094評論 3 335
  • 文/蒙蒙 一杯瞻、第九天 我趴在偏房一處隱蔽的房頂上張望镐牺。 院中可真熱鬧,春花似錦魁莉、人聲如沸睬涧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽畦浓。三九已至痹束,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間讶请,已是汗流浹背祷嘶。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留夺溢,地道東北人。 一個月前我還...
    沈念sama閱讀 49,159評論 3 378
  • 正文 我出身青樓企垦,卻偏偏與公主長得像,于是被迫代替她去往敵國和親晒来。 傳聞我的和親對象是個殘疾皇子钞诡,可洞房花燭夜當晚...
    茶點故事閱讀 45,747評論 2 361

推薦閱讀更多精彩內(nèi)容