緊接著上回繼續(xù)翻譯吧狈定。有關huey這個python
寫的的輕量級消息隊列
個人才疏學淺粗梭,可能很多英文都要借助翻譯軟件巷挥,但盡量做到能夠易于理解烹卒。
教程指導
這個文檔的目的是為了幫助人盡可能快速使用huey
章咧。
簡單入門
使用huey需要注意有如下三個主要的組成(或者過程):
- 生產(chǎn)者倦西,例如
web
應用等。 - 消費者赁严,運行放置在消息隊列中的任務(
jobs
)扰柠。 - 隊列,存放任務疼约。例如
Redis
等卤档。
底下的截圖展示了上述三個不同的過程。左邊是生產(chǎn)者:一個簡單的程序詢問用戶要輸入多少的“豆子”程剥。右上角消費者一直運行劝枣,它正在做“計算”,舉例如圖中所示打印了有多少“豆子”被計數(shù)织鲸。右下角是是一個隊列舔腾,圖中使用的是Redis
。我們可以看到任務被添加(LPUSH
)入隊列和從數(shù)據(jù)庫中讀取(BRPOP
)任務搂擦。
自我嘗試
假定你安裝了huey
稳诚,讓我們來看一下代碼例子。
第一步先配置隊列瀑踢。消費者需要指定一個Huey
實例扳还,這代表了使用的后端類型。
# config.py
from huey import RedisHuey
huey = RedisHuey()
huey
對象封裝了隊列橱夭。隊列負責存儲和取回消息氨距,你的應用程序代碼使用huey
實例來聯(lián)系函數(shù)調(diào)用。我們來看一下怎么使用huey
來連接一個計算豆子的函數(shù):
# tasks.py
from config import huey # import the huey we instantiated in config.py
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
上述代碼展現(xiàn)了如何用API
定義最終被消費者執(zhí)行的“任務”——用task()
裝飾器簡單裝飾你想要讓消費者運行的函數(shù)任務棘劣。而當它被調(diào)用時候俏让,主進程將立即返回而不是進入函數(shù)內(nèi)部。在消費者進程中會看到這個新消息并運行這個函數(shù)。
我們的主程序很簡單舆驶。它導入了配置和任務——這確保了在我們根據(jù)指定的配置運行消費者時橱健,所有任務都會被加載入內(nèi)存而钞。
# main.py
from config import huey # import our "huey" object
from tasks import count_beans # import our task
if __name__ == '__main__':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans)
啟動腳本需要依次進行以下步驟:
- 確保本地運行
Redis
- 確保安裝了
huey
- 啟動消費者:
huey_consumer.py main.huey
(注意是"main.huey
"而不是"config.huey
"沙廉,這里提示一下huey_consumer.py
需要自己從huey
腳本的bin
下拷貝到當前的路徑,這樣才能用該命令來啟動臼节。) - 運行主程序:
python main.py
獲取結果
上面的例子實現(xiàn)了一個“發(fā)送并且忘記”的方法撬陵,但是如果你的應用程序需要對任務的結果做些什么呢?要從你的任務中獲取結果网缝,只需返回任務函數(shù)中的值即可巨税。
如果你得到了存儲結果但不使用它們,那么可能會浪費大量空間粉臊,特別是如果你的任務量很高草添。要禁用存儲功能,可以在初始化
Huey
實例時返回None
或指定result_store = False
扼仲。
為了更好地說明獲取結果的代碼远寸,我們還將修改tasks.py
模塊以返回一個字符串,而不是打印結果到標準輸出窗口:
from config import huey
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
return 'Counted %s beans' % num
我們準備向消費者輸入大量任務屠凶。不是簡單地執(zhí)行主程序驰后,我們這回將啟動一個解釋器并運行以下操作:
>>> from main import count_beans
>>> res = count_beans(100)
>>> print(res) # What is "res" ?
<huey.api.TaskResultWrapper object at 0xb7471a4c>
>>> res() # Get the result of this task
'Counted 100 beans'
按照與上一個例子相同的布局,下面是三個主要工作流程的截圖:
- 左邊矗愧,解釋器生成任務并詢問結果
- 右上角灶芝, 消費者執(zhí)行任務并存儲結果
- 右下角是
Redis
數(shù)據(jù)庫,我們能夠看到它儲存結果然后在我們?nèi)』財?shù)據(jù)后刪除它們
延遲執(zhí)行任務
將某個特定任務排入任意時間來執(zhí)行通常很用的唉韭,例如夜涕,標記在某個時間發(fā)布的博客條目。
這在huey
中很容易完成属愤。返回上一節(jié)描述的在解釋器中執(zhí)行的代碼钠乏,讓我們安排一個一分鐘后執(zhí)行的數(shù)豆子的任務,然后看看huey
怎么處理它春塌。執(zhí)行以下代碼:
>>> import datetime
>>> res = count_beans.schedule(args=(100,), delay=60)
>>> print(res)
<huey.api.TaskResultWrapper object at 0xb72915ec>
>>> res() # This returns None, no data is ready.
>>> res() # A couple seconds later.
>>> res(blocking=True) # OK, let's just block until its ready
'Counted 100 beans'
你也可以使用datetime
類型來指定“預計到達時間”:
>>> in_a_minute = datetime.datetime.now() + datetime.timedelta(seconds=60)
>>> res = count_beans.schedule(args=(100,), eta=in_a_minute)
默認情況下晓避,
Huey
實例以UTC
時間運行。這對計劃任務的影響是只壳,當使用本地的時間時俏拱,它們必須與datetime.utcnow()
相關聯(lián)。在上述代碼中我們不采用
utcnow()
的原因是schedule()
包含默認值為True
的第三個參數(shù)叫做convert_utc
吼句。所以在上面的代碼中锅必,datetime
在被發(fā)送到隊列之前從本地時間轉換為了UTC
。當你想要以本地時間模式運行(-o),你需要總是指定
schedule()
的第三個參數(shù)convert_utc=False
搞隐,包括在指定delay
參數(shù)時驹愚。
瞧redis
的輸出,我們看到以下(簡化內(nèi)容):
+1325563365.910640 "LPUSH" count_beans(100)
+1325563365.911912 "BRPOP" wait for next job
+1325563365.912435 "HSET" store 'Counted 100 beans'
+1325563366.393236 "HGET" retrieve result from task
+1325563366.393464 "HDEL" delete result after reading
截圖也展示了相同的內(nèi)容:
重試失敗任務
Huey
支持有限次重試失敗任務劣纲。如果在執(zhí)行任務期間引發(fā)了異常逢捺,但是你已經(jīng)指定了retries
參數(shù),則任務將重新入隊并再次嘗試癞季,直到指定的重試次數(shù)劫瞳。
如下顯示的一個任務,將重試3次绷柒,每次都會拋出異常:
# tasks.py
from config import huey
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
return 'Counted %s beans' % num
@huey.task(retries=3)
def try_thrice():
print('trying....')
raise Exception('nope')
控制臺輸出顯示我們的任務在解釋器會話中被調(diào)用志于,然后當消費者接收然后執(zhí)行它時,我們看到它失敗但進行了重試:
在重試之間等待一定的時間間隔通常是個好的主意废睦。你可以指定重試之間的delay
參數(shù)(以秒為單位)伺绽,這會是任務重試之前等待的最短時間。這里我們修改代碼使它包含delay
參數(shù)嗜湃,并打印當前時間來顯示它的工作奈应。
# tasks.py
from datetime import datetime
from config import huey
@huey.task(retries=3, retry_delay=10)
def try_thrice():
print('trying....%s' % datetime.now())
raise Exception('nope')
下面的控制臺輸出顯示正在重試的任務,同時在重試過程中净蚤,我還在添加了“數(shù)豆子”的任務——處于正常執(zhí)行狀態(tài)钥组。
定期任務
huey
支持的另一個使用模式是定期執(zhí)行任務。依照crontab
行為今瀑,同時遵循類似的語法程梦。定期執(zhí)行的任務,不應返回有意義的結果橘荠,也不應接受任何參數(shù)屿附。
讓我們添加一個每分鐘打印一次字符竄的新任務——我們將使用它來測試消費者是否正在按計劃執(zhí)行任務。
# tasks.py
from datetime import datetime
from huey import crontab
from config import huey
@huey.periodic_task(crontab(minute='*'))
def print_time():
print(datetime.now())
現(xiàn)在哥童,當我們運行消費者時挺份,它將每分鐘開始打印時間:
取消或暫停任務
huey
可以停止任務執(zhí)行。這適用于正常任務贮懈,延遲任務和定期任務匀泊。
為了“revoke
(撤銷)”任務,你需要在實例化Huey
對象時指定一個result_store
參數(shù)朵你。
如果消費者沒有開始執(zhí)行任務各聘,你可以取消正常的任務:
# count some beans
res = count_beans(10000000)
# provided the command has not started executing yet, you can
# cancel it by calling revoke() on the TaskResultWrapper object
res.revoke()
這同樣適用于延遲任務:
res = count_beans.schedule(args=(100000,), eta=in_the_future)
res.revoke()
# and you can actually change your mind and restore it, provided
# it has not already been "skipped" by the consumer
res.restore()
要撤消給定任務的所有實例,請對任務本身使用revoke()
和restore()
方法:
count_beans.revoke()
assert count_beans.is_revoked() is True
res = count_beans(100)
assert res.is_revoked() is True
count_beans.restore()
assert count_beans.is_revoked() is False
取消或暫停定期任務
當我們開始處理定期任務時抡医,撤銷選項會更有意思躲因。
我們將使用打印時間代碼作為示例:
@huey.periodic_task(crontab(minute='*'))
def print_time():
print(datetime.now())
我們可以防止周期性的任務在下一個循環(huán)中執(zhí)行:
# only prevent it from running once
print_time.revoke(revoke_once=True)
由于上述任務每分鐘執(zhí)行一次,我們將看到輸出將跳過下一次的一分鐘,然后才恢復正常大脉。
我們也可以防止任務執(zhí)行直到指定時間:
# prevent printing time for 10 minutes
now = datetime.datetime.utcnow()
in_10 = now + datetime.timedelta(seconds=600)
print_time.revoke(revoke_until=in_10)
當指定
revoke_until
設置搞监,如果消費者以UTC
默認時間模式運行,本地時間需要關聯(lián)到datetime.utcnow()
镰矿。如果消費者以本地時間(-o參數(shù)指定)琐驴,則可以使用datetime.now()
。
最后衡怀,我們可以防止任務無限期地運行:
# will not print time until we call revoke() again with
# different parameters or restore the task
print_time.revoke()
assert print_time.is_revoked() is True
我們隨時可以恢復任務棍矛,它將會正常執(zhí)行:
print_time.restore()
查看更多
這總結了huey
的基本使用模式安疗。以下是有關API
其他方面的詳細信息的鏈接:
- Huey——負責協(xié)調(diào)可執(zhí)行任務和隊列后端
- Huey.task()——裝飾器來指示可執(zhí)行任務
- Huey.periodic_task() ——裝飾器以指示以周期性間隔執(zhí)行的任務
- TaskResultWrapper.get() ——從任務獲取返回值
- crontab() ——用于定義執(zhí)行周期性任務的間隔時間
結束
有一周混過去了抛杨,繼續(xù)啃爬蟲去了。荐类。怖现。。