Celery和Django的分布式自動化測試

為了實現(xiàn)快速高效使用計算集群解決大量測試用例管理和執(zhí)行的問題盯串,基于Celery和Django的分布式自動化測試译打,其由API服務器層茉唉、用例管理層食拜、任務調度層和任務執(zhí)行層組成四層架構负甸,實現(xiàn)了定時調度測試柴淘、分布式執(zhí)行第股、失敗重試等功能。能夠快速部署和配置測試執(zhí)行節(jié)點,實現(xiàn)了充分利用計算集群資源控漠、提高測試效率的目的滞诺。

自動化測試Celery工作原理:

Celery.png

Django Celery部署

  • 1. 安裝celery

首先,我們必須擁有一個broker消息隊列用于發(fā)送和接收消息桐早。Celery官網給出了多個broker的備選方案:RabbitMQ膨俐、Redis蔓姚、Database以及其他的消息中間件。我這邊使用的是Redis作為消息中間人乳幸。
django-celery-beat 定時任務
django-celery-results 存儲Celery任務結果第三方插件曾撤,我這邊是根據(jù)業(yè)務邏輯重新設計了數(shù)據(jù)結構

pip install celery==5.0.5
pip install redis==3.5.3
pip install django-celery-beat==2.2.0
pip install django-celery-results==2.0.1
  • 2. 注冊APP
INSTALLED_APPS = [
    ....   
    'django_celery_beat',
    'django_celery_results',
]
  • 3. 配置settings.py
# 設置代理人broker
broker_url = f'redis://{HOST}:6379'
# 使用django orm 作為結果存儲
result_backend = 'django-db'
# celery 的啟動工作數(shù)量設置
worker_concurrency = 5
# 任務預取功能畏梆,就是每個工作的進程/線程在獲取任務的時候慈格,會盡量多拿 n 個梯捕,以保證獲取的通訊成本可以壓縮短曾。
worker_prefetch_multiplier = 5
# celery 的 worker 執(zhí)行多少個任務后進行重啟操作
worker_max_tasks_per_child = 100
# 禁用所有速度限制,如果網絡資源有限,不建議開足馬力碉考。
worker_disable_rate_limits = True
# 指定任務接受的序列化類型
accept_content = ['json']
# 指定任務序列化方式
task_serializer = 'json'
# 指定結果序列化的方式
result_serializer = 'json'
# celery beat配置(周期性任務設置)
timezone = 'Asia/Shanghai'
enable_utc = False
beat_sync_every = 1
# settings USE_TZ=False時添加該選項热芹,否啟動 django celery beat 的時候,出現(xiàn)這個錯誤TypeError: can't compare offset-naive and offset-aware datetimes
DJANGO_CELERY_BEAT_TZ_AWARE = False 
# 休眠最大秒數(shù)
beat_max_loop_interval = 300
beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
  • 4. 新增celery_tasks文件
"""目錄結構"""
├── celery_tasks
│ ├── init.py
│ ├── celery.py
# celery.py
# 將Celery連接到應用程序
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# 為celery設置環(huán)境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ServerDjango.settings')
app = Celery('celery_tasks')
# 加載配置
app.config_from_envvar('DJANGO_SETTINGS_MODULE')
# 設置app自動加載任務
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


# init.py
from celery_tasks.celery import app as celery_app
__all__ = ['celery_app']

celery.py中設定了對settings.pyINSTALLED_APPSautodiscover_tasks糊啡,Celery便會去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件糊探,找到標記為task的方法,將它們注冊為celery task河闰。需要注意的是科平,與一般的.py中實現(xiàn)celery不同,tasks.py必須建在各app的根目錄下姜性,且不能隨意命名瞪慧。

  • 例: tasks.py
from celery import shared_task

@shared_task
def tailf_log(channel_name, file_path):
    """跟蹤日志"""
    channel_layer = get_channel_layer()
    try:
        with open(file_path, encoding='utf-8') as f:
            while True:
                line = f.readline()
                if line:
                    async_to_sync(channel_layer.send)(
                        channel_name,
                        {
                            "type": "send.message",
                            "message": str(line)
                        }
                    )
                else:
                    time.sleep(0.5)
    except Exception as e:
        f.close()
        print(e)

  • 5. 分別啟動wokerbeat
celery -A celery_tasks worker -l info   # 啟動woker
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers.DatabaseScheduler #啟動beat 調度器使用數(shù)據(jù)庫
  • 依據(jù)現(xiàn)有業(yè)務邏輯增加了任務失敗重試機制、任務返回后計算下次任務執(zhí)行時間以及當前任務消耗時間功能部念。
import celery
from celery.schedules import crontab
from django_celery_beat.models import PeriodicTask
from ManageApps.my_tasks.models import UserTasks

class CeleryTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        task = UserTasks.objects.get(task_id=kwargs['task_id'])
        if kwargs["task_id"] and task.retry:
            # 失敗重試弃酌,默認300s
            self.retry(exc=exc, countdown=300, max_retries=1)
        return super(CeleryTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if kwargs["task_id"]:  # 關鍵字參數(shù)task_id, 判斷是否為定時任務
            task = _next_run_time(kwargs['task_id'])
            elapsed_time = (datetime.datetime.now() - task.last_run_at).total_seconds()
            UserTaskResult.objects.create(**{"task_id": kwargs['task_id'], "result_id": task_id,
                                             "elapsed": round(elapsed_time, 2), "status": status})
        return super(CeleryTask, self).after_return(status, retval, task_id, args, kwargs, einfo)

def _next_run_time(task_id):
    """計算任務下次運行時間"""
    per_task = PeriodicTask.objects.get(id=task_id)
    my_task = UserTasks.objects.get(task_id=task_id)
    if per_task.crontab_id and my_task.start_time:
        # 周期任務
        cron_obj = CrontabSchedule.objects.get(id=per_task.crontab_id)
        cron = crontab(minute=cron_obj.minute, hour=cron_obj.hour, day_of_week=cron_obj.day_of_week,
                       day_of_month=cron_obj.day_of_month, month_of_year=cron_obj.month_of_year)
        now = cron.now()  # 當前運行時間
        result = cron.remaining_delta(last_run_at=now)
        ends_in = (result[0] + result[1]).replace(tzinfo=None)
        my_task.start_time = ends_in
    elif per_task.interval_id and my_task.start_time:
        # 間隔任務
        interval = IntervalSchedule.objects.get(id=per_task.interval_id)
        offset = datetime.timedelta(minutes=+0)
        if interval.period == 'minutes':
            offset = datetime.timedelta(minutes=+interval.every)
        elif interval.period == 'days':
            offset = datetime.timedelta(days=+interval.every)
        elif interval.period == 'hours':
            offset = datetime.timedelta(hours=+interval.every)
        elif interval.period == 'seconds':
            offset = datetime.timedelta(seconds=+interval.every)
        elif interval.period == 'microseconds':
            offset = datetime.timedelta(microseconds=+interval.every)
        my_task.start_time = datetime.datetime.now() + offset
    else:
        # 第一次運行寫入當前時間
        my_task.start_time = datetime.datetime.now()
    my_task.save()
    return my_task
  • 參考django-celery-beat、django-celery-result二次設計任務模型
from django.db import models
from django_celery_beat.models import PeriodicTask
from django_celery_results.models import TaskResult, TASK_STATE_CHOICES

class UserTasks(models.Model):
    user = models.ForeignKey('user.User', on_delete=models.CASCADE, verbose_name='所屬用戶', help_text='所屬用戶',
                             null=True, blank=True)
    task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='所屬任務', help_text='所屬任務',
                             null=True, blank=True)
    task_tags = models.CharField(max_length=255, null=True, blank=True, verbose_name='任務標簽', help_text='任務標簽')
    notice = models.SmallIntegerField(verbose_name='任務通知', help_text='任務通知')
    failfast = models.BooleanField(default=False, blank=True, verbose_name='錯誤停止測試機制', help_text='錯誤停止測試機制')
    retry = models.BooleanField(default=False, blank=True, verbose_name='重試機制', help_text='重試機制')
    task_type = models.BooleanField(default=False, blank=True, verbose_name='任務類型', help_text='任務類型')
    last_run_at = models.DateTimeField(blank=True, null=True, verbose_name='Last Run Datetime',
                                       help_text='計劃上次觸發(fā)任務運行的日期時間')
    start_time = models.DateTimeField(blank=True, null=True, verbose_name='Start Datetime',
                                      help_text='Datetime when the schedule should begin triggering the task to run',)

    class Meta:
        db_table = 'tb_user_tasks'
        verbose_name = '用戶任務'
        verbose_name_plural = verbose_name


class UserTaskResult(models.Model):
    result_id = models.CharField(max_length=255, null=True, blank=True, verbose_name='Result ID', help_text='結果ID')
    task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='所屬任務', help_text='所屬任務',
                             null=True, blank=True)
    create_time = models.BigIntegerField(verbose_name="創(chuàng)建時間", help_text="創(chuàng)建時間")
    elapsed = models.FloatField(verbose_name="耗時/s", help_text="耗時/s", null=True, blank=True, default=0.00)
    status = models.CharField(max_length=50, default='PENDING', choices=TASK_STATE_CHOICES,
                              verbose_name='任務狀態(tài)',
                              help_text='Current state of the task being run')

    class Meta:
        db_table = 'tb_user_task_result'
        verbose_name = '用戶任務結果'
        verbose_name_plural = verbose_name

前端頁面

  • 任務管理
    image.png

    image.png
  • 任務統(tǒng)計
    image.png
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末儡炼,一起剝皮案震驚了整個濱河市妓湘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌射赛,老刑警劉巖多柑,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異楣责,居然都是意外死亡竣灌,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門秆麸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來初嘹,“玉大人,你說我怎么就攤上這事沮趣⊥头常” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長驻龟。 經常有香客問我温眉,道長,這世上最難降的妖魔是什么翁狐? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任类溢,我火速辦了婚禮,結果婚禮上露懒,老公的妹妹穿的比我還像新娘闯冷。我一直安慰自己,他們只是感情好懈词,可當我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布蛇耀。 她就那樣靜靜地躺著,像睡著了一般坎弯。 火紅的嫁衣襯著肌膚如雪纺涤。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天抠忘,我揣著相機與錄音洒琢,去河邊找鬼。 笑死褐桌,一個胖子當著我的面吹牛衰抑,可吹牛的內容都是我干的。 我是一名探鬼主播荧嵌,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼呛踊,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了啦撮?” 一聲冷哼從身側響起谭网,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎赃春,沒想到半個月后愉择,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡织中,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年锥涕,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片狭吼。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡层坠,死狀恐怖,靈堂內的尸體忽然破棺而出刁笙,到底是詐尸還是另有隱情破花,我是刑警寧澤谦趣,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站座每,受9級特大地震影響前鹅,放射性物質發(fā)生泄漏。R本人自食惡果不足惜峭梳,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一嫡纠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧延赌,春花似錦、人聲如沸叉橱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽窃祝。三九已至掐松,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間粪小,已是汗流浹背大磺。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留探膊,地道東北人杠愧。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像逞壁,于是被迫代替她去往敵國和親流济。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,960評論 2 355

推薦閱讀更多精彩內容