-
Notifications
You must be signed in to change notification settings - Fork 4.8k
Expand file tree
/
Copy pathtest_stream_accumulator.py
More file actions
242 lines (208 loc) · 9.56 KB
/
test_stream_accumulator.py
File metadata and controls
242 lines (208 loc) · 9.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
"""Tests for ResponseStreamState bounds checking in accumulate_event and handle_event.
Verifies that out-of-bounds output_index and content_index values do not raise
IndexError, as reported in https://github.com/openai/openai-python/issues/2852.
"""
from __future__ import annotations
from openai._types import omit
from openai.lib.streaming.responses._responses import ResponseStreamState
from openai.types.responses import Response
from openai.types.responses.response_created_event import ResponseCreatedEvent
from openai.types.responses.response_text_delta_event import ResponseTextDeltaEvent
from openai.types.responses.response_text_done_event import ResponseTextDoneEvent
from openai.types.responses.response_content_part_added_event import ResponseContentPartAddedEvent
from openai.types.responses.response_output_text import ResponseOutputText
from openai.types.responses.response_function_call_arguments_delta_event import (
ResponseFunctionCallArgumentsDeltaEvent,
)
def _make_state() -> ResponseStreamState:
return ResponseStreamState(input_tools=omit, text_format=omit)
def _created_event() -> ResponseCreatedEvent:
"""Construct a minimal response.created event with an empty output list."""
response = Response.model_construct(
id="resp-test",
created_at=0.0,
model="gpt-4o",
object="response",
output=[],
parallel_tool_calls=False,
tool_choice="auto",
tools=[],
)
return ResponseCreatedEvent(
response=response,
sequence_number=0,
type="response.created",
)
def _text_delta_event(*, output_index: int = 0, content_index: int = 0, seq: int = 1) -> ResponseTextDeltaEvent:
return ResponseTextDeltaEvent(
content_index=content_index,
delta="hello",
item_id="item-1",
logprobs=[],
output_index=output_index,
sequence_number=seq,
type="response.output_text.delta",
)
def _text_done_event(*, output_index: int = 0, content_index: int = 0, seq: int = 2) -> ResponseTextDoneEvent:
return ResponseTextDoneEvent(
content_index=content_index,
item_id="item-1",
logprobs=[],
output_index=output_index,
sequence_number=seq,
text="hello world",
type="response.output_text.done",
)
def _content_part_added_event(*, output_index: int = 0, content_index: int = 0, seq: int = 1) -> ResponseContentPartAddedEvent:
part = ResponseOutputText.model_construct(
annotations=[],
text="",
type="output_text",
)
return ResponseContentPartAddedEvent(
content_index=content_index,
item_id="item-1",
output_index=output_index,
part=part,
sequence_number=seq,
type="response.content_part.added",
)
def _function_call_args_delta_event(*, output_index: int = 0, seq: int = 1) -> ResponseFunctionCallArgumentsDeltaEvent:
return ResponseFunctionCallArgumentsDeltaEvent(
delta='{"key":',
item_id="fc-1",
output_index=output_index,
sequence_number=seq,
type="response.function_call_arguments.delta",
)
# ---------------------------------------------------------------------------
# accumulate_event: out-of-bounds output_index
# ---------------------------------------------------------------------------
class TestAccumulateEventBoundsCheck:
"""accumulate_event should silently skip when indices are out of bounds."""
def test_content_part_added_out_of_bounds_output_index(self) -> None:
state = _make_state()
state.handle_event(_created_event())
# output list is empty, output_index=0 is out of bounds
snapshot = state.accumulate_event(_content_part_added_event(output_index=0))
# Should not raise; output list remains empty
assert len(snapshot.output) == 0
def test_text_delta_out_of_bounds_output_index(self) -> None:
state = _make_state()
state.handle_event(_created_event())
snapshot = state.accumulate_event(_text_delta_event(output_index=0))
assert len(snapshot.output) == 0
def test_text_delta_out_of_bounds_content_index(self) -> None:
"""output_index is valid but content_index is out of bounds."""
state = _make_state()
state.handle_event(_created_event())
# Manually add a message output item with empty content
from openai.types.responses.response_output_message import ResponseOutputMessage
msg = ResponseOutputMessage.model_construct(
id="msg-1",
type="message",
status="in_progress",
content=[],
role="assistant",
)
state.accumulate_event(_created_event()) # reinitialize
# Access state internals to get the snapshot
state_obj = state
# Use handle_event to initialize snapshot first, then manually append
# Actually, we need a proper approach: init, then append item
state2 = _make_state()
state2.handle_event(_created_event())
# Directly modify the snapshot to have a message with empty content
snapshot = state2.accumulate_event(_text_delta_event(output_index=5, content_index=0))
# Should not raise
assert snapshot is not None
def test_function_call_delta_out_of_bounds_output_index(self) -> None:
state = _make_state()
state.handle_event(_created_event())
snapshot = state.accumulate_event(_function_call_args_delta_event(output_index=0))
assert len(snapshot.output) == 0
# ---------------------------------------------------------------------------
# handle_event: out-of-bounds output_index should return raw event as fallback
# ---------------------------------------------------------------------------
class TestHandleEventBoundsCheck:
"""handle_event should return the raw event when indices are out of bounds."""
def test_text_delta_out_of_bounds_output_index_returns_raw_event(self) -> None:
state = _make_state()
state.handle_event(_created_event())
delta = _text_delta_event(output_index=5)
events = state.handle_event(delta)
assert len(events) == 1
# Should be the raw event, not an enriched ResponseTextDeltaEvent
assert events[0].type == "response.output_text.delta"
def test_text_done_out_of_bounds_output_index_returns_raw_event(self) -> None:
state = _make_state()
state.handle_event(_created_event())
done = _text_done_event(output_index=5)
events = state.handle_event(done)
assert len(events) == 1
assert events[0].type == "response.output_text.done"
def test_function_call_delta_out_of_bounds_output_index_returns_raw_event(self) -> None:
state = _make_state()
state.handle_event(_created_event())
delta = _function_call_args_delta_event(output_index=5)
events = state.handle_event(delta)
assert len(events) == 1
assert events[0].type == "response.function_call_arguments.delta"
def test_text_delta_out_of_bounds_content_index_returns_raw_event(self) -> None:
"""output_index is valid (message exists) but content_index is out of bounds."""
state = _make_state()
state.handle_event(_created_event())
# We need an output item in the snapshot but with empty content.
# Send an output_item.added event to populate the output list.
from openai.types.responses.response_output_item_added_event import ResponseOutputItemAddedEvent
from openai.types.responses.response_output_message import ResponseOutputMessage
msg = ResponseOutputMessage.model_construct(
id="msg-1",
type="message",
status="in_progress",
content=[],
role="assistant",
)
item_added = ResponseOutputItemAddedEvent(
item=msg,
output_index=0,
sequence_number=1,
type="response.output_item.added",
)
state.handle_event(item_added)
# Now send a text delta with content_index=0, but content list is empty
delta = _text_delta_event(output_index=0, content_index=0, seq=2)
events = state.handle_event(delta)
assert len(events) == 1
assert events[0].type == "response.output_text.delta"
def test_normal_flow_still_works(self) -> None:
"""Verify that normal in-order events still produce enriched events."""
state = _make_state()
state.handle_event(_created_event())
# Add output item
from openai.types.responses.response_output_item_added_event import ResponseOutputItemAddedEvent
from openai.types.responses.response_output_message import ResponseOutputMessage
msg = ResponseOutputMessage.model_construct(
id="msg-1",
type="message",
status="in_progress",
content=[],
role="assistant",
)
item_added = ResponseOutputItemAddedEvent(
item=msg,
output_index=0,
sequence_number=1,
type="response.output_item.added",
)
state.handle_event(item_added)
# Add content part
state.handle_event(_content_part_added_event(output_index=0, content_index=0, seq=2))
# Send text delta -- should work normally
delta = _text_delta_event(output_index=0, content_index=0, seq=3)
events = state.handle_event(delta)
assert len(events) == 1
event = events[0]
assert event.type == "response.output_text.delta"
# The enriched event should have a 'snapshot' field
assert hasattr(event, "snapshot")