-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathcommand.py
More file actions
executable file
·249 lines (218 loc) · 7.8 KB
/
command.py
File metadata and controls
executable file
·249 lines (218 loc) · 7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
import os
import subprocess
import sys
import time
import requests
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ROOT_DIR)
# noqa: I001
from api.utils import ( # noqa: E402
OPENAPI_URL,
PROJECT_ID,
SERVER_URL,
VERSION_URL,
get_path_op_id,
session,
)
# noqa: E402
from lib.chatmark.step import Step # noqa: E402
def get_apidocs():
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs",
params={"page": 1, "size": 100},
)
return res.json()["docs"]
def delete_old_apidocs():
apidocs = get_apidocs()
for apidoc in apidocs:
session.delete(f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs/{apidoc['id']}")
def get_local_version():
cmd = "git rev-parse HEAD"
res = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE)
return res.stdout.decode("utf-8").strip()
def check_api_version():
# 如果没有配置VERSION_URL,则跳过版本检查
if not VERSION_URL:
print("未配置VERSION_URL,跳过API版本检查...")
return
local_version = get_local_version()
print("检查被测服务器文档是否已经更新到最新版本...")
while True:
try:
res = session.get(VERSION_URL)
version = res.json()["version"]
if version == local_version:
print(f"API 文档已更新,当前版本为 {version},开始上传 OpenAPI 文档...")
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查 API 版本失败!{e}", flush=True)
time.sleep(5)
def wait_for_testcase_done(testcase_id):
while True:
try:
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases/{testcase_id}"
)
data = res.json()
status = data["status"]
if status == "content_ready":
print("文本用例生成完成!", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查文本用例状态失败!{e}", flush=True)
time.sleep(5)
def wait_for_testcode_done(task_id):
while True:
try:
res = session.get(f"{SERVER_URL}/tasks/{task_id}")
data = res.json()
status = data["status"]
if status == "succeeded":
print("自动测试脚本生成完成!", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查自动测试脚本生成失败!{e}", flush=True)
time.sleep(5)
def get_testcode(testcase_id):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcodes",
params={"testcase_id": testcase_id},
)
return res.json()["testcodes"][0]
def wait_for_task_done(task_id):
while True:
try:
res = session.get(f"{SERVER_URL}/tasks/{task_id}")
data = res.json()
status = data["status"]
if status == "succeeded":
print("自动测试脚本执行完成!", flush=True)
break
else:
print(
".",
end="",
flush=True,
)
time.sleep(5)
except Exception as e:
print(f"检查自动测试脚本状态失败!{e}", flush=True)
time.sleep(5)
def get_testcase(api_path_id):
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases",
params={"page": 1, "size": 100, "pathop_id": api_path_id},
)
return res.json()["testcases"][0]
def main():
error_msg = "请输入要测试的API名称和测试目标!如:/test.api.upload api_path method test_target"
if len(sys.argv) < 2:
print(error_msg)
return
args = sys.argv[1].strip().split(" ")
if len(args) < 3:
print(error_msg)
return
api_path = args[0]
method = args[1]
test_target = " ".join(args[2:])
with Step("检查 API 版本是否更新..."):
check_api_version()
delete_old_apidocs()
with Step(f"上传 OpenAPI 文档,并且触发 API {api_path} 的测试用例和自动测试脚本生成任务..."):
# 使用配置的OPENAPI_URL
if not OPENAPI_URL:
print("错误:未配置OPENAPI_URL,无法获取OpenAPI文档")
return
res = requests.get(
OPENAPI_URL,
)
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/apidocs",
files={"file": ("openapi.json", res.content, "application/json")},
data={"apiauth_id": 1},
)
if res.status_code == 200:
print("上传 OpenAPI 文档成功!\n")
else:
print(f"上传 OpenAPI 文档失败!{res.text}", flush=True)
return
apipathop_id = get_path_op_id(api_path, method)
with Step("开始生成文本用例..."):
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcases",
params={"generate_content": True},
json={"apipathop_id": apipathop_id, "title": test_target},
)
if res.status_code == 200:
print("提交生成文本用例成功!等待生成完成...", flush=True)
testcase_id = res.json()["id"]
wait_for_testcase_done(testcase_id)
else:
print(f"提交生成文本用例失败!{res.text}", flush=True)
return
with Step("开始生成自动测试脚本..."):
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/tasks/testcode",
params={"testcase_id": testcase_id},
)
if res.status_code == 200:
print("提交生成自动测试脚本成功!等待生成完成...", flush=True)
task_id = res.json()["id"]
wait_for_testcode_done(task_id)
else:
print(f"提交生成自动测试脚本失败!{res.text}", flush=True)
return
with Step("开始执行自动测试脚本..."):
testcode = get_testcode(testcase_id)
testcode_id = testcode["id"]
res = session.post(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/testcodes/{testcode_id}/exec",
)
if res.status_code == 200:
print("提交执行自动测试脚本成功!", flush=True)
else:
print(f"提交执行自动测试脚本失败!{res.text}")
return
api_path_id = get_path_op_id(api_path, method)
with Step("开始查询测试脚本执行结果..."):
while True:
res = session.get(
f"{SERVER_URL}/autotest/projects/{PROJECT_ID}/tasks",
params={"page": 1, "size": 1},
)
ret = res.json()
task = ret["tasks"][0]
task_id = task["id"]
wait_for_task_done(task_id)
testcase = get_testcase(api_path_id)
last_testcode_passed = testcase["last_testcode_passed"]
if last_testcode_passed:
print("测试脚本执行成功!", flush=True)
break
else:
print("测试脚本执行失败!", flush=True)
break
if __name__ == "__main__":
main()