Celery部署爬蟲(三)

celery

今天就來點比較有意思的東西

面前兩篇Celery部署爬蟲(一) Celery部署爬蟲(二) 充其量就是 Celery 的入門級弯汰,接下來就深入編寫 Celery 框架指厌,讓 Celery 更 加健壯楣嘁。


image

首先是定時任務型将,上文的編寫的定時任務是在 config 文件里進行配置的钞速,其實在Celery中提供了一系列的裝飾器,比如前面說的 @app.task 等等藕夫,來看看如何使用裝飾器來實現(xiàn) app 實例中的定時任務

# 定時任務在文件中的寫法
# -*- coding:utf-8 -*-
from Celery import app
from celery.schedules import crontab
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

# 通過beat組件周期性將任務發(fā)送給woker執(zhí)行孽糖。在示例中 新建文件period_task.py 并添加任務到配置文件中
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, add.s(1,3), name='1+3=') # 每10秒執(zhí)行add
    sender.add_periodic_task(
        crontab(hour=16, minute=56, day_of_week=1),      #每周一下午四點五十六執(zhí)行sayhai
        sayhi.s('wd'),name='say_hi'
    )

也就是說通過方法調用的形式來執(zhí)行定時任務

一般而言,這種格式需要在配置 Beat 中無法達到邏輯需求的時候使用毅贮,比較少見办悟。

如果在 config 配置文件中編寫了定時任務并希望 Celery Worker 服務進程的同時啟動 Beat 模塊,需要加入 -B 參數(shù)

celery -A haha worker -B -l info

關于路由指定的隊列

一般是一個 task 對應一個隊列 各個隊列相互獨立 可以執(zhí)行不同的操作 比如taskA執(zhí)行即時任務滩褥,taskB執(zhí)行定時 延時任務等等 需要的是在 config 文件里聲明

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_add",Exchange("for_add"),routing_key="for_add"),
Queue("for_max",Exchange("for_max"),routing_key="for_max")
)
# 路由
CELERY_ROUTES = {
'tasks.add':{"queue":"for_add","routing_key":"for_add"},
'tasks.max':{"queue":"for_max","routing_key":"for_max"}
}

# 路由指明隊列和routing_key
# routing_key認證所在的隊列 用redis名稱要一致(對于redis來說)

指定 CELERY_QUEUES 和 CELERY_ROUTES 選擇要執(zhí)行的任務,這個玩意在分布式任務上還是很方便的病蛉,下面說。

自定義Task

如果在Celery中想自定義 Task 基類,那就需要繼承 Task 模塊

# coding=utf-8
from Celery import app
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from celery import Task
logger = get_task_logger(__name__)

class demotask(Task): # 這是三種運行的狀態(tài)

    def on_success(self, retval, task_id, args, kwargs):   # 任務成功執(zhí)行
        logger.info('task id:{} , arg:{} , successful !'.format(task_id,args))


    def on_failure(self, exc, task_id, args, kwargs, einfo):  #任務失敗執(zhí)行
        logger.info('task id:{} , arg:{} , failed ! error : {}' .format(task_id,args,exc))


    def on_retry(self, exc, task_id, args, kwargs, einfo):    #任務重試執(zhí)行
        logger.info('task id:{} , arg:{} , retry !  info: {}'.format(task_id, args, exc))


@app.task(base=demotask) # 需要繼承這個基類
def func1(x,y):
    try:
        a=[]
        a[10] =1
    except Exception as e:
        logger.info(e)
    return x+y

@app.task(base=demotask)
def func2(name):
    a=[]
    a[10] =1
    return 'hi {}'.format(name)

@app.task(base=demotask)
def func3(a,b):
    return 'result = {} '.format(a+b)

這算是復寫了 Task 模塊铺然,定義了幾種狀態(tài)俗孝,這個可以根據(jù)需求,只要在 task 中聲明要繼承的自定義類即可魄健。

如果想要深入了解 Task 基類赋铝,可以查看 Celery 源碼,這就不廢話了沽瘦。

PS:當使用多個裝飾器裝飾任務函數(shù)時革骨,確保 task 裝飾器最后應用(在python中,這意味它必須在第一個位置)

@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y

另外在Celery中還提供了自帶的 logger 日志模塊析恋,可以這么用

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y
image

Celery的可視化監(jiān)控

Celery Flower 是一款Celery官方推薦使用的監(jiān)控工具

能夠實時監(jiān)控 celery 的 Worker Tasks Borker Result等服務

安裝也是極其簡單

pip install flower

運行服務

celery flower --broker=redis://localhost:6379 --address=127.0.0.1 --port=5555

或者

flower -A proj --broker=redis://localhost:6379

這樣就可以訪問 Flower Web 了苛蒲,瀏覽器訪問 http: //127.0.0.1:5555 查看運行的服務

這樣一來,在 Celery部署爬蟲(一)的百度百科的爬蟲就可以重新編寫了

Config.py文件

# config.py
from __future__ import absolute_import
# broker
import datetime
BROKER_URL = 'redis://127.0.0.1:6379/0'
# backen
CELERY_RESULT_BACKEND = 'mongodb://127.0.0.1:27017'
# 導入任務绿满,如tasks.py
CELERY_IMPORTS = ('task', )
# 列化任務載荷的默認的序列化方式
CELERY_TASK_SERIALIZER = 'json'
# 結果序列化方式
CELERY_RESULT_SERIALIZER = 'json'

CELERY_ACCEPT_CONTENT = ['json']

CELERY_TIMEZONE='Asia/Shanghai'    # 指定時區(qū)臂外,不指定默認為 'UTC'
# CELERY_TIMEZONE='UTC'
# CELERY_ENABLE_UTC = TrueE='UTC'# CELERY_ENABLE_UTC = True

Celery.py文件

# Celery.py
# coding=utf-8
from __future__ import absolute_import
from celery import Celery

app = Celery("STQ")
# 加載配置模塊
app.config_from_object('config')

if __name__ == '__main__':
      app.start()

Task.py 文件

# coding=utf-8
from __future__ import absolute_import
import re
import requests
from lxml import etree
import urllib.request
from Celery import app

headers ={"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}


def get_html(url):
    r = urllib.request.Request(url,headers=headers)
    res = urllib.request.urlopen(r)
    return res.read().decode("utf-8")

@app.task
def parse(html):
    ret = re.compile(r'<title>(.*?)</title>')
    title = re.findall(ret,html)
    return title

Run_task.py 文件

from __future__ import absolute_import
from tasks import get_html,parse

with open("url.txt","r") as f:
    for data in f.readlines():
        url = data.strip('\n')
        html = get_html(url)
        result = parse.delay(html)

黑窗口鍵入

celery -A Celery worker -l info -P gevent -c 10

然后發(fā)布任務讓它從消息隊列中消費

python run_task.py

這樣一來。所有的參數(shù)配置都可以在 config 文件中進行配置了

關于路由 隊列的使用

CELERY_QUEUES 設置一個指定 routing_key 的隊列

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_add",Exchange("for_add"),routing_key="for_add"),
)

CELERY_ROUTES 設置路由

通過 routing_key 來關聯(lián)上面的 CELERY_QUEUES喇颁,所以這里的 routing_key 需要和上面參數(shù)的一致漏健。

# 路由
CELERY_ROUTES = {
'tasks.add':{"queue":"for_add","routing_key":"for_add"},
}

tasks.add 表示在 tasks.py 的函數(shù)文件的 add 方法屬于哪個路由,而這個路由關聯(lián)著哪個隊列橘霎。也就是說聲明哪個任務放入哪個隊列

如果開啟了這個路由蔫浆,那么此任務就會被執(zhí)行,反之則不會姐叁。

那么如果沒有指定隊列的方法要怎么執(zhí)行呢瓦盛?

celery -A Celery worker -l info -n add -Q for_add -P gevent -c 10

可以在命令中去指定某個任務對應某個隊列,所以外潜,發(fā)布出去的任務會通過路由關聯(lián)到指定隊列原环,不同 worker 會從不同的隊列來消費任務。

而且每個隊列都是相互獨立的处窥,這樣一來嘱吗,每個任務之間就不會相互影響了,即時任務滔驾,定時任務就可以有明確的分工谒麦。

image

分布式集群

接下來就該說說 Celery 分布式,基本的架構就是通過中間件來共享消息隊列哆致。

事實上分布式架構的核心無非就是機器間的通信绕德,而一般的分布式爬蟲架構都會體現(xiàn)在共享數(shù)據(jù)庫隊列,Redis 和 RabbitMQ 就是典型的消息隊列摊阀。

比如通過redis來配置共享中間件

 redis://ip:6379/0

比如現(xiàn)在有 Master, Slave1, Slave2三臺主機坛梁,使用Master編寫主配置文件,celery主文件炸客,任務和調度文件坟乾。

然后拷貝兩份分別放到兩臺Slave里去,在黑窗口中分別鍵入

celery -A Celery worker -l info 

可以看到,在終端中服務器之間已經(jīng)相互連通


image

然后發(fā)布任務

python run_haha.py

本來監(jiān)聽就監(jiān)聽著 redis隊列 的機器就會進行任務消費


image

需要知道的一點就是,運行隊列里的任務的時候,每個機器真正調用的是自身任務文件里面的任務函數(shù)枝缔,只要該函數(shù)存在于隊列之中。

然后就和一般的分布式?jīng)]啥兩樣了蚊惯。

好了愿卸,今天就這樣了

但是,我在想截型,是不是應該來點刺激的

歡迎轉載趴荸,但要聲明出處,不然我順著網(wǎng)線過去就是一拳宦焦。
個人技術博客:http://www.gzky.live

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末发钝,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子波闹,更是在濱河造成了極大的恐慌酝豪,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件精堕,死亡現(xiàn)場離奇詭異孵淘,居然都是意外死亡,警方通過查閱死者的電腦和手機歹篓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門瘫证,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人庄撮,你說我怎么就攤上這事背捌。” “怎么了重窟?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵载萌,是天一觀的道長惧财。 經(jīng)常有香客問我巡扇,道長,這世上最難降的妖魔是什么垮衷? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任厅翔,我火速辦了婚禮,結果婚禮上搀突,老公的妹妹穿的比我還像新娘刀闷。我一直安慰自己,他們只是感情好,可當我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布甸昏。 她就那樣靜靜地躺著顽分,像睡著了一般。 火紅的嫁衣襯著肌膚如雪施蜜。 梳的紋絲不亂的頭發(fā)上卒蘸,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天,我揣著相機與錄音翻默,去河邊找鬼缸沃。 笑死,一個胖子當著我的面吹牛修械,可吹牛的內容都是我干的趾牧。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼肯污,長吁一口氣:“原來是場噩夢啊……” “哼翘单!你這毒婦竟也來了?” 一聲冷哼從身側響起蹦渣,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤县恕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后剂桥,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體忠烛,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年权逗,在試婚紗的時候發(fā)現(xiàn)自己被綠了美尸。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡斟薇,死狀恐怖师坎,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情堪滨,我是刑警寧澤胯陋,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站袱箱,受9級特大地震影響遏乔,放射性物質發(fā)生泄漏。R本人自食惡果不足惜发笔,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一盟萨、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧了讨,春花似錦捻激、人聲如沸制轰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽垃杖。三九已至,卻和暖如春丈屹,著一層夾襖步出監(jiān)牢的瞬間缩滨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工泉瞻, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留脉漏,地道東北人。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓袖牙,卻偏偏與公主長得像侧巨,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子鞭达,可洞房花燭夜當晚...
    茶點故事閱讀 43,724評論 2 351