環境
- python:3.7
- locust:1.3.2
# -*- coding: utf-8 -*-
# @Author : Lee
# @File : demo.py
import os
import sys
# 將當前項目根目錄添加到運行環境中,目的是引用自定義包
sys.path.append(os.path.join(os.getcwd(), "../..")) # 需根據(jù)當(dāng)前文件所在目錄層級(jí)調(diào)整
from pathlib import Path
from locust.runners import MasterRunner # 設(shè)置主從運(yùn)行時(shí)導(dǎo)入
from locust import HttpUser, task, between, tag, events # Locust框架相關(guān)
class LocustDemo(HttpUser):
    """Demo Locust user exposing four tagged HTTP tasks.

    NOTE(review): ``url``, ``params`` and ``data`` are not defined anywhere in
    the visible file; they must be supplied at module level (or replaced with
    real values) before these tasks can run.
    """

    # Wait time between consecutive tasks of one simulated user.
    wait_time = between(1, 5)

    def on_start(self):
        """Hook called once for every user that is started."""
        print("on_start")

    def on_stop(self):
        """Hook called when the user stops."""
        print("on_stop")

    def _report(self, response):
        """Mark a caught response as either passed or failed, never both.

        The status-code check is a placeholder criterion; replace it with the
        real validation for the endpoint under test.
        """
        if 200 <= response.status_code < 400:
            response.success()
        else:
            response.failure("error message")

    @task
    @tag("tag", "tag1")
    def task_demo_tag1(self):
        """POST request, tagged ``tag`` and ``tag1``."""
        # catch_response=True is meant to be used as a context manager so the
        # outcome is decided inside the block.
        with self.client.post(url, params=params, data=data, catch_response=True) as response:
            self._report(response)

    @task
    @tag("tag2")
    def task_demo_tag2(self):
        """POST request, tagged ``tag2``."""
        with self.client.post(url, params=params, data=data, catch_response=True) as response:
            self._report(response)

    @task
    @tag("tag3")
    def task_demo_tag3(self):
        """GET request, tagged ``tag3``."""
        with self.client.get(url, params=params, catch_response=True) as response:
            self._report(response)

    @task
    @tag("tag4")
    def task_demo_tag4(self):
        """GET request, tagged ``tag4``."""
        with self.client.get(url, params=params, catch_response=True) as response:
            self._report(response)
# Runs before the test starts; executed only once.
@events.test_start.add_listener
def on_test_start(**kwargs):
    """Print a banner announcing that a test is starting."""
    banner = "A new test is starting"
    print(banner)
# Runs after the test has finished.
@events.test_stop.add_listener
def on_test_stop(**kwargs):
    """Print a banner announcing that a test is ending."""
    banner = "A new test is ending"
    print(banner)
# Init hook: tells the master node apart from worker/standalone nodes.
@events.init.add_listener
def on_locust_init(environment, **kwargs):
    """Print whether this process runs as the master or as a worker/standalone node.

    Args:
        environment: Object passed by the init event; only its ``runner``
            attribute is inspected here.
        **kwargs: Extra event arguments, ignored.
    """
    running_as_master = isinstance(environment.runner, MasterRunner)
    if not running_as_master:
        print("I'm on a worker or standalone node")
        return
    print("I'm on master node")
if __name__ == '__main__':
    # Local import: only needed when this file is executed as a script.
    import subprocess

    # NOTE(review): `configr` is not imported in the visible file; presumably
    # it is a project-local config helper that must be imported above.
    target_host = configr.cp.get("api", "pre_url")

    # Pass the command as an argument list (no shell) so the configured host
    # value is never re-parsed by a shell.
    locust_cmd = [
        "locust",
        "-f", Path(__file__).name,
        "--host={}".format(target_host),
        "--tags", "tag4", "tag1",
    ]
    # A non-zero exit status from locust is not treated as an error here.
    subprocess.run(locust_cmd)