Skip to content

Commit 78adf01

Browse files
committed
test: add RpcServer unit tests
1 parent ade4d0b commit 78adf01

1 file changed

Lines changed: 287 additions & 0 deletions

File tree

tests/rpc_server_tests.cpp

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
/**
2+
* @file rpc_server_tests.cpp
3+
* @brief Unit tests for RpcServer - Internal RPC server for node-to-node communication
4+
*/
5+
6+
#include <gtest/gtest.h>
7+
8+
#include <atomic>
9+
#include <csignal>
10+
#include <cstdint>
11+
#include <cstring>
12+
#include <memory>
13+
#include <thread>
14+
#include <vector>
15+
16+
#include "network/rpc_client.hpp"
17+
#include "network/rpc_message.hpp"
18+
#include "network/rpc_server.hpp"
19+
20+
using namespace cloudsql::network;
21+
22+
namespace {
23+
24+
// Ignore SIGPIPE to prevent crashes when writing to closed sockets
25+
struct SigpipeGuard {
26+
SigpipeGuard() { std::signal(SIGPIPE, SIG_IGN); }
27+
};
28+
SigpipeGuard g_sigpipe;
29+
30+
class RpcServerTests : public ::testing::Test {
31+
protected:
32+
void SetUp() override {
33+
// Use a unique port for each test to avoid TIME_WAIT issues
34+
port_ = TEST_PORT_BASE_ + next_port_++;
35+
server_ = std::make_unique<RpcServer>(port_);
36+
handler_called_ = false;
37+
}
38+
39+
void TearDown() override {
40+
if (server_) {
41+
server_->stop();
42+
}
43+
// Small delay to allow socket to settle
44+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
45+
}
46+
47+
static constexpr uint16_t TEST_PORT_BASE_ = 6300;
48+
static std::atomic<uint16_t> next_port_;
49+
uint16_t port_;
50+
std::unique_ptr<RpcServer> server_;
51+
std::atomic<bool> handler_called_{false};
52+
};
53+
54+
std::atomic<uint16_t> RpcServerTests::next_port_{0};
55+
56+
TEST_F(RpcServerTests, LifecycleStartStop) {
57+
ASSERT_TRUE(server_->start());
58+
server_->stop();
59+
ASSERT_TRUE(server_->start());
60+
server_->stop();
61+
}
62+
63+
TEST_F(RpcServerTests, DoubleStartReturnsFalse) {
64+
ASSERT_TRUE(server_->start());
65+
ASSERT_FALSE(server_->start());
66+
// Force cleanup before next test
67+
server_->stop();
68+
// Give socket time to release
69+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
70+
}
71+
72+
TEST_F(RpcServerTests, SetAndGetHandler) {
73+
auto handler = [](const RpcHeader&, const std::vector<uint8_t>&, int) {};
74+
server_->set_handler(RpcType::Heartbeat, handler);
75+
auto retrieved = server_->get_handler(RpcType::Heartbeat);
76+
ASSERT_NE(retrieved, nullptr);
77+
}
78+
79+
TEST_F(RpcServerTests, GetHandlerNotSet) {
80+
auto retrieved = server_->get_handler(RpcType::RegisterNode);
81+
EXPECT_EQ(retrieved, nullptr);
82+
}
83+
84+
TEST_F(RpcServerTests, HandlerOverride) {
85+
int call_count = 0;
86+
auto handler1 = [&](const RpcHeader&, const std::vector<uint8_t>&, int) { call_count++; };
87+
auto handler2 = [&](const RpcHeader&, const std::vector<uint8_t>&, int) { call_count += 10; };
88+
89+
server_->set_handler(RpcType::Heartbeat, handler1);
90+
server_->set_handler(RpcType::Heartbeat, handler2);
91+
auto retrieved = server_->get_handler(RpcType::Heartbeat);
92+
ASSERT_NE(retrieved, nullptr);
93+
}
94+
95+
TEST_F(RpcServerTests, ClearHandlersAfterStop) {
96+
auto handler = [](const RpcHeader&, const std::vector<uint8_t>&, int) {};
97+
server_->set_handler(RpcType::Heartbeat, handler);
98+
server_->start();
99+
server_->stop();
100+
auto retrieved = server_->get_handler(RpcType::Heartbeat);
101+
EXPECT_EQ(retrieved, nullptr);
102+
}
103+
104+
TEST_F(RpcServerTests, ZeroPayloadHandler) {
105+
server_->start();
106+
107+
bool called = false;
108+
server_->set_handler(
109+
RpcType::Heartbeat, [&called](const RpcHeader& h, const std::vector<uint8_t>& p, int fd) {
110+
called = true;
111+
EXPECT_EQ(p.size(), 0U);
112+
});
113+
114+
// Connect and send RPC with zero payload
115+
int fd = socket(AF_INET, SOCK_STREAM, 0);
116+
ASSERT_GE(fd, 0);
117+
118+
sockaddr_in addr{};
119+
addr.sin_family = AF_INET;
120+
addr.sin_port = htons(port_);
121+
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
122+
123+
ASSERT_EQ(connect(fd, (sockaddr*)&addr, sizeof(addr)), 0);
124+
125+
// Send header
126+
RpcHeader hdr;
127+
hdr.type = RpcType::Heartbeat;
128+
hdr.payload_len = 0;
129+
char h_buf[RpcHeader::HEADER_SIZE];
130+
hdr.encode(h_buf);
131+
send(fd, h_buf, RpcHeader::HEADER_SIZE, 0);
132+
133+
// Give time for the server to process and call the handler
134+
for (int i = 0; i < 10 && !called; ++i) {
135+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
136+
}
137+
EXPECT_TRUE(called);
138+
139+
close(fd);
140+
}
141+
142+
TEST_F(RpcServerTests, MultipleConnections) {
143+
server_->start();
144+
145+
int call_count = 0;
146+
server_->set_handler(RpcType::Heartbeat,
147+
[&call_count](const RpcHeader&, const std::vector<uint8_t>&, int) {
148+
call_count++;
149+
});
150+
151+
std::vector<int> fds;
152+
for (int i = 0; i < 5; ++i) {
153+
int fd = socket(AF_INET, SOCK_STREAM, 0);
154+
sockaddr_in addr{};
155+
addr.sin_family = AF_INET;
156+
addr.sin_port = htons(port_);
157+
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
158+
if (connect(fd, (sockaddr*)&addr, sizeof(addr)) == 0) {
159+
fds.push_back(fd);
160+
}
161+
}
162+
163+
// Send RPCs
164+
for (int fd : fds) {
165+
RpcHeader hdr;
166+
hdr.type = RpcType::Heartbeat;
167+
hdr.payload_len = 0;
168+
char h_buf[RpcHeader::HEADER_SIZE];
169+
hdr.encode(h_buf);
170+
send(fd, h_buf, RpcHeader::HEADER_SIZE, 0);
171+
}
172+
173+
// Give time for the server to process all 5
174+
for (int i = 0; i < 20 && call_count < 5; ++i) {
175+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
176+
}
177+
178+
for (int fd : fds) {
179+
close(fd);
180+
}
181+
182+
EXPECT_EQ(call_count, 5);
183+
}
184+
185+
TEST_F(RpcServerTests, ClientDisconnectMidHeader) {
186+
server_->start();
187+
188+
server_->set_handler(RpcType::Heartbeat,
189+
[](const RpcHeader&, const std::vector<uint8_t>&, int) {});
190+
191+
int fd = socket(AF_INET, SOCK_STREAM, 0);
192+
sockaddr_in addr{};
193+
addr.sin_family = AF_INET;
194+
addr.sin_port = htons(port_);
195+
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
196+
connect(fd, (sockaddr*)&addr, sizeof(addr));
197+
198+
// Send partial header then disconnect
199+
char partial[6];
200+
std::memset(partial, 0, 6);
201+
send(fd, partial, 6, 0);
202+
203+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
204+
close(fd);
205+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
206+
}
207+
208+
TEST_F(RpcServerTests, ClientDisconnectMidPayload) {
209+
server_->start();
210+
211+
server_->set_handler(RpcType::Heartbeat,
212+
[](const RpcHeader&, const std::vector<uint8_t>&, int) {});
213+
214+
int fd = socket(AF_INET, SOCK_STREAM, 0);
215+
sockaddr_in addr{};
216+
addr.sin_family = AF_INET;
217+
addr.sin_port = htons(port_);
218+
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
219+
connect(fd, (sockaddr*)&addr, sizeof(addr));
220+
221+
// Send full header indicating payload but don't send payload
222+
RpcHeader hdr;
223+
hdr.type = RpcType::Heartbeat;
224+
hdr.payload_len = 100; // Request 100 bytes but we won't send them
225+
char h_buf[RpcHeader::HEADER_SIZE];
226+
hdr.encode(h_buf);
227+
send(fd, h_buf, RpcHeader::HEADER_SIZE, 0);
228+
229+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
230+
close(fd);
231+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
232+
}
233+
234+
TEST_F(RpcServerTests, FullRoundTripWithClient) {
235+
server_->start();
236+
237+
server_->set_handler(RpcType::QueryResults,
238+
[](const RpcHeader& h, const std::vector<uint8_t>& p, int fd) {
239+
// Echo back the payload
240+
RpcHeader resp_h;
241+
resp_h.type = RpcType::QueryResults;
242+
resp_h.payload_len = static_cast<uint16_t>(p.size());
243+
char h_buf[RpcHeader::HEADER_SIZE];
244+
resp_h.encode(h_buf);
245+
send(fd, h_buf, RpcHeader::HEADER_SIZE, 0);
246+
if (!p.empty()) {
247+
send(fd, p.data(), p.size(), 0);
248+
}
249+
});
250+
251+
RpcClient client("127.0.0.1", port_);
252+
ASSERT_TRUE(client.connect());
253+
254+
std::vector<uint8_t> payload = {1, 2, 3, 4, 5};
255+
std::vector<uint8_t> response;
256+
ASSERT_TRUE(client.call(RpcType::QueryResults, payload, response, 0));
257+
258+
EXPECT_EQ(response.size(), 5U);
259+
EXPECT_EQ(response[0], 1);
260+
EXPECT_EQ(response[4], 5);
261+
}
262+
263+
TEST_F(RpcServerTests, NoHandlerRegistered) {
264+
server_->start();
265+
// Don't set any handler
266+
267+
int fd = socket(AF_INET, SOCK_STREAM, 0);
268+
sockaddr_in addr{};
269+
addr.sin_family = AF_INET;
270+
addr.sin_port = htons(port_);
271+
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
272+
connect(fd, (sockaddr*)&addr, sizeof(addr));
273+
274+
// Send an RPC with no handler registered
275+
RpcHeader hdr;
276+
hdr.type = RpcType::Error;
277+
hdr.payload_len = 0;
278+
char h_buf[RpcHeader::HEADER_SIZE];
279+
hdr.encode(h_buf);
280+
send(fd, h_buf, RpcHeader::HEADER_SIZE, 0);
281+
282+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
283+
close(fd);
284+
server_->stop();
285+
}
286+
287+
} // namespace

0 commit comments

Comments
 (0)