為了實現(xiàn)快速高效使用計算集群解決大量測試用例管理和執(zhí)行的問題盯串,基于Celery和Django的分布式自動化測試译打,其由API服務器層茉唉、用例管理層食拜、任務調度層和任務執(zhí)行層組成四層架構负甸,實現(xiàn)了定時調度測試柴淘、分布式執(zhí)行第股、失敗重試等功能。能夠快速部署和配置測試執(zhí)行節(jié)點,實現(xiàn)了充分利用計算集群資源控漠、提高測試效率的目的滞诺。
自動化測試Celery工作原理:
Celery.png
Django Celery部署
- 1. 安裝
celery
首先,我們必須擁有一個broker
消息隊列用于發(fā)送和接收消息桐早。Celery
官網給出了多個broker
的備選方案:RabbitMQ
膨俐、Redis
蔓姚、Database
以及其他的消息中間件。我這邊使用的是Redis
作為消息中間人乳幸。
django-celery-beat
定時任務
django-celery-results
存儲Celery
任務結果第三方插件曾撤,我這邊是根據(jù)業(yè)務邏輯重新設計了數(shù)據(jù)結構
pip install celery==5.0.5
pip install redis==3.5.3
pip install django-celery-beat==2.2.0
pip install django-celery-results==2.0.1
- 2. 注冊
APP
INSTALLED_APPS = [
....
'django_celery_beat',
'django_celery_results',
]
- 3. 配置
settings.py
# 設置代理人broker
broker_url = f'redis://{HOST}:6379'
# 使用django orm 作為結果存儲
result_backend = 'django-db'
# celery 的啟動工作數(shù)量設置
worker_concurrency = 5
# 任務預取功能畏梆,就是每個工作的進程/線程在獲取任務的時候慈格,會盡量多拿 n 個梯捕,以保證獲取的通訊成本可以壓縮短曾。
worker_prefetch_multiplier = 5
# celery 的 worker 執(zhí)行多少個任務后進行重啟操作
worker_max_tasks_per_child = 100
# 禁用所有速度限制,如果網絡資源有限,不建議開足馬力碉考。
worker_disable_rate_limits = True
# 指定任務接受的序列化類型
accept_content = ['json']
# 指定任務序列化方式
task_serializer = 'json'
# 指定結果序列化的方式
result_serializer = 'json'
# celery beat配置(周期性任務設置)
timezone = 'Asia/Shanghai'
enable_utc = False
beat_sync_every = 1
# settings USE_TZ=False時添加該選項热芹,否啟動 django celery beat 的時候,出現(xiàn)這個錯誤TypeError: can't compare offset-naive and offset-aware datetimes
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 休眠最大秒數(shù)
beat_max_loop_interval = 300
beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
- 4. 新增
celery_tasks
文件
"""目錄結構"""
├── celery_tasks
│ ├── init.py
│ ├── celery.py
# celery.py
# 將Celery連接到應用程序
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# 為celery設置環(huán)境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ServerDjango.settings')
app = Celery('celery_tasks')
# 加載配置
app.config_from_envvar('DJANGO_SETTINGS_MODULE')
# 設置app自動加載任務
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# init.py
from celery_tasks.celery import app as celery_app
__all__ = ['celery_app']
在celery.py
中設定了對settings.py
中INSTALLED_APPS
做autodiscover_tasks
糊啡,Celery
便會去查看INSTALLD_APPS
下包含的所有app
目錄中的tasks.py
文件糊探,找到標記為task
的方法,將它們注冊為celery task
河闰。需要注意的是科平,與一般的.py
中實現(xiàn)celery
不同,tasks.py
必須建在各app
的根目錄下姜性,且不能隨意命名瞪慧。
- 例:
tasks.py
from celery import shared_task
@shared_task
def tailf_log(channel_name, file_path):
"""跟蹤日志"""
channel_layer = get_channel_layer()
try:
with open(file_path, encoding='utf-8') as f:
while True:
line = f.readline()
if line:
async_to_sync(channel_layer.send)(
channel_name,
{
"type": "send.message",
"message": str(line)
}
)
else:
time.sleep(0.5)
except Exception as e:
f.close()
print(e)
- 5. 分別啟動
woker
和beat
celery -A celery_tasks worker -l info # 啟動woker
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers.DatabaseScheduler #啟動beat 調度器使用數(shù)據(jù)庫
- 依據(jù)現(xiàn)有業(yè)務邏輯增加了任務失敗重試機制、任務返回后計算下次任務執(zhí)行時間以及當前任務消耗時間功能部念。
import celery
from celery.schedules import crontab
from django_celery_beat.models import PeriodicTask
from ManageApps.my_tasks.models import UserTasks
class CeleryTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
task = UserTasks.objects.get(task_id=kwargs['task_id'])
if kwargs["task_id"] and task.retry:
# 失敗重試弃酌,默認300s
self.retry(exc=exc, countdown=300, max_retries=1)
return super(CeleryTask, self).on_failure(exc, task_id, args, kwargs, einfo)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
if kwargs["task_id"]: # 關鍵字參數(shù)task_id, 判斷是否為定時任務
task = _next_run_time(kwargs['task_id'])
elapsed_time = (datetime.datetime.now() - task.last_run_at).total_seconds()
UserTaskResult.objects.create(**{"task_id": kwargs['task_id'], "result_id": task_id,
"elapsed": round(elapsed_time, 2), "status": status})
return super(CeleryTask, self).after_return(status, retval, task_id, args, kwargs, einfo)
def _next_run_time(task_id):
"""計算任務下次運行時間"""
per_task = PeriodicTask.objects.get(id=task_id)
my_task = UserTasks.objects.get(task_id=task_id)
if per_task.crontab_id and my_task.start_time:
# 周期任務
cron_obj = CrontabSchedule.objects.get(id=per_task.crontab_id)
cron = crontab(minute=cron_obj.minute, hour=cron_obj.hour, day_of_week=cron_obj.day_of_week,
day_of_month=cron_obj.day_of_month, month_of_year=cron_obj.month_of_year)
now = cron.now() # 當前運行時間
result = cron.remaining_delta(last_run_at=now)
ends_in = (result[0] + result[1]).replace(tzinfo=None)
my_task.start_time = ends_in
elif per_task.interval_id and my_task.start_time:
# 間隔任務
interval = IntervalSchedule.objects.get(id=per_task.interval_id)
offset = datetime.timedelta(minutes=+0)
if interval.period == 'minutes':
offset = datetime.timedelta(minutes=+interval.every)
elif interval.period == 'days':
offset = datetime.timedelta(days=+interval.every)
elif interval.period == 'hours':
offset = datetime.timedelta(hours=+interval.every)
elif interval.period == 'seconds':
offset = datetime.timedelta(seconds=+interval.every)
elif interval.period == 'microseconds':
offset = datetime.timedelta(microseconds=+interval.every)
my_task.start_time = datetime.datetime.now() + offset
else:
# 第一次運行寫入當前時間
my_task.start_time = datetime.datetime.now()
my_task.save()
return my_task
- 參考
django-celery-beat、django-celery-result
二次設計任務模型
from django.db import models
from django_celery_beat.models import PeriodicTask
from django_celery_results.models import TaskResult, TASK_STATE_CHOICES
class UserTasks(models.Model):
user = models.ForeignKey('user.User', on_delete=models.CASCADE, verbose_name='所屬用戶', help_text='所屬用戶',
null=True, blank=True)
task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='所屬任務', help_text='所屬任務',
null=True, blank=True)
task_tags = models.CharField(max_length=255, null=True, blank=True, verbose_name='任務標簽', help_text='任務標簽')
notice = models.SmallIntegerField(verbose_name='任務通知', help_text='任務通知')
failfast = models.BooleanField(default=False, blank=True, verbose_name='錯誤停止測試機制', help_text='錯誤停止測試機制')
retry = models.BooleanField(default=False, blank=True, verbose_name='重試機制', help_text='重試機制')
task_type = models.BooleanField(default=False, blank=True, verbose_name='任務類型', help_text='任務類型')
last_run_at = models.DateTimeField(blank=True, null=True, verbose_name='Last Run Datetime',
help_text='計劃上次觸發(fā)任務運行的日期時間')
start_time = models.DateTimeField(blank=True, null=True, verbose_name='Start Datetime',
help_text='Datetime when the schedule should begin triggering the task to run',)
class Meta:
db_table = 'tb_user_tasks'
verbose_name = '用戶任務'
verbose_name_plural = verbose_name
class UserTaskResult(models.Model):
result_id = models.CharField(max_length=255, null=True, blank=True, verbose_name='Result ID', help_text='結果ID')
task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='所屬任務', help_text='所屬任務',
null=True, blank=True)
create_time = models.BigIntegerField(verbose_name="創(chuàng)建時間", help_text="創(chuàng)建時間")
elapsed = models.FloatField(verbose_name="耗時/s", help_text="耗時/s", null=True, blank=True, default=0.00)
status = models.CharField(max_length=50, default='PENDING', choices=TASK_STATE_CHOICES,
verbose_name='任務狀態(tài)',
help_text='Current state of the task being run')
class Meta:
db_table = 'tb_user_task_result'
verbose_name = '用戶任務結果'
verbose_name_plural = verbose_name
前端頁面
-
任務管理
image.png
image.png -
任務統(tǒng)計
image.png