Skip to content

Commit 8a04765

Browse files
author
sav-da
committed
fix: fixes after review
1 parent 3b017e5 commit 8a04765

9 files changed

Lines changed: 231 additions & 133 deletions

File tree

rabbitmq/functional_tests/basic_chaos/rabbitmq_service.cpp

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#include <userver/components/component_context.hpp>
88
#include <userver/components/minimal_server_component_list.hpp>
99
#include <userver/concurrent/variable.hpp>
10+
#include <userver/formats/json.hpp>
11+
#include <userver/formats/parse/common_containers.hpp>
1012
#include <userver/formats/serialize/common_containers.hpp>
1113
#include <userver/server/handlers/http_handler_base.hpp>
1214
#include <userver/server/handlers/tests_control.hpp>
@@ -26,8 +28,7 @@ class ChaosProducer final : public components::LoggableComponentBase {
2628

2729
ChaosProducer(const components::ComponentConfig& config, const components::ComponentContext& context)
2830
: components::LoggableComponentBase{config, context},
29-
rabbit_client_{context.FindComponent<components::RabbitMQ>("chaos-rabbit").GetClient()}
30-
{
31+
rabbit_client_{context.FindComponent<components::RabbitMQ>("chaos-rabbit").GetClient()} {
3132
const auto setup_deadline = engine::Deadline::FromDuration(kDefaultOperationTimeout);
3233

3334
auto admin_channel = rabbit_client_->GetAdminChannel(setup_deadline);
@@ -77,9 +78,7 @@ class ChaosConsumer final : public components::ComponentBase {
7778
static constexpr std::string_view kName{"chaos-consumer"};
7879

7980
ChaosConsumer(const components::ComponentConfig& config, const components::ComponentContext& context)
80-
: components::ComponentBase{config, context},
81-
consumer_{config, context, messages_}
82-
{
81+
: components::ComponentBase{config, context}, consumer_{config, context, messages_} {
8382
Start();
8483
}
8584

@@ -119,8 +118,7 @@ class ChaosConsumer final : public components::ComponentBase {
119118
)
120119
: urabbitmq::
121120
ConsumerBase{context.FindComponent<components::RabbitMQ>(config["rabbit_name"].As<std::string>()).GetClient(), ParseSettings(config)},
122-
messages_{messages}
123-
{}
121+
messages_{messages} {}
124122

125123
protected:
126124
void Process(urabbitmq::ConsumedMessage msg) override {
@@ -150,8 +148,7 @@ class ChaosHandler final : public server::handlers::HttpHandlerBase {
150148
ChaosHandler(const components::ComponentConfig& config, const components::ComponentContext& context)
151149
: server::handlers::HttpHandlerBase{config, context},
152150
producer_{context.FindComponent<ChaosProducer>()},
153-
consumer_{context.FindComponent<ChaosConsumer>()}
154-
{}
151+
consumer_{context.FindComponent<ChaosConsumer>()} {}
155152

156153
std::string HandleRequestThrow(const server::http::HttpRequest& request, server::request::RequestContext&)
157154
const override {
@@ -178,6 +175,13 @@ class ChaosHandler final : public server::handlers::HttpHandlerBase {
178175
throw server::handlers::ClientError{server::handlers::ExternalBody{"No 'message' query argument"}};
179176
}
180177
urabbitmq::Envelope envelope{message, urabbitmq::MessageType::kTransient, {}, {}, {}};
178+
if (!request.RequestBody().empty()) {
179+
const auto request_json = formats::json::FromString(request.RequestBody());
180+
if (request_json.HasMember("headers")) {
181+
envelope
182+
.headers = request_json["headers"].As<std::unordered_map<std::string, urabbitmq::HeaderValue>>();
183+
}
184+
}
181185
const auto& correlation_id = request.GetArg("correlation_id");
182186
if (!correlation_id.empty()) {
183187
envelope.correlation_id = correlation_id;
@@ -219,16 +223,17 @@ class ChaosHandler final : public server::handlers::HttpHandlerBase {
219223
}
220224

221225
std::string HandleGet() const {
222-
formats::json::ValueBuilder messages_builder;
226+
urabbitmq::HeaderValue::Builder messages_builder;
223227
for (const auto& item : consumer_.GetMessages()) {
224-
formats::json::ValueBuilder item_builder;
228+
urabbitmq::HeaderValue::Builder item_builder;
225229
item_builder["message"] = item.message;
226230
if (item.correlation_id.has_value()) {
227231
item_builder["correlation_id"] = item.correlation_id;
228232
}
229233
if (item.reply_to.has_value()) {
230234
item_builder["reply_to"] = item.reply_to;
231235
}
236+
item_builder["headers"] = item.headers;
232237
messages_builder.PushBack(std::move(item_builder));
233238
}
234239
return formats::json::ToString(messages_builder.ExtractValue());

rabbitmq/functional_tests/basic_chaos/static_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ components_manager:
1717
min_pool_size: 1
1818
max_pool_size: 1
1919
max_in_flight_requests: 5
20+
heartbeat_interval_seconds: 1
2021
use_secure_connection: false
2122

2223
secdist: {} # Component that stores configuration of hosts and passwords

rabbitmq/functional_tests/basic_chaos/tests/test_rabbitmq.py

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ async def _clear_messages(service_client):
3737
assert response.status_code == 200
3838

3939

40+
def _strip_headers(messages):
41+
return [{key: value for key, value in message.items() if key != 'headers'} for message in messages]
42+
43+
4044
async def _publish_and_consume(testpoint, client):
4145
@testpoint('message_consumed')
4246
def message_consumed(data):
@@ -51,7 +55,7 @@ def message_consumed(data):
5155
response = await client.get('/v1/messages')
5256
assert response.status_code == 200
5357

54-
assert response.json() == MESSAGES
58+
assert _strip_headers(response.json()) == MESSAGES
5559

5660

5761
async def test_rabbitmq_happy(testpoint, service_client, gate):
@@ -60,6 +64,55 @@ async def test_rabbitmq_happy(testpoint, service_client, gate):
6064
await _publish_and_consume(testpoint, service_client)
6165

6266

67+
async def test_rabbitmq_headers(testpoint, service_client, gate):
68+
await _clear_messages(service_client)
69+
70+
@testpoint('message_consumed')
71+
def message_consumed(data):
72+
pass
73+
74+
expected_headers = {
75+
'x-bool': True,
76+
'x-int': -10,
77+
'x-uint': 10,
78+
'x-double': 2.5,
79+
'x-array': [-7, 'array-value', {'enabled': False, 'nullable': None}],
80+
'x-object': {
81+
'count': 42,
82+
'name': 'nested-object',
83+
'array': [-7, 'array-value', {'enabled': False, 'nullable': None}],
84+
},
85+
'x-null': None,
86+
}
87+
88+
response = await service_client.post(
89+
'/v1/messages?message=headers&reliable=1&reply_to=reply&correlation_id=corr-id',
90+
json={'headers': expected_headers},
91+
)
92+
assert response.status_code == 200
93+
94+
await message_consumed.wait_call()
95+
96+
response = await service_client.get('/v1/messages')
97+
assert response.status_code == 200
98+
messages = response.json()
99+
assert len(messages) == 1
100+
101+
consumed = messages[0]
102+
assert consumed['message'] == 'headers'
103+
assert consumed['reply_to'] == 'reply'
104+
assert consumed['correlation_id'] == 'corr-id'
105+
assert consumed['headers']['x-bool'] is True
106+
assert consumed['headers']['x-int'] == -10
107+
assert consumed['headers']['x-uint'] == 10
108+
assert consumed['headers']['x-double'] == 2.5
109+
assert consumed['headers']['x-array'] == expected_headers['x-array']
110+
assert consumed['headers']['x-object'] == expected_headers['x-object']
111+
assert consumed['headers']['x-null'] is None
112+
assert consumed['headers']['u-trace-id']
113+
assert consumed['headers']['u-parent-span-id']
114+
115+
63116
@pytest.mark.skip(reason='std::terminate is called, fix in TAXICOMMON-6995')
64117
async def test_consumer_reconnects(testpoint, service_client, gate):
65118
await _clear_messages(service_client)
@@ -85,4 +138,35 @@ def message_consumed(data):
85138
response = await service_client.get('/v1/messages')
86139
assert response.status_code == 200
87140

88-
assert response.json() == MESSAGES
141+
assert _strip_headers(response.json()) == MESSAGES
142+
143+
144+
async def test_rabbitmq_heartbeat_reconnects(testpoint, service_client, gate):
145+
await _clear_messages(service_client)
146+
147+
@testpoint('message_consumed')
148+
def message_consumed(data):
149+
pass
150+
151+
response = await service_client.post('/v1/messages?message=before-heartbeat')
152+
assert response.status_code == 200
153+
await message_consumed.wait_call()
154+
155+
await gate.to_server_noop()
156+
await gate.to_client_noop()
157+
await asyncio.sleep(2.5)
158+
await gate.to_server_pass()
159+
await gate.to_client_pass()
160+
await gate.sockets_close()
161+
162+
await gate.wait_for_connections(timeout=10.0)
163+
await asyncio.sleep(1.0)
164+
165+
response = await service_client.post('/v1/messages?message=after-heartbeat')
166+
assert response.status_code == 200
167+
168+
await message_consumed.wait_call()
169+
170+
response = await service_client.get('/v1/messages')
171+
assert response.status_code == 200
172+
assert any(message['message'] == 'after-heartbeat' for message in response.json())

rabbitmq/src/tests/header_value_rmqtest.cpp

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,28 @@ USERVER_NAMESPACE_BEGIN
1616
namespace {
1717

1818
template <typename T>
19-
formats::json::Value MakeHeaderValue(T&& value) {
20-
return formats::json::ValueBuilder{std::forward<T>(value)}.ExtractValue();
19+
urabbitmq::HeaderValue MakeHeaderValue(T&& value) {
20+
return urabbitmq::HeaderValue::Builder{std::forward<T>(value)}.ExtractValue();
2121
}
2222

23-
formats::json::Value MakeNestedArrayValue() {
24-
formats::json::ValueBuilder builder{formats::common::Type::kArray};
23+
urabbitmq::HeaderValue MakeNestedArrayValue() {
24+
urabbitmq::HeaderValue::Builder builder{formats::common::Type::kArray};
2525
builder.PushBack(std::int64_t{-7});
2626
builder.PushBack("array-value");
2727

28-
formats::json::ValueBuilder nested_object{formats::common::Type::kObject};
28+
urabbitmq::HeaderValue::Builder nested_object{formats::common::Type::kObject};
2929
nested_object["enabled"] = false;
30-
nested_object["nullable"] = formats::json::ValueBuilder{};
30+
nested_object["nullable"] = urabbitmq::HeaderValue::Builder{};
3131
builder.PushBack(std::move(nested_object));
3232

3333
return builder.ExtractValue();
3434
}
3535

36-
formats::json::Value MakeNestedObjectValue() {
37-
formats::json::ValueBuilder builder{formats::common::Type::kObject};
36+
urabbitmq::HeaderValue MakeNestedObjectValue() {
37+
urabbitmq::HeaderValue::Builder builder{formats::common::Type::kObject};
3838
builder["count"] = std::uint64_t{42};
3939
builder["name"] = "nested-object";
40-
builder["array"] = formats::json::ValueBuilder{MakeNestedArrayValue()};
40+
builder["array"] = urabbitmq::HeaderValue::Builder{MakeNestedArrayValue()};
4141

4242
return builder.ExtractValue();
4343
}
@@ -83,33 +83,43 @@ UTEST(HeaderValue, ConvertsNestedAmqpTypes) {
8383
const std::unordered_map<std::string, urabbitmq::HeaderValue> expected{
8484
{"string", MakeHeaderValue("value")},
8585
{"bool", MakeHeaderValue(true)},
86-
{"signed", MakeHeaderValue(std::int64_t{-10})},
87-
{"unsigned", MakeHeaderValue(std::uint64_t{10})},
86+
{"signed", MakeHeaderValue(-10)},
87+
{"unsigned", MakeHeaderValue(10u)},
8888
{"double", MakeHeaderValue(1.5)},
89-
{"null", formats::json::ValueBuilder{}.ExtractValue()},
89+
{"null", urabbitmq::HeaderValue::Builder{}.ExtractValue()},
9090
{"array", MakeNestedArrayValue()},
9191
{"object", MakeNestedObjectValue()},
9292
};
9393

94-
ExpectHeadersEqual(urabbitmq::impl::TableToHeaders(headers), expected);
94+
const auto actual = urabbitmq::impl::TableToHeaders(headers);
95+
ExpectHeadersEqual(actual, expected);
96+
EXPECT_TRUE(actual.at("signed").IsInt());
97+
EXPECT_TRUE(actual.at("unsigned").IsUInt());
9598
}
9699

97100
UTEST(HeaderValue, RoundTripsHeaders) {
98101
const std::unordered_map<std::string, urabbitmq::HeaderValue> expected{
99102
{"string", MakeHeaderValue("value")},
100103
{"bool", MakeHeaderValue(false)},
101-
{"signed", MakeHeaderValue(std::int64_t{-123456789})},
102-
{"unsigned", MakeHeaderValue(std::uint64_t{123456789})},
104+
{"signed", MakeHeaderValue(-123456789)},
105+
{"signed64", MakeHeaderValue(std::int64_t{-1234567890123})},
106+
{"unsigned", MakeHeaderValue(123456789u)},
107+
{"unsigned64", MakeHeaderValue(std::uint64_t{1234567890123})},
103108
{"double", MakeHeaderValue(3.25)},
104-
{"null", formats::json::ValueBuilder{}.ExtractValue()},
109+
{"null", urabbitmq::HeaderValue::Builder{}.ExtractValue()},
105110
{"array", MakeNestedArrayValue()},
106111
{"object", MakeNestedObjectValue()},
107112
};
108113

109114
AMQP::Table table;
110115
urabbitmq::impl::AddHeadersToTable(table, expected);
111116

112-
ExpectHeadersEqual(urabbitmq::impl::TableToHeaders(table), expected);
117+
const auto actual = urabbitmq::impl::TableToHeaders(table);
118+
ExpectHeadersEqual(actual, expected);
119+
EXPECT_TRUE(actual.at("signed").IsInt());
120+
EXPECT_TRUE(actual.at("signed64").IsInt64());
121+
EXPECT_TRUE(actual.at("unsigned").IsUInt());
122+
EXPECT_TRUE(actual.at("unsigned64").IsUInt64());
113123
}
114124

115125
USERVER_NAMESPACE_END

rabbitmq/src/tests/publish_consume_rmqtest.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,28 @@ USERVER_NAMESPACE_BEGIN
1919
namespace {
2020

2121
template <typename T>
22-
formats::json::Value MakeHeaderValue(T&& value) {
23-
return formats::json::ValueBuilder{std::forward<T>(value)}.ExtractValue();
22+
urabbitmq::HeaderValue MakeHeaderValue(T&& value) {
23+
return urabbitmq::HeaderValue::Builder{std::forward<T>(value)}.ExtractValue();
2424
}
2525

26-
formats::json::Value MakeNestedArrayValue() {
27-
formats::json::ValueBuilder builder{formats::common::Type::kArray};
26+
urabbitmq::HeaderValue MakeNestedArrayValue() {
27+
urabbitmq::HeaderValue::Builder builder{formats::common::Type::kArray};
2828
builder.PushBack(std::int64_t{-7});
2929
builder.PushBack("array-value");
3030

31-
formats::json::ValueBuilder nested_object{formats::common::Type::kObject};
31+
urabbitmq::HeaderValue::Builder nested_object{formats::common::Type::kObject};
3232
nested_object["enabled"] = false;
33-
nested_object["nullable"] = formats::json::ValueBuilder{};
33+
nested_object["nullable"] = urabbitmq::HeaderValue::Builder{};
3434
builder.PushBack(std::move(nested_object));
3535

3636
return builder.ExtractValue();
3737
}
3838

39-
formats::json::Value MakeNestedObjectValue() {
40-
formats::json::ValueBuilder builder{formats::common::Type::kObject};
39+
urabbitmq::HeaderValue MakeNestedObjectValue() {
40+
urabbitmq::HeaderValue::Builder builder{formats::common::Type::kObject};
4141
builder["count"] = std::uint64_t{42};
4242
builder["name"] = "nested-object";
43-
builder["array"] = formats::json::ValueBuilder{MakeNestedArrayValue()};
43+
builder["array"] = urabbitmq::HeaderValue::Builder{MakeNestedArrayValue()};
4444

4545
return builder.ExtractValue();
4646
}
@@ -292,10 +292,12 @@ UTEST(Consumer, ConsumeMetadataAndHeadersWork) {
292292
{
293293
{"x-custom-header", MakeHeaderValue("custom-value")},
294294
{"x-bool", MakeHeaderValue(true)},
295+
{"x-int", MakeHeaderValue(-10)},
295296
{"x-int64", MakeHeaderValue(std::int64_t{-10})},
297+
{"x-uint", MakeHeaderValue(10u)},
296298
{"x-uint64", MakeHeaderValue(std::uint64_t{10})},
297299
{"x-double", MakeHeaderValue(2.5)},
298-
{"x-null", formats::json::ValueBuilder{}.ExtractValue()},
300+
{"x-null", urabbitmq::HeaderValue::Builder{}.ExtractValue()},
299301
},
300302
},
301303
{

0 commit comments

Comments
 (0)