Skip to content

Commit 22e03c8

Browse files
author
sav-da
committed
feat: fix after review
1 parent 8a04765 commit 22e03c8

3 files changed

Lines changed: 23 additions & 13 deletions

File tree

rabbitmq/functional_tests/basic_chaos/tests/test_rabbitmq.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -152,20 +152,27 @@ def message_consumed(data):
152152
assert response.status_code == 200
153153
await message_consumed.wait_call()
154154

155-
await gate.to_server_noop()
156-
await gate.to_client_noop()
157-
await asyncio.sleep(2.5)
158-
await gate.to_server_pass()
159-
await gate.to_client_pass()
160-
await gate.sockets_close()
155+
async with service_client.capture_logs(log_level='INFO') as capture:
161156

162-
await gate.wait_for_connections(timeout=10.0)
163-
await asyncio.sleep(1.0)
157+
@capture.subscribe(text="Consumer for queue 'chaos-queue' is broken, trying to restart")
158+
def consumer_broken(**kwargs):
159+
pass
164160

165-
response = await service_client.post('/v1/messages?message=after-heartbeat')
166-
assert response.status_code == 200
161+
@capture.subscribe(text='Restarted successfully')
162+
def consumer_restarted(**kwargs):
163+
pass
167164

168-
await message_consumed.wait_call()
165+
await gate.to_server_drop()
166+
await asyncio.sleep(3.0)
167+
await gate.to_server_pass()
168+
169+
await consumer_broken.wait_call()
170+
await consumer_restarted.wait_call()
171+
172+
response = await service_client.post('/v1/messages?message=after-heartbeat')
173+
assert response.status_code == 200
174+
175+
await message_consumed.wait_call()
169176

170177
response = await service_client.get('/v1/messages')
171178
assert response.status_code == 200

rabbitmq/src/urabbitmq/impl/amqp_connection_handler.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ void AmqpConnectionHandler::onReady(AMQP::Connection*) {
180180
}
181181

182182
void AmqpConnectionHandler::OnConnectionCreated(AmqpConnection* connection, engine::Deadline deadline) {
183+
UINVARIANT(connection_ == nullptr, "Unexpected repeated OnConnectionCreated call");
183184
connection_ = connection;
184185
reader_.Start(connection);
185186

rabbitmq/src/urabbitmq/impl/header_value.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ AMQP::Array ToAmqpArray(const HeaderValue& value) {
8585
AMQP::Table ToAmqpTable(const HeaderValue& value) {
8686
AMQP::Table table;
8787
for (const auto& [key, item] : formats::common::Items(value)) {
88-
WithAmqpField(item, [&table, &key](const AMQP::Field& field) { table.set(key, field); });
88+
const auto key_copy = std::string{key};
89+
WithAmqpField(item, [&table, key_copy](const AMQP::Field& field) { table.set(key_copy, field); });
8990
}
9091

9192
return table;
@@ -157,7 +158,8 @@ std::unordered_map<std::string, HeaderValue> TableToHeaders(const AMQP::Table& t
157158

158159
void AddHeadersToTable(AMQP::Table& table, const std::unordered_map<std::string, HeaderValue>& headers) {
159160
for (const auto& [key, value] : headers) {
160-
WithAmqpField(value, [&table, &key](const AMQP::Field& field) { table.set(key, field); });
161+
const auto key_copy = key;
162+
WithAmqpField(value, [&table, key_copy](const AMQP::Field& field) { table.set(key_copy, field); });
161163
}
162164
}
163165

0 commit comments

Comments
 (0)