Skip to content

Commit 7dbc4ec

Browse files
committed
add test script
1 parent 028fa39 commit 7dbc4ec

2 files changed

Lines changed: 220 additions & 0 deletions

File tree

.vscode/launch.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,16 @@
206206
"env":{ "PYTHONPATH": "/app"},
207207
"console": "integratedTerminal",
208208

209+
},
210+
{
211+
"name": "test_api",
212+
"type": "debugpy",
213+
"request": "launch",
214+
"program": "./tests/test_api.py",
215+
"cwd": "${workspaceFolder}",
216+
"env":{ "PYTHONPATH": "/app"},
217+
"console": "integratedTerminal",
218+
209219
},
210220

211221

tests/test_api.py

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
import json
2+
import os
3+
import threading
4+
import time
5+
import uuid
6+
from dataclasses import dataclass, field
7+
8+
import requests
9+
10+
# from tests.warm_up import warm_up_queries
11+
# QUERIES = warm_up_queries
12+
13+
API_URL = "http://localhost:8000"
14+
API_KEY = askUOS_API_KEY = os.getenv("STREAMLIT_API_KEY", "")
15+
headers = {
16+
"Authorization": f"Bearer {askUOS_API_KEY}",
17+
}
18+
19+
# Each user gets a different set of queries to make history more interesting
20+
USER_QUERIES = [
21+
[
22+
"What are the admission requirements for bachelor bioloy?",
23+
"What documents do I need to apply?",
24+
# "When is the application deadline?",
25+
],
26+
[
27+
"How do I apply for a student visa?",
28+
"How long does the visa process take?",
29+
# "What are the visa fees?",
30+
],
31+
[
32+
"What courses are available in computer science?",
33+
"Is there a master's program in AI?",
34+
# "What are the tuition fees?",
35+
],
36+
[
37+
"Where is the international office?",
38+
"How do I get a student ID?",
39+
# "What sports facilities are available?",
40+
],
41+
[
42+
"What scholarships are available?",
43+
"How do I apply for financial aid?",
44+
# "Is there on-campus housing?",
45+
],
46+
]
47+
48+
49+
@dataclass
50+
class UserResult:
51+
user_id: str
52+
thread_id: str
53+
queries: list[str]
54+
durations: list[float] = field(default_factory=list)
55+
streamed_responses: list[str] = field(default_factory=list)
56+
history_ok: bool = False
57+
errors: list[str] = field(default_factory=list)
58+
59+
60+
def consume_stream(response: requests.Response) -> str:
61+
"""Consume stream and return the full assembled text."""
62+
full_text = ""
63+
for line in response.iter_lines():
64+
if not line:
65+
continue
66+
decoded = line.decode("utf-8") if isinstance(line, bytes) else line
67+
if decoded == "data: [DONE]":
68+
break
69+
if decoded.startswith("data: "):
70+
try:
71+
chunk = json.loads(decoded[len("data: ") :])
72+
delta = (
73+
chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
74+
)
75+
if delta:
76+
full_text += delta
77+
except json.JSONDecodeError:
78+
continue
79+
return full_text
80+
81+
82+
def run_user_session(result: UserResult):
83+
84+
for query in result.queries:
85+
start = time.time()
86+
try:
87+
response = requests.post(
88+
f"{API_URL}/v1/chat/completions",
89+
headers=headers,
90+
json={
91+
"messages": [{"role": "user", "content": query}],
92+
"thread_id": result.thread_id,
93+
"stream": True,
94+
"keep_user_message_history": True, # required
95+
"language": "English",
96+
},
97+
stream=True, # do not buffer
98+
timeout=120,
99+
)
100+
response.raise_for_status()
101+
text = consume_stream(response)
102+
result.streamed_responses.append(text)
103+
104+
except Exception as e:
105+
result.errors.append(f"Stream error on '{query}': {e}")
106+
result.streamed_responses.append("")
107+
finally:
108+
result.durations.append(round(time.time() - start, 2))
109+
110+
111+
def verify_history(result: UserResult):
112+
"""Run after all threads complete."""
113+
114+
try:
115+
response = requests.get(
116+
f"{API_URL}/v1/threads/{result.thread_id}/messages",
117+
headers=headers,
118+
timeout=10,
119+
)
120+
response.raise_for_status()
121+
messages = response.json().get("messages", [])
122+
123+
user_msgs = [m.get("content") for m in messages if m.get("role") == "user"]
124+
assistant_msgs = [
125+
m.get("content") for m in messages if m.get("role") == "assistant"
126+
]
127+
128+
missing_user = [q for q in result.queries if q not in user_msgs]
129+
missing_assistant_msgs = [
130+
q for q in result.streamed_responses if q not in assistant_msgs
131+
]
132+
missing_assistant = len(assistant_msgs) < len(result.queries)
133+
134+
result.history_ok = not missing_user and not missing_assistant
135+
136+
if missing_user:
137+
result.errors.append(f"Missing user messages in history: {missing_user}")
138+
if missing_assistant:
139+
result.errors.append(
140+
f"Expected {len(result.queries)} assistant messages, got {len(assistant_msgs)}"
141+
)
142+
if missing_assistant_msgs:
143+
result.errors.append(
144+
f"Assitant messages are not container in retrieved history. Expected messages: {missing_assistant_msgs}"
145+
)
146+
except Exception as e:
147+
result.errors.append(f"History fetch error: {e}")
148+
149+
150+
def test_concurrent_streaming_users():
151+
results = [
152+
UserResult(
153+
user_id=f"user_{i + 1}",
154+
thread_id=str(uuid.uuid4()),
155+
queries=USER_QUERIES[i],
156+
)
157+
for i in range(5)
158+
]
159+
160+
threads = [threading.Thread(target=run_user_session, args=(r,)) for r in results]
161+
162+
overall_start = time.time()
163+
for t in threads:
164+
t.start()
165+
for t in threads:
166+
t.join()
167+
overall_duration = round(time.time() - overall_start, 2)
168+
169+
# ── Verify all histories AFTER all threads complete ──────────────
170+
print("\nVerifying histories...")
171+
for result in results:
172+
verify_history(result)
173+
174+
# ── Stats ────────────────────────────────────────────────────────────
175+
print("\n" + "=" * 65)
176+
print(f"{'CONCURRENT STREAMING USER TEST RESULTS':^65}")
177+
print("=" * 65)
178+
179+
total_errors = 0
180+
for r in results:
181+
avg = round(sum(r.durations) / len(r.durations), 2) if r.durations else 0
182+
status = "✅" if not r.errors else "❌"
183+
history = "✅" if r.history_ok else "❌"
184+
total_errors += len(r.errors)
185+
186+
print(f"\n[{r.user_id}] thread: {r.thread_id[:8]}...")
187+
print(f" History correct : {history}")
188+
print(f" Status : {status}")
189+
if r.errors:
190+
for err in r.errors:
191+
print(f" ⚠ {err}")
192+
print(f" Queries sent : {len(r.queries)}")
193+
print(f" Per query (s) : {list(zip(r.queries, r.durations))}")
194+
print(f" Avg per query : {avg}s")
195+
print(f" Slowest query : {max(r.durations)}s")
196+
print(f" Fastest query : {min(r.durations)}s")
197+
198+
print("\n" + "-" * 65)
199+
print(f" Total users : {len(results)}")
200+
print(f" Total queries sent : {sum(len(r.queries) for r in results)}")
201+
print(f" Total errors : {total_errors}")
202+
print(
203+
f" All histories OK : {'✅' if all(r.history_ok for r in results) else '❌'}"
204+
)
205+
print(f" Total wall time : {overall_duration}s")
206+
print("=" * 65)
207+
208+
209+
if __name__ == "__main__":
210+
test_concurrent_streaming_users()

0 commit comments

Comments
 (0)