以如下python接口為例:
@statefulsets_api.route('/stateful-sets', methods=['GET'])
@statefulsets_api.route('/namespaces/<namespace>/stateful-sets', methods=['GET'])
def get_deployments(namespace=None):
# If a namespace was specified, list deployments for that namespace only
# Otherwise, list deployments for all namespaces
if namespace:
statefulsets_list = v1.list_namespaced_stateful_set(namespace, watch=False)
else:
statefulsets_list = v1.list_stateful_set_for_all_namespaces(watch=False)
return jsonify([statefulset.to_dict() for statefulset in statefulsets_list.items])
我們給這個接口添加一個namespace的校驗邏輯瞭郑,驗證namespace只包含 大小寫字母为狸、數(shù)字祠丝、下劃線何吝、中劃線剖效。
首先偷俭,我們需要定義一個函數(shù)來檢查kubernetes的namespace是否符合要求
import re
def is_valid_namespace(namespace):
return bool(re.match(r'^[a-zA-Z0-9_-]+$', namespace))
接下來她我,我們需要修改get_deployments
函數(shù)枚赡,以便在收到帶有無效namespace的請求時返回適當?shù)腻e誤消息:
from flask import jsonify, abort
@deployments_api.route('/deployments', methods=['GET'])
@deployments_api.route('/namespaces/<namespace>/deployments', methods=['GET'])
def get_deployments(namespace=None):
if namespace and not is_valid_namespace(namespace):
abort(400, description="Invalid namespace. It should only contain letters, numbers, underscores, and hyphens.")
# If a namespace was specified, list deployments for that namespace only
# Otherwise, list deployments for all namespaces
if namespace:
deployments_list = v1.list_namespaced_deployment(namespace, watch=False)
else:
deployments_list = v1.list_deployment_for_all_namespaces(watch=False)
return jsonify([deployment.to_dict() for deployment in deployments_list.items])
現(xiàn)在勾徽,我們可以編寫針對這個接口的測試用例滑凉,確保當請求無效的namespace時,接口返回400錯誤。
假設當前項目的路徑設計如下
├── api
│ ├── kafka_api
│ ├── kubernetes_api
│ │ ├── __init__.py
│ ├── __init__.py
├── app.py
├── module
我們接下來在項目中
- 在
api/kubernetes
文件夾中畅姊,創(chuàng)建一個名為tests
的子文件夾闪幽。 - 在
api/kubernetes/tests
文件夾中,創(chuàng)建一個名為test_kubernetes_deployments.py
的文件涡匀。在這個文件中盯腌,您可以編寫針對api/kubernetes
中的功能的測試用例。
接下來項目結(jié)構(gòu)應該變化成如下所示:
├── api
│ ├── kafka_api
│ ├── kubernetes_api
│ │ ├── __init__.py
│ │ ├── tests
│ │ │ ├── __init__.py
│ │ │ ├── test_kubernetes_deployments.py
│ ├── __init__.py
├── app.py
├── module
為了讓pytest
能夠自動發(fā)現(xiàn)并運行這些測試用例陨瘩,您需要在每個包含測試用例的文件夾中添加一個__init__.py
文件(如果尚未存在)腕够。在這種情況下,您需要確保api/kubernetes
和api/kubernetes/tests
文件夾中都有__init__.py
文件舌劳。
書寫測試用例帚湘,并運行。注:也可使用pytest命令行運行
import pytest
from app import app
@pytest.fixture
def client():
with app.test_client() as client:
yield client
def test_get_deployments_invalid_namespace(client):
# Test with an invalid namespace parameter
response = client.get('/api/kubernetes/instances/default/namespaces/invalid@namespace/deployments')
assert response.status_code == 400
decode = response.data.decode('utf-8')
assert "Invalid namespace. It should only contain letters, numbers, underscores, and hyphens." in decode
如果運行時出現(xiàn)了TypeError: __init__() got an unexpected keyword argument 'as_tuple'
錯誤甚淡,請升級您的flask版本到最新版本