-
Notifications
You must be signed in to change notification settings - Fork 310
Expand file tree
/
Copy pathtest_basic_messaging.py
More file actions
314 lines (229 loc) · 9.83 KB
/
test_basic_messaging.py
File metadata and controls
314 lines (229 loc) · 9.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Basic messaging BDD test implementation for Python SDK
"""
import asyncio
import socket
from pytest_bdd import scenarios, given, when, then, parsers
from apache_iggy import IggyClient, SendMessage, PollingStrategy
# Load scenarios from the shared feature file
scenarios("/app/features/basic_messaging.feature")
@given("I have a running Iggy server")
def running_server(context):
"""Ensure we have a running Iggy server and create client"""
async def _connect():
# Resolve hostname to IP if needed
host, port = context.server_addr.split(":")
try:
# Try to resolve hostname to IP
ip_addr = socket.gethostbyname(host)
resolved_addr = f"{ip_addr}:{port}"
except socket.gaierror:
# If resolution fails, use as-is (might be an IP already)
resolved_addr = context.server_addr
context.client = IggyClient(resolved_addr)
await context.client.connect()
await context.client.ping() # Health check
asyncio.run(_connect())
@given("I am authenticated as the root user")
def authenticated_root_user(context):
"""Authenticate as root user"""
async def _login():
await context.client.login_user("iggy", "iggy")
asyncio.run(_login())
@given("I have no streams in the system")
def no_streams_in_system(context):
"""Ensure no streams exist in the system"""
# With --fresh flag on server, this should already be clean
# Just verify by attempting to get a stream that shouldn't exist
pass
@when(parsers.parse("I create a stream with name {stream_name}"))
def create_stream(context, stream_name):
"""Create a stream with specified name"""
async def _create():
await context.client.create_stream(name=stream_name)
stream = await context.client.get_stream(stream_name)
if stream is None:
raise RuntimeError(f"Stream {stream_name} was not found after creation")
context.last_stream_id = stream.id
context.last_stream_name = stream_name
asyncio.run(_create())
@then("the stream should be created successfully")
def stream_created_successfully(context):
"""Verify stream was created successfully"""
async def _verify():
stream = await context.client.get_stream(context.last_stream_name)
assert stream is not None
asyncio.run(_verify())
@then(parsers.parse("the stream should have name {stream_name}"))
def verify_stream_properties(context, stream_name):
"""Verify stream has correct name"""
async def _verify():
stream = await context.client.get_stream(stream_name)
assert stream is not None
assert stream.name == stream_name
asyncio.run(_verify())
@when(
parsers.parse(
"I create a topic with name {topic_name} in stream {stream_id:d} with {partitions:d} partitions"
)
)
def create_topic(context, topic_name, stream_id, partitions):
"""Create a topic with specified parameters"""
async def _create():
await context.client.create_topic(
stream=stream_id, name=topic_name, partitions_count=partitions
)
topic = await context.client.get_topic(stream_id, topic_name)
if topic is None:
raise RuntimeError(f"Topic {topic_name} was not found after creation")
context.last_topic_id = topic.id
context.last_topic_name = topic_name
context.last_topic_partitions = partitions
asyncio.run(_create())
@then("the topic should be created successfully")
def topic_created_successfully(context):
"""Verify topic was created successfully"""
async def _verify():
topic = await context.client.get_topic(
context.last_stream_id, context.last_topic_name
)
assert topic is not None
asyncio.run(_verify())
@then(parsers.parse("the topic should have name {topic_name}"))
def verify_topic_properties(context, topic_name):
"""Verify topic has correct name"""
async def _verify():
topic = await context.client.get_topic(context.last_stream_id, topic_name)
assert topic is not None
assert topic.name == topic_name
asyncio.run(_verify())
@then(parsers.parse("the topic should have {partitions:d} partitions"))
def verify_topic_partitions(context, partitions):
"""Verify topic has correct number of partitions"""
async def _verify():
topic = await context.client.get_topic(
context.last_stream_id, context.last_topic_name
)
assert topic is not None
assert topic.partitions_count == partitions
asyncio.run(_verify())
@when(
parsers.parse(
"I send {message_count:d} messages to stream {stream_id:d}, topic {topic_id:d}, partition {partition_id:d}"
)
)
def send_messages(context, message_count, stream_id, topic_id, partition_id):
"""Send messages to specified stream, topic, and partition"""
async def _send():
messages = []
for i in range(message_count):
content = f"test message {i}"
messages.append(SendMessage(content))
await context.client.send_messages(
stream=stream_id,
topic=topic_id,
partitioning=partition_id,
messages=messages,
)
# Store the last sent message content for comparison
if messages:
context.last_sent_message = f"test message {message_count - 1}"
asyncio.run(_send())
@then("all messages should be sent successfully")
def messages_sent_successfully(context):
"""Verify all messages were sent successfully"""
# If we got here without exception, messages were sent successfully
assert context.last_sent_message is not None
@when(
parsers.parse(
"I poll messages from stream {stream_id:d}, topic {topic_id:d}, partition {partition_id:d} starting from offset {start_offset:d}"
)
)
def poll_messages(context, stream_id, topic_id, partition_id, start_offset):
"""Poll messages from specified location"""
async def _poll():
context.last_polled_messages = await context.client.poll_messages(
stream=stream_id,
topic=topic_id,
partition_id=partition_id,
polling_strategy=PollingStrategy.Offset(value=start_offset),
count=100, # Poll up to 100 messages
auto_commit=True,
)
asyncio.run(_poll())
@then(parsers.parse("I should receive {expected_count:d} messages"))
def verify_message_count(context, expected_count):
"""Verify correct number of messages received"""
assert context.last_polled_messages is not None
assert len(context.last_polled_messages) == expected_count
@then(
parsers.parse(
"the messages should have sequential offsets from {start_offset:d} to {end_offset:d}"
)
)
def verify_sequential_offsets(context, start_offset, end_offset):
"""Verify messages have sequential offsets"""
assert context.last_polled_messages is not None
for i, message in enumerate(context.last_polled_messages):
expected_offset = start_offset + i
assert message.offset() == expected_offset
last_message = context.last_polled_messages[-1]
assert last_message.offset() == end_offset
@then("each message should have the expected payload content")
def verify_payload_content(context):
"""Verify each message has expected payload content"""
assert context.last_polled_messages is not None
for i, message in enumerate(context.last_polled_messages):
expected_payload = f"test message {i}"
actual_payload = message.payload().decode("utf-8")
assert actual_payload == expected_payload
@then("the last polled message should match the last sent message")
def verify_last_message_match(context):
"""Verify last polled message matches last sent message"""
assert context.last_sent_message is not None
assert context.last_polled_messages is not None
last_polled = context.last_polled_messages[-1]
last_polled_payload = last_polled.payload().decode("utf-8")
assert last_polled_payload == context.last_sent_message
@when(parsers.parse('I update the stream name to "{new_name}"'))
def update_stream_name(context, new_name):
"""Update the stream name"""
async def _update():
await context.client.update_stream(context.last_stream_id, new_name)
context.last_stream_name = new_name
asyncio.run(_update())
@then(parsers.parse('the stream name should be updated to "{expected_name}"'))
def verify_stream_name_updated(context, expected_name):
"""Verify stream name was updated"""
async def _verify():
stream = await context.client.get_stream(context.last_stream_id)
assert stream is not None
assert stream.name == expected_name
asyncio.run(_verify())
@when("I delete the stream")
def delete_stream(context):
"""Delete the stream"""
async def _delete():
await context.client.delete_stream(context.last_stream_id)
context.last_stream_id = None
asyncio.run(_delete())
@then("the stream should be deleted successfully")
def verify_stream_deleted(context):
"""Verify stream was deleted"""
assert context.last_stream_id is None