pyspider process和result部分源碼分析

終于弄清楚霞篡,pyspider為什么重寫on_result之后闷祥,調(diào)試的時候可以把數(shù)據(jù)插入數(shù)據(jù)庫,而不重寫的時候不行。

這一篇文章主要是記錄process和result部分的內(nèi)容赁豆。之后會通過這些內(nèi)容改寫一下數(shù)據(jù)庫贝或。

def run(self):
        '''Run loop'''
        logger.info("processor starting...")

        while not self._quit:
            try:
                task, response = self.inqueue.get(timeout=1)
                self.on_task(task, response)
                self._exceptions = 0
            except Queue.Empty as e:
                continue
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.exception(e)
                self._exceptions += 1
                if self._exceptions > self.EXCEPTION_LIMIT:
                    break
                continue

        logger.info("processor exiting...")

process進(jìn)程通過run開始蹄溉,這段代碼通過一個while循環(huán)屎暇,監(jiān)聽inqueue隊列,然后把task,response在on_task運(yùn)行况脆。inqueue隊列是processor2result隊列饭宾,如果你用消息隊列的話,你可以看到消息隊列里面有這個隊列格了。

def on_task(self, task, response):
        '''Deal one task'''
        start_time = time.time()
        response = rebuild_response(response)

        try:
            assert 'taskid' in task, 'need taskid in task'
            project = task['project']
            updatetime = task.get('project_updatetime', None)
            md5sum = task.get('project_md5sum', None)
            project_data = self.project_manager.get(project, updatetime, md5sum)
            assert project_data, "no such project!"
            if project_data.get('exception'):
                ret = ProcessorResult(logs=(project_data.get('exception_log'), ),
                                      exception=project_data['exception'])
            else:
                #注意這里把爬蟲實例執(zhí)行并且把結(jié)果返回給隊列看铆,最后返回一個processresult對象
                ret = project_data['instance'].run_task(
                    project_data['module'], task, response)
        except Exception as e:
            logstr = traceback.format_exc()
            ret = ProcessorResult(logs=(logstr, ), exception=e)
        process_time = time.time() - start_time

        if not ret.extinfo.get('not_send_status', False):
            if ret.exception:
                track_headers = dict(response.headers)
            else:
                track_headers = {}
                for name in ('etag', 'last-modified'):
                    if name not in response.headers:
                        continue
                    track_headers[name] = response.headers[name]

            status_pack = {
                'taskid': task['taskid'],
                'project': task['project'],
                'url': task.get('url'),
                'track': {
                    'fetch': {
                        'ok': response.isok(),
                        'redirect_url': response.url if response.url != response.orig_url else None,
                        'time': response.time,
                        'error': response.error,
                        'status_code': response.status_code,
                        'encoding': getattr(response, '_encoding', None),
                        'headers': track_headers,
                        'content': response.text[:500] if ret.exception else None,
                    },
                    'process': {
                        'ok': not ret.exception,
                        'time': process_time,
                        'follows': len(ret.follows),
                        'result': (
                            None if ret.result is None
                            else utils.text(ret.result)[:self.RESULT_RESULT_LIMIT]
                        ),
                        'logs': ret.logstr()[-self.RESULT_LOGS_LIMIT:],
                        'exception': ret.exception,
                    },
                    'save': ret.save,
                },
            }
            if 'schedule' in task:
                status_pack['schedule'] = task['schedule']

            # FIXME: unicode_obj should used in scheduler before store to database
            # it's used here for performance.
            #把status信息放入status_queue,這個隊列還不知道做什么用
            self.status_queue.put(utils.unicode_obj(status_pack))

        # FIXME: unicode_obj should used in scheduler before store to database
        # it's used here for performance.
        #如果有新的url放入這個隊列
        if ret.follows:
            for each in (ret.follows[x:x + 1000] for x in range(0, len(ret.follows), 1000)):
                self.newtask_queue.put([utils.unicode_obj(newtask) for newtask in each])

on_task比較長。
先看project_manager這個方法盛末,獲得project的信息弹惦,這里比較主要的是instance否淤,是一個爬蟲的handlerbase實例,可以調(diào)用run_task()方法棠隐,這塊怎么來的以后再說吧石抡,沒時間了

project_data = self.project_manager.get(project, updatetime, md5sum)

然后是往隊列里面扔?xùn)|西,status_queue不知道干啥助泽,newtask_queue應(yīng)該是新的url隊列啰扛,另外ret是processresult對象,以后看

 self.status_queue.put(utils.unicode_obj(status_pack))
self.newtask_queue.put([utils.unicode_obj(newtask) for newtask in each])

run_task方法

def run_task(self, module, task, response):
        """
        Processing the task, catching exceptions and logs, return a `ProcessorResult` object
        """
        self.logger = logger = module.logger
        result = None
        exception = None
        stdout = sys.stdout
        self.task = task
        if isinstance(response, dict):
            response = rebuild_response(response)
        self.response = response
        self.save = (task.get('track') or {}).get('save', {})

        try:
            if self.__env__.get('enable_stdout_capture', True):
                sys.stdout = ListO(module.log_buffer)
            self._reset()
            result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)
        except Exception as e:
            logger.exception(e)
            exception = e
        finally:
            follows = self._follows
            messages = self._messages
            logs = list(module.log_buffer)
            extinfo = self._extinfo
            save = self.save

            sys.stdout = stdout
            self.task = None
            self.response = None
            self.save = None

        module.log_buffer[:] = []
        return ProcessorResult(result, follows, messages, logs, exception, extinfo, save)

這里面有調(diào)用_run_func其實就是調(diào)用callback的那個函數(shù)嗡贺,這里是執(zhí)行采集的地方隐解,

result = self._run_task(task, response)
            if inspect.isgenerator(result):
                for r in result:
                    self._run_func(self.on_result, r, response, task)
            else:
                self._run_func(self.on_result, result, response, task)

這里執(zhí)行方法獲得結(jié)果,如果是return的數(shù)據(jù)诫睬,執(zhí)行on_result方法煞茫。

 def on_result(self, result):
        """Receiving returns from other callback, override me."""
        if not result:
            return
        assert self.task, "on_result can't outside a callback."
        if self.is_debugger():
            pprint(result)
        if self.__env__.get('result_queue'):
            self.__env__['result_queue'].put((self.task, result))

on_result把結(jié)果放在result_queue里面

大概process就干了這些

 def run(self):
        '''Run loop'''
        logger.info("result_worker starting...")

        while not self._quit:
            try:
                task, result = self.inqueue.get(timeout=1)
                self.on_result(task, result)
            except Queue.Empty as e:
                continue
            except KeyboardInterrupt:
                break
            except AssertionError as e:
                logger.error(e)
                continue
            except Exception as e:
                logger.exception(e)
                continue

        logger.info("result_worker exiting...")

result開始監(jiān)聽隊列執(zhí)行on_result

  def on_result(self, task, result):
        '''Called every result'''
        if not result:
            return
        if 'taskid' in task and 'project' in task and 'url' in task:
            logger.info('result %s:%s %s -> %.30r' % (
                task['project'], task['taskid'], task['url'], result))
            return self.resultdb.save(
                project=task['project'],
                taskid=task['taskid'],
                url=task['url'],
                result=result
            )
        else:
            logger.warning('result UNKNOW -> %.30r' % result)
            return

存入數(shù)據(jù)庫,用project,taskid,url的表摄凡,就是默認(rèn)的那個

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末续徽,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子亲澡,更是在濱河造成了極大的恐慌炸宵,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谷扣,死亡現(xiàn)場離奇詭異,居然都是意外死亡捎琐,警方通過查閱死者的電腦和手機(jī)会涎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來瑞凑,“玉大人末秃,你說我怎么就攤上這事∽延” “怎么了练慕?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長技掏。 經(jīng)常有香客問我铃将,道長,這世上最難降的妖魔是什么哑梳? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任劲阎,我火速辦了婚禮,結(jié)果婚禮上鸠真,老公的妹妹穿的比我還像新娘悯仙。我一直安慰自己龄毡,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布锡垄。 她就那樣靜靜地躺著沦零,像睡著了一般。 火紅的嫁衣襯著肌膚如雪货岭。 梳的紋絲不亂的頭發(fā)上路操,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天,我揣著相機(jī)與錄音茴她,去河邊找鬼寻拂。 笑死,一個胖子當(dāng)著我的面吹牛丈牢,可吹牛的內(nèi)容都是我干的祭钉。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼己沛,長吁一口氣:“原來是場噩夢啊……” “哼慌核!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起申尼,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤垮卓,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后师幕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體粟按,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年霹粥,在試婚紗的時候發(fā)現(xiàn)自己被綠了灭将。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡后控,死狀恐怖庙曙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情浩淘,我是刑警寧澤捌朴,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站张抄,受9級特大地震影響砂蔽,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜署惯,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一察皇、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦什荣、人聲如沸矾缓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽嗜闻。三九已至,卻和暖如春桅锄,著一層夾襖步出監(jiān)牢的瞬間琉雳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工友瘤, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留翠肘,地道東北人。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓辫秧,卻偏偏與公主長得像束倍,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子盟戏,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理绪妹,服務(wù)發(fā)現(xiàn),斷路器柿究,智...
    卡卡羅2017閱讀 134,671評論 18 139
  • 昨天的文讓大家惡心了一把蝇摸,今天我做點補(bǔ)償哈~~科普一個真正的萌物——矮種馬婶肩。 馬中柯基,絕對萌神 看到這個標(biāo)題是不...
    肖爺_族長閱讀 8,936評論 4 15
  • 使用Javascript可以方便獲得頁面的參數(shù)信息貌夕,常用的幾種如下: 設(shè)置或獲取對象指定的文件名或路徑 設(shè)置或獲取...
    duJing閱讀 305評論 0 0
  • 剛才吃早餐時蜂嗽,想出了10篇短篇小說,以下是題目殃恒,望您斧正植旧! 1.《四歲毛孩心機(jī)重,嫌自己壓力大离唐,竟將壓力甩給了親哥...
    叔孫伯閱讀 597評論 0 6