Skip to content

Commit c8c9738

Browse files
authored
Merge pull request #41 from benoitc/fix/channel-waiter-race
Fix channel waiter race condition
2 parents 1462fac + 09c1a55 commit c8c9738

1 file changed

Lines changed: 24 additions & 48 deletions

File tree

c_src/py_channel.c

Lines changed: 24 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,20 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
145145
bool should_resume = (channel->waiting != NULL);
146146

147147
/* Check if there's an async waiter to dispatch.
148-
* We only clear the waiter state after successful dispatch to avoid
149-
* lost wakeups if the event queue is full. */
148+
* IMPORTANT: Clear waiter state BEFORE releasing mutex to avoid race condition.
149+
* With task_ready notification, the callback can fire before we re-acquire the mutex.
150+
* If dispatch fails (rare), data is still in channel for next receive. */
150151
erlang_event_loop_t *loop_to_wake = NULL;
151152
uint64_t callback_id = 0;
152153
bool has_async_waiter = channel->has_waiter;
153154

154155
if (has_async_waiter) {
155156
loop_to_wake = channel->waiter_loop;
156157
callback_id = channel->waiter_callback_id;
157-
/* Don't clear yet - will clear after successful dispatch */
158+
/* Clear waiter state now to avoid race with fast callback */
159+
channel->has_waiter = false;
160+
channel->waiter_loop = NULL;
161+
channel->waiter_callback_id = 0;
158162
}
159163

160164
/* Check if there's a sync waiter to notify */
@@ -163,7 +167,8 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
163167

164168
if (has_sync_waiter) {
165169
sync_waiter = channel->sync_waiter_pid;
166-
/* Don't clear yet - will clear after successful send */
170+
/* Clear waiter state now to avoid race */
171+
channel->has_sync_waiter = false;
167172
}
168173

169174
pthread_mutex_unlock(&channel->mutex);
@@ -175,19 +180,9 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
175180

176181
/* Dispatch async waiter via timer dispatch (same path as timers) */
177182
if (loop_to_wake != NULL) {
178-
bool dispatched = event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
179-
if (dispatched) {
180-
/* Successfully dispatched - now clear the waiter state */
181-
pthread_mutex_lock(&channel->mutex);
182-
if (channel->has_waiter && channel->waiter_callback_id == callback_id) {
183-
channel->has_waiter = false;
184-
channel->waiter_loop = NULL;
185-
}
186-
pthread_mutex_unlock(&channel->mutex);
187-
/* Release the reference we kept in channel_wait */
188-
enif_release_resource(loop_to_wake);
189-
}
190-
/* If dispatch failed, waiter remains registered for next send */
183+
event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
184+
/* Release the reference we kept in channel_wait */
185+
enif_release_resource(loop_to_wake);
191186
}
192187

193188
/* Notify sync waiter via Erlang message */
@@ -197,14 +192,7 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
197192
enif_send(NULL, &sync_waiter, msg_env,
198193
enif_make_atom(msg_env, "channel_data_ready"));
199194
enif_free_env(msg_env);
200-
/* Successfully notified - clear the waiter state */
201-
pthread_mutex_lock(&channel->mutex);
202-
if (channel->has_sync_waiter) {
203-
channel->has_sync_waiter = false;
204-
}
205-
pthread_mutex_unlock(&channel->mutex);
206195
}
207-
/* If alloc failed, waiter remains registered for next send */
208196
}
209197

210198
return 0;
@@ -241,16 +229,20 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
241229
bool should_resume = (channel->waiting != NULL);
242230

243231
/* Check if there's an async waiter to dispatch.
244-
* We only clear the waiter state after successful dispatch to avoid
245-
* lost wakeups if the event queue is full. */
232+
* IMPORTANT: Clear waiter state BEFORE releasing mutex to avoid race condition.
233+
* With task_ready notification, the callback can fire before we re-acquire the mutex.
234+
* If dispatch fails (rare), data is still in channel for next receive. */
246235
erlang_event_loop_t *loop_to_wake = NULL;
247236
uint64_t callback_id = 0;
248237
bool has_async_waiter = channel->has_waiter;
249238

250239
if (has_async_waiter) {
251240
loop_to_wake = channel->waiter_loop;
252241
callback_id = channel->waiter_callback_id;
253-
/* Don't clear yet - will clear after successful dispatch */
242+
/* Clear waiter state now to avoid race with fast callback */
243+
channel->has_waiter = false;
244+
channel->waiter_loop = NULL;
245+
channel->waiter_callback_id = 0;
254246
}
255247

256248
/* Check if there's a sync waiter to notify */
@@ -259,7 +251,8 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
259251

260252
if (has_sync_waiter) {
261253
sync_waiter = channel->sync_waiter_pid;
262-
/* Don't clear yet - will clear after successful send */
254+
/* Clear waiter state now to avoid race */
255+
channel->has_sync_waiter = false;
263256
}
264257

265258
pthread_mutex_unlock(&channel->mutex);
@@ -270,19 +263,9 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
270263

271264
/* Dispatch async waiter via timer dispatch (same path as timers) */
272265
if (loop_to_wake != NULL) {
273-
bool dispatched = event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
274-
if (dispatched) {
275-
/* Successfully dispatched - now clear the waiter state */
276-
pthread_mutex_lock(&channel->mutex);
277-
if (channel->has_waiter && channel->waiter_callback_id == callback_id) {
278-
channel->has_waiter = false;
279-
channel->waiter_loop = NULL;
280-
}
281-
pthread_mutex_unlock(&channel->mutex);
282-
/* Release the reference we kept in channel_wait */
283-
enif_release_resource(loop_to_wake);
284-
}
285-
/* If dispatch failed, waiter remains registered for next send */
266+
event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
267+
/* Release the reference we kept in channel_wait */
268+
enif_release_resource(loop_to_wake);
286269
}
287270

288271
/* Notify sync waiter via Erlang message */
@@ -292,14 +275,7 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
292275
enif_send(NULL, &sync_waiter, msg_env,
293276
enif_make_atom(msg_env, "channel_data_ready"));
294277
enif_free_env(msg_env);
295-
/* Successfully notified - clear the waiter state */
296-
pthread_mutex_lock(&channel->mutex);
297-
if (channel->has_sync_waiter) {
298-
channel->has_sync_waiter = false;
299-
}
300-
pthread_mutex_unlock(&channel->mutex);
301278
}
302-
/* If alloc failed, waiter remains registered for next send */
303279
}
304280

305281
return 0;

0 commit comments

Comments
 (0)