簡介并安裝
huey, a little task queue.
輕量級異步任務(wù)隊列。
下載安裝huey。
$ pip install huey
下載安裝redis依賴(huey暫時只支持redis)坡疼。
$ pip install redis
利用huey定義并執(zhí)行一些任務(wù)時,可以分成這幾個文件。
- config.py: 定義使用huey的一些配置哑蔫,任務(wù)的redis存儲。
The first step is to configure your queue. The consumer needs to be pointed at a Huey
instance, which specifies which backend to use.
- task.py: 定義你需要執(zhí)行的一些異步任務(wù)弧呐。
- huey_consumer.py: 開啟huey consumer的入口(huey提供)闸迷。
- huey_main.py: 執(zhí)行異步任務(wù)。
config.py
實際上就是利用redis創(chuàng)建consumer所指向的huey實例俘枫,huey實際上包含了一個隊列queue腥沽,用來存儲和取出消息。
from huey import RedisHuey
huey = RedisHuey('base_app', host='127.0.0.1')
或者
from huey import RedisHuey
from redis import ConnectionPool
import settings
redis_pool = ConnectionPool(host=settings.REDIS_ADDRESS, port=settings.REDIS_PORT, db=0)
huey = RedisHuey('base_app', connection_pool=redis_pool)
task.py
利用config.py所創(chuàng)建的huey來修飾普通函數(shù)使之成為huey任務(wù)鸠蚪。
這樣就定義了一個最基本的異步任務(wù)今阳。(base_huey.py 及上述的 config.py)
from base.base_huey import huey
@huey.task()
def count_beans(num):
print('-- counted %s beans --' % num)
for n in range(num):
print(n)
return 'Counted %s beans' % num
huey_consumer.py
之前習慣把huey包里的huey_consumer.py文件直接拿出來到主目錄然后執(zhí)行,新版的的huey_consumer.py與舊版的稍微有點區(qū)別邓嘹。
新版本增加了一個consumer_options.py酣栈,用來定義封裝了一些consumer相關(guān)的配置和命令行解析的處理類;在舊版中汹押,這些都是直接定義在huey_consumer.py中矿筝。
查看OptionParserHandler
源碼可知,huey_consumer可以包含很多參數(shù)棚贾,主要分為三個group(Logging日志記錄, Workers任務(wù)worker相關(guān), Scheduler計劃任務(wù)相關(guān))窖维。
def get_option_parser(self):
parser = optparse.OptionParser('Usage: %prog [options] '
'path.to.huey_instance')
def add_group(name, description, options):
group = parser.add_option_group(name, description)
for abbrev, name, kwargs in options:
group.add_option(abbrev, name, **kwargs)
add_group('Logging', 'The following options pertain to logging.',
self.get_logging_options())
add_group('Workers', (
'By default huey uses a single worker thread. To specify a '
'different number of workers, or a different execution model (such'
' as multiple processes or greenlets), use the options below.'),
self.get_worker_options())
add_group('Scheduler', (
'By default Huey will run the scheduler once every second to check'
' for tasks scheduled in the future, or tasks set to run at '
'specfic intervals (periodic tasks). Use the options below to '
'configure the scheduler or to disable periodic task scheduling.'),
self.get_scheduler_options())
return parser
最常用的一些參數(shù):
- -l 指定huey異步任務(wù)執(zhí)行時的日志文件(也可以通過
ConsumerConfig
的setup_logger()
來定義logger)。 - -w 執(zhí)行器worker隊列的數(shù)量
- -k worker的類型(process, thread, greenlet妙痹,默認是thread)
- -d 輪詢隊列的最短時間間隔
huey_main.py
定義需要執(zhí)行huey任務(wù)的方法铸史。
from tasks.huey_task import count_beans
# base test
def test_1():
count_beans(10) # no block
count_beans.schedule(args=(5, ), delay=5) # delay 5s
res = count_beans.schedule(args=(5, ), delay=5)
# res.get(blocking=True)
res(blocking=True) # block
if __name__ == '__main__':
test_1()
print('end')
執(zhí)行腳本
- 開啟consumer輪詢:
python huey_consumer_new.py tasks.huey_task.huey -l logs/base_huey.log -w 1
tasks.task.huey即上述的task.py,在此時后綴名需要替換成 .huey怯伊。 - 執(zhí)行異步方法:
pyton huey_main.py
ps:官方文檔中是將 huey
實例和task任務(wù)都引入到main.py中琳轿。
main.py
from config import huey # import our "huey" object
from tasks import count_beans # import our task
if name == 'main':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans)
To run these scripts, follow these steps:
1. Ensure you have [Redis](http://redis.io/) running locally
2. Ensure you have [installed huey](http://huey.readthedocs.io/en/latest/installation.html#installation)
3. Start the consumer: huey_consumer.py main.huey
(notice this is “main.huey” and not “config.huey”).
4. Run the main program: python main.py
---
#####huey task簡單api介紹
利用```@huey.task()```能來定義一些基本異步任務(wù),當然還有其他延時任務(wù),周期性任務(wù)等崭篡。
1. 延時執(zhí)行:下例展示了兩種延時執(zhí)行的方法:第一種時直接執(zhí)行延時時間n秒并傳入?yún)?shù)挪哄;第二種是指定了eta參數(shù),即estimated time of arrival琉闪,傳入未來的某個時間點迹炼,使其在計劃時間點執(zhí)行。
import datetime
from tasks.huey_task import count_beans
count_beans(3) # normal
count_beans.schedule(args=(3, ), delay=5) # delay 5s
in_a_minute = datetime.datetime.now() + datetime.timedelta(seconds=60)
count_beans.schedule(args=(100,), eta=in_a_minute)
2. 阻塞:利用block參數(shù)能夠使其阻塞颠毙。
res = count_beans(100)
res.get(blocking=True)
res(blocking=True) # block
3. 異常重試:當任務(wù)出現(xiàn)異常時進行retry斯入,并且可以指定重試延時時間。
from base.base_huey import huey
retry 3 times delay 5s
@huey.task(retries=3, retry_delay=5)
def try_reties_by_delay():
print('trying %s' % datetime.now())
raise Exception('try_reties_by_delay')
4. 周期性任務(wù):利用```@huey.periodic_task()```來定義一個周期性任務(wù)蛀蜜。
from base.base_huey import huey
from huey import crontab
@huey.periodic_task(crontab(minute='*'))
def print_time():
print(datetime.now())
5. 取消或暫停任務(wù):revoke刻两。
---
#####嘗試簡單閱讀源碼
...