2121#pragma once
2222#ifndef CPPNET_CONTEXT_THREAD_IMPL_HPP
2323#define CPPNET_CONTEXT_THREAD_IMPL_HPP
24- #include " net/detail/with_lock.hpp"
2524#include " net/service/context_thread.hpp"
2625
2726#include < stdexec/execution.hpp>
@@ -42,10 +41,7 @@ auto context_thread<Service>::isr(async_scope &scope,
4241 static auto msg = socket_message{.buffers = buffer};
4342
4443 if (!handle ())
45- {
46- scope.request_stop ();
4744 return ;
48- }
4945
5046 auto recvmsg =
5147 io::recvmsg (socket, msg, 0 ) |
@@ -57,55 +53,67 @@ auto context_thread<Service>::isr(async_scope &scope,
5753template <ServiceLike Service>
5854auto context_thread<Service>::stop() noexcept -> void
5955{
60- auto socket = interrupt .sockets [1 ];
61- interrupt .sockets [1 ] = interrupt .INVALID_SOCKET ;
62- if (socket != interrupt .INVALID_SOCKET )
56+ auto socket = timers .sockets [1 ];
57+ timers .sockets [1 ] = timers .INVALID_SOCKET ;
58+ if (socket != timers .INVALID_SOCKET )
6359 io::socket::close (socket);
6460 state = STOPPED;
6561}
6662
6763template <ServiceLike Service>
6864template <typename ... Args>
69- auto context_thread<Service>::start(std::mutex &mtx,
70- std::condition_variable &cvar,
71- Args &&...args) -> void
65+ auto context_thread<Service>::start(Args &&...args) -> void
7266{
73- auto lock = std::lock_guard{mtx };
67+ auto lock = std::lock_guard{mtx_ };
7468 if (started_)
7569 throw std::invalid_argument (" context_thread can't be started twice." );
7670
7771 server_ = std::thread ([&]() noexcept {
7872 using namespace detail ;
7973 using namespace io ::socket;
74+ using namespace std ::chrono;
8075
8176 auto service = Service{std::forward<Args>(args)...};
82- auto &sockets = interrupt .sockets ;
77+ auto &sockets = timers .sockets ;
8378 if (!socketpair (AF_UNIX, SOCK_STREAM, 0 , sockets.data ()))
8479 {
80+ const auto token = scope.get_stop_token ();
81+
8582 isr (scope, poller.emplace (sockets[0 ]), [&]() noexcept {
8683 auto sigmask_ = sigmask.exchange (0 );
8784 for (int signum = 0 ; auto mask = (sigmask_ >> signum); ++signum)
8885 {
8986 if (mask & (1 << 0 ))
9087 service.signal_handler (signum);
9188 }
92- return !(sigmask_ & (1 << terminate));
93- });
9489
95- state = STARTED;
96- cvar.notify_all ();
90+ if (sigmask_ & (1 << terminate))
91+ {
92+ scope.request_stop ();
93+ timers.add (
94+ seconds (1 ),
95+ [&](timers::timer_id) { service.signal_handler (terminate); },
96+ seconds (1 ));
97+ }
98+
99+ return !token.stop_requested ();
100+ });
97101
98102 service.start (static_cast <async_context &>(*this ));
103+ state = STARTED;
99104
100- const auto token = scope.get_stop_token ();
101105 if (token.stop_requested ())
106+ {
107+ state = STOPPED;
102108 signal (terminate);
109+ }
103110
111+ state.notify_all ();
104112 run (service, token);
105113 }
106114
107- with_lock (std::unique_lock{mtx}, [&]() noexcept { stop (); } );
108- cvar .notify_all ();
115+ stop ();
116+ state .notify_all ();
109117 });
110118
111119 started_ = true ;
@@ -126,27 +134,21 @@ auto context_thread<Service>::run(Service &service,
126134 const StopToken &token) -> void
127135{
128136 using namespace stdexec ;
129- using std::chrono::duration_cast ;
137+ using namespace std ::chrono;
130138
131- int next = 0 ;
132- auto start = clock::now ();
133- auto is_empty = false ;
134- scope.spawn (poller.on_empty () | then ([&]() noexcept { is_empty = true ; }));
139+ auto next = timers.resolve ();
140+ int wait_ms =
141+ (next.count () < 0 ) ? next.count () : duration_cast<duration>(next).count ();
135142
136- while (poller. wait_for (next) || !is_empty)
137- {
138- const auto now = clock::now ( );
143+ auto is_empty = std::atomic_flag ();
144+ scope. spawn (poller. on_empty () |
145+ then ([&]() noexcept { is_empty. test_and_set (); }) );
139146
140- next -= duration_cast<duration>(now - start).count ();
141- if (next <= 0 )
142- {
143- if (token.stop_requested ())
144- service.signal_handler (terminate);
145-
146- next = INTERVAL_MS;
147- }
148-
149- start = now;
147+ while (poller.wait_for (wait_ms) || !is_empty.test ())
148+ {
149+ next = timers.resolve ();
150+ wait_ms = (next.count () < 0 ) ? next.count ()
151+ : duration_cast<duration>(next).count ();
150152 }
151153}
152154} // namespace net::service
0 commit comments