Skip to content

Commit f8c836a

Browse files
committed
feat: Properly support IPv6 TCP servers.
1 parent 0763e15 commit f8c836a

2 files changed

Lines changed: 154 additions & 7 deletions

File tree

include/net/service/impl/async_tcp_service_impl.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ auto async_tcp_service<TCPStreamHandler>::start(async_context &ctx) noexcept
4646
using namespace io;
4747
using namespace io::socket;
4848

49-
auto sock = socket_handle(AF_INET, SOCK_STREAM, IPPROTO_TCP);
49+
auto sock = socket_handle(address_->sin6_family, SOCK_STREAM, 0);
5050
if (auto error = initialize_(sock))
5151
{
5252
ctx.scope.request_stop();
@@ -57,7 +57,7 @@ auto async_tcp_service<TCPStreamHandler>::start(async_context &ctx) noexcept
5757
using namespace stdexec;
5858
ctx.scope.request_stop();
5959
sender auto connect =
60-
io::connect(ctx.poller.emplace(AF_INET, SOCK_STREAM, IPPROTO_TCP),
60+
io::connect(ctx.poller.emplace(address_->sin6_family, SOCK_STREAM, 0),
6161
address_) |
6262
then([](auto status) {}) | upon_error([](auto &&error) {});
6363
ctx.scope.spawn(std::move(connect));

tests/test_async_tcp_service.cpp

Lines changed: 152 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ struct echo_block_service : public async_tcp_service<echo_block_service> {
7272
}
7373
};
7474

75-
class AsyncTcpServiceTest : public ::testing::Test {};
75+
class AsyncTcpServiceV4Test : public ::testing::Test {};
7676

77-
TEST_F(AsyncTcpServiceTest, StartTest)
77+
TEST_F(AsyncTcpServiceV4Test, StartTest)
7878
{
7979
using namespace io::socket;
8080

@@ -97,7 +97,7 @@ TEST_F(AsyncTcpServiceTest, StartTest)
9797
while (ctx.poller.wait());
9898
}
9999

100-
TEST_F(AsyncTcpServiceTest, EchoTest)
100+
TEST_F(AsyncTcpServiceV4Test, EchoTest)
101101
{
102102
using namespace io::socket;
103103

@@ -149,7 +149,7 @@ TEST_F(AsyncTcpServiceTest, EchoTest)
149149
while (ctx.poller.wait());
150150
}
151151

152-
TEST_F(AsyncTcpServiceTest, InitializeError)
152+
TEST_F(AsyncTcpServiceV4Test, InitializeError)
153153
{
154154
using namespace io::socket;
155155

@@ -176,7 +176,7 @@ TEST_F(AsyncTcpServiceTest, InitializeError)
176176
while (ctx.poller.wait());
177177
}
178178

179-
TEST_F(AsyncTcpServiceTest, AsyncServiceTest)
179+
TEST_F(AsyncTcpServiceV4Test, AsyncServiceTest)
180180
{
181181
using namespace io::socket;
182182
using service_type = async_service<echo_block_service>;
@@ -218,4 +218,151 @@ TEST_F(AsyncTcpServiceTest, AsyncServiceTest)
218218
}
219219
}
220220
}
221+
222+
class AsyncTcpServiceV6Test : public ::testing::Test {};
223+
224+
TEST_F(AsyncTcpServiceV6Test, StartTestV6)
225+
{
226+
using namespace io::socket;
227+
228+
auto ctx = async_context{};
229+
auto addr = socket_address<sockaddr_in>();
230+
addr->sin_family = AF_INET6;
231+
auto service = echo_block_service{addr};
232+
233+
ctx.interrupt = [&] {
234+
auto sigmask = ctx.sigmask.exchange(0);
235+
for (int signum = 0; auto mask = (sigmask >> signum); ++signum)
236+
{
237+
if (mask & (1 << 0))
238+
service.signal_handler(signum);
239+
}
240+
};
241+
242+
service.start(ctx);
243+
ctx.signal(ctx.terminate);
244+
while (ctx.poller.wait());
245+
}
246+
247+
TEST_F(AsyncTcpServiceV6Test, EchoTest)
248+
{
249+
using namespace io::socket;
250+
251+
auto ctx = async_context();
252+
auto addr = socket_address<sockaddr_in6>();
253+
addr->sin6_family = AF_INET6;
254+
addr->sin6_port = htons(8080);
255+
auto service = echo_block_service(addr);
256+
257+
ctx.interrupt = [&] {
258+
auto sigmask = ctx.sigmask.exchange(0);
259+
for (int signum = 0; auto mask = (sigmask >> signum); ++signum)
260+
{
261+
if (mask & (1 << 0))
262+
service.signal_handler(signum);
263+
}
264+
};
265+
266+
ASSERT_FALSE(service.initialized);
267+
service.start(ctx);
268+
{
269+
ASSERT_TRUE(service.initialized);
270+
ASSERT_FALSE(ctx.scope.get_stop_token().stop_requested());
271+
272+
using namespace io;
273+
auto sock = socket_handle(AF_INET6, SOCK_STREAM, 0);
274+
addr->sin6_addr = IN6ADDR_LOOPBACK_INIT;
275+
276+
ASSERT_EQ(connect(sock, addr), 0);
277+
ctx.poller.wait();
278+
279+
auto buf = std::array<char, 1>{'x'};
280+
auto msg = socket_message{.buffers = buf};
281+
282+
const char *alphabet = "abcdefghijklmnopqrstuvwxyz";
283+
auto *end = alphabet + 26;
284+
285+
for (auto *it = alphabet; it != end; ++it)
286+
{
287+
ASSERT_EQ(sendmsg(sock, socket_message{.buffers = std::span(it, 1)}, 0),
288+
1);
289+
ctx.poller.wait();
290+
ASSERT_EQ(recvmsg(sock, msg, 0), 1);
291+
EXPECT_EQ(buf[0], *it);
292+
}
293+
}
294+
295+
ctx.signal(ctx.terminate);
296+
while (ctx.poller.wait());
297+
}
298+
299+
TEST_F(AsyncTcpServiceV6Test, InitializeError)
300+
{
301+
using namespace io::socket;
302+
303+
auto ctx = async_context();
304+
auto addr = socket_address<sockaddr_in6>();
305+
addr->sin6_family = AF_INET6;
306+
addr->sin6_port = htons(8080);
307+
auto service = echo_block_service(addr);
308+
service.initialized = true;
309+
310+
ctx.interrupt = [&] {
311+
auto sigmask = ctx.sigmask.exchange(0);
312+
for (int signum = 0; auto mask = (sigmask >> signum); ++signum)
313+
{
314+
if (mask & (1 << 0))
315+
service.signal_handler(signum);
316+
}
317+
};
318+
319+
service.start(ctx);
320+
EXPECT_TRUE(ctx.scope.get_stop_token().stop_requested());
321+
322+
ctx.signal(ctx.terminate);
323+
while (ctx.poller.wait());
324+
}
325+
326+
TEST_F(AsyncTcpServiceV6Test, AsyncServiceTest)
327+
{
328+
using namespace io::socket;
329+
using service_type = async_service<echo_block_service>;
330+
331+
auto list = std::list<service_type>{};
332+
auto &service = list.emplace_back();
333+
334+
std::mutex mtx;
335+
std::condition_variable cvar;
336+
auto addr = socket_address<sockaddr_in6>();
337+
addr->sin6_family = AF_INET6;
338+
addr->sin6_port = htons(8081);
339+
340+
service.start(mtx, cvar, addr);
341+
{
342+
auto lock = std::unique_lock{mtx};
343+
cvar.wait(lock, [&] { return service.interrupt || service.stopped; });
344+
}
345+
ASSERT_TRUE(static_cast<bool>(service.interrupt));
346+
{
347+
using namespace io;
348+
auto sock = socket_handle(AF_INET6, SOCK_STREAM, 0);
349+
addr->sin6_addr = IN6ADDR_LOOPBACK_INIT;
350+
351+
ASSERT_EQ(connect(sock, addr), 0);
352+
353+
auto buf = std::array<char, 1>{'x'};
354+
auto msg = socket_message{.buffers = buf};
355+
356+
const char *alphabet = "abcdefghijklmnopqrstuvwxyz";
357+
auto *end = alphabet + 26;
358+
359+
for (auto *it = alphabet; it != end; ++it)
360+
{
361+
ASSERT_EQ(sendmsg(sock, socket_message{.buffers = std::span(it, 1)}, 0),
362+
1);
363+
ASSERT_EQ(recvmsg(sock, msg, 0), 1);
364+
EXPECT_EQ(buf[0], *it);
365+
}
366+
}
367+
}
221368
// NOLINTEND

0 commit comments

Comments
 (0)