Skip to content

Commit f23a696

Browse files
committed
Fix thread safety issues in C NIF codebase
- Use pthread_once for async callback initialization to prevent race condition on g_async_callback_initialized flag - Fix mutex held during Python calls in async_event_loop_thread by collecting completed futures under mutex, releasing it, then processing callbacks outside the mutex - Add read_with_timeout() and read_length_prefixed_data() helpers to py_nif.h with select()-based timeout support - Update all blocking pipe reads to use timeout (30s for callbacks, 10s for handler spawn) to prevent indefinite hangs
1 parent 070c907 commit f23a696

3 files changed

Lines changed: 201 additions & 80 deletions

File tree

c_src/py_callback.c

Lines changed: 97 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -625,39 +625,36 @@ static PyObject *erlang_call_impl(PyObject *self, PyObject *args) {
625625
func_term,
626626
args_term);
627627

628-
uint32_t response_len = 0;
629-
ssize_t n;
630628
char *response_data = NULL;
629+
uint32_t response_len = 0;
630+
int read_result;
631631

632632
Py_BEGIN_ALLOW_THREADS
633633
enif_send(NULL, &tl_current_worker->callback_handler, msg_env, msg);
634634
enif_free_env(msg_env);
635-
n = read(tl_current_worker->callback_pipe[0], &response_len, sizeof(response_len));
635+
/* Use 30 second timeout to prevent indefinite blocking */
636+
read_result = read_length_prefixed_data(
637+
tl_current_worker->callback_pipe[0],
638+
&response_data, &response_len, 30000);
636639
Py_END_ALLOW_THREADS
637640

638-
if (n != sizeof(response_len)) {
639-
PyErr_SetString(PyExc_RuntimeError, "Failed to read callback response length");
641+
if (read_result == -1) {
642+
if (errno == ETIMEDOUT) {
643+
PyErr_SetString(PyExc_TimeoutError, "Callback response timed out");
644+
} else {
645+
PyErr_SetString(PyExc_RuntimeError, "Failed to read callback response");
646+
}
640647
return NULL;
641648
}
642-
643-
response_data = enif_alloc(response_len);
644-
if (response_data == NULL) {
649+
if (read_result == -2) {
645650
PyErr_SetString(PyExc_MemoryError, "Failed to allocate response buffer");
646651
return NULL;
647652
}
648653

649-
Py_BEGIN_ALLOW_THREADS
650-
n = read(tl_current_worker->callback_pipe[0], response_data, response_len);
651-
Py_END_ALLOW_THREADS
652-
653-
if (n != (ssize_t)response_len) {
654+
PyObject *result = parse_callback_response((unsigned char *)response_data, response_len);
655+
if (response_data != NULL) {
654656
enif_free(response_data);
655-
PyErr_SetString(PyExc_RuntimeError, "Failed to read callback response data");
656-
return NULL;
657657
}
658-
659-
PyObject *result = parse_callback_response((unsigned char *)response_data, response_len);
660-
enif_free(response_data);
661658
return result;
662659
}
663660

@@ -723,19 +720,19 @@ extern bool g_has_thread_coordinator;
723720
static int g_async_callback_pipe[2] = {-1, -1}; /* [0]=read, [1]=write */
724721
static PyObject *g_async_pending_futures = NULL; /* Dict: callback_id -> Future */
725722
static pthread_mutex_t g_async_futures_mutex = PTHREAD_MUTEX_INITIALIZER;
726-
static bool g_async_callback_initialized = false;
723+
724+
/* Thread-safe initialization using pthread_once */
725+
static pthread_once_t g_async_callback_init_once = PTHREAD_ONCE_INIT;
726+
static int g_async_callback_init_result = 0;
727727

728728
/**
729-
* Initialize async callback system.
730-
* Creates the response pipe and pending futures dict.
729+
* Internal initialization function called by pthread_once.
730+
* Thread-safe: only called once by pthread_once.
731731
*/
732-
static int async_callback_init(void) {
733-
if (g_async_callback_initialized) {
734-
return 0;
735-
}
736-
732+
static void async_callback_init_impl(void) {
737733
if (pipe(g_async_callback_pipe) < 0) {
738-
return -1;
734+
g_async_callback_init_result = -1;
735+
return;
739736
}
740737

741738
/* Set the read end to non-blocking for asyncio compatibility */
@@ -750,11 +747,21 @@ static int async_callback_init(void) {
750747
close(g_async_callback_pipe[1]);
751748
g_async_callback_pipe[0] = -1;
752749
g_async_callback_pipe[1] = -1;
753-
return -1;
750+
g_async_callback_init_result = -1;
751+
return;
754752
}
755753

756-
g_async_callback_initialized = true;
757-
return 0;
754+
g_async_callback_init_result = 0;
755+
}
756+
757+
/**
758+
* Initialize async callback system.
759+
* Creates the response pipe and pending futures dict.
760+
* Thread-safe: uses pthread_once for initialization.
761+
*/
762+
static int async_callback_init(void) {
763+
pthread_once(&g_async_callback_init_once, async_callback_init_impl);
764+
return g_async_callback_init_result;
758765
}
759766

760767
/**
@@ -893,11 +900,10 @@ static PyObject *get_async_callback_fd(PyObject *self, PyObject *args) {
893900
(void)self;
894901
(void)args;
895902

896-
if (!g_async_callback_initialized) {
897-
if (async_callback_init() < 0) {
898-
PyErr_SetString(PyExc_RuntimeError, "Failed to initialize async callback system");
899-
return NULL;
900-
}
903+
/* async_callback_init uses pthread_once, so it's safe to call multiple times */
904+
if (async_callback_init() < 0) {
905+
PyErr_SetString(PyExc_RuntimeError, "Failed to initialize async callback system");
906+
return NULL;
901907
}
902908

903909
return PyLong_FromLong(g_async_callback_pipe[0]);
@@ -1328,40 +1334,75 @@ static void *async_event_loop_thread(void *arg) {
13281334
PyErr_Clear();
13291335
}
13301336

1331-
/* Check for completed futures (GIL held) */
1337+
/*
1338+
* Check for completed futures (GIL held).
1339+
*
1340+
* IMPORTANT: We must not hold the mutex while calling Python functions
1341+
* to avoid deadlocks. The pattern is:
1342+
* 1. Lock mutex, collect completed items, unlock
1343+
* 2. Process callbacks outside mutex (no contention)
1344+
* 3. Lock mutex, remove processed items, unlock
1345+
*/
1346+
1347+
/* Phase 1: Collect completed futures under mutex */
1348+
#define MAX_COMPLETED_BATCH 16
1349+
async_pending_t *completed[MAX_COMPLETED_BATCH];
1350+
int num_completed = 0;
1351+
13321352
pthread_mutex_lock(&worker->queue_mutex);
1333-
async_pending_t *prev = NULL;
13341353
async_pending_t *p = worker->pending_head;
1335-
while (p != NULL) {
1354+
while (p != NULL && num_completed < MAX_COMPLETED_BATCH) {
13361355
if (p->future != NULL) {
1356+
/* Quick check if future is done (still needs GIL, but mutex held briefly) */
13371357
PyObject *done = PyObject_CallMethod(p->future, "done", NULL);
13381358
if (done != NULL && PyObject_IsTrue(done)) {
13391359
Py_DECREF(done);
1360+
completed[num_completed++] = p;
1361+
} else {
1362+
Py_XDECREF(done);
1363+
}
1364+
}
1365+
p = p->next;
1366+
}
1367+
pthread_mutex_unlock(&worker->queue_mutex);
13401368

1341-
/* Future is complete - process it */
1342-
async_future_callback(worker, p);
1369+
/* Phase 2: Process completed callbacks outside mutex (no deadlock risk) */
1370+
for (int i = 0; i < num_completed; i++) {
1371+
async_future_callback(worker, completed[i]);
1372+
}
13431373

1344-
/* Remove from list */
1345-
Py_DECREF(p->future);
1346-
if (prev == NULL) {
1347-
worker->pending_head = p->next;
1348-
} else {
1349-
prev->next = p->next;
1350-
}
1351-
if (p == worker->pending_tail) {
1352-
worker->pending_tail = prev;
1374+
/* Phase 3: Remove processed items under mutex */
1375+
if (num_completed > 0) {
1376+
pthread_mutex_lock(&worker->queue_mutex);
1377+
for (int i = 0; i < num_completed; i++) {
1378+
async_pending_t *to_remove = completed[i];
1379+
1380+
/* Find and remove from list */
1381+
async_pending_t *prev = NULL;
1382+
p = worker->pending_head;
1383+
while (p != NULL) {
1384+
if (p == to_remove) {
1385+
/* Remove from list */
1386+
if (prev == NULL) {
1387+
worker->pending_head = p->next;
1388+
} else {
1389+
prev->next = p->next;
1390+
}
1391+
if (p == worker->pending_tail) {
1392+
worker->pending_tail = prev;
1393+
}
1394+
break;
13531395
}
1354-
async_pending_t *to_free = p;
1396+
prev = p;
13551397
p = p->next;
1356-
enif_free(to_free);
1357-
continue;
13581398
}
1359-
Py_XDECREF(done);
1399+
1400+
/* Clean up */
1401+
Py_DECREF(to_remove->future);
1402+
enif_free(to_remove);
13601403
}
1361-
prev = p;
1362-
p = p->next;
1404+
pthread_mutex_unlock(&worker->queue_mutex);
13631405
}
1364-
pthread_mutex_unlock(&worker->queue_mutex);
13651406
}
13661407

13671408
/* Stop and close the event loop */

c_src/py_nif.h

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@
9898
/** @brief Flag indicating dlopen with RTLD_GLOBAL is needed for Python extensions */
9999
#define NEED_DLOPEN_GLOBAL 1
100100
#endif
101+
102+
#include <sys/select.h>
101103
/** @} */
102104

103105
/* ============================================================================
@@ -871,6 +873,86 @@ static char *binary_to_string(const ErlNifBinary *bin) {
871873
return str;
872874
}
873875

876+
/**
877+
* @brief Read from a file descriptor with optional timeout
878+
*
879+
* Uses select() to implement a timeout on blocking read operations.
880+
* This prevents indefinite blocking on pipe reads.
881+
*
882+
* @param fd File descriptor to read from
883+
* @param buf Buffer to read into
884+
* @param count Number of bytes to read
885+
* @param timeout_ms Timeout in milliseconds (0 = no timeout, wait indefinitely)
886+
*
887+
* @return Number of bytes read on success, 0 on timeout, -1 on error
888+
*
889+
* @note On timeout, errno is set to ETIMEDOUT
890+
*/
891+
static ssize_t read_with_timeout(int fd, void *buf, size_t count, int timeout_ms) {
892+
if (timeout_ms > 0) {
893+
struct timeval tv;
894+
tv.tv_sec = timeout_ms / 1000;
895+
tv.tv_usec = (timeout_ms % 1000) * 1000;
896+
897+
fd_set fds;
898+
FD_ZERO(&fds);
899+
FD_SET(fd, &fds);
900+
901+
int ret = select(fd + 1, &fds, NULL, NULL, &tv);
902+
if (ret < 0) {
903+
return -1; /* select error */
904+
}
905+
if (ret == 0) {
906+
errno = ETIMEDOUT;
907+
return 0; /* timeout */
908+
}
909+
}
910+
911+
return read(fd, buf, count);
912+
}
913+
914+
/**
915+
* @brief Read length-prefixed data from a file descriptor
916+
*
917+
* Reads a 4-byte length prefix followed by the data payload.
918+
* Uses read_with_timeout for optional timeout support.
919+
*
920+
* @param fd File descriptor to read from
921+
* @param data_out Pointer to store allocated data buffer (caller must free with enif_free)
922+
* @param len_out Pointer to store data length
923+
* @param timeout_ms Timeout in milliseconds (0 = no timeout)
924+
*
925+
* @return 0 on success, -1 on read error/timeout, -2 on allocation failure
926+
*
927+
* @note On success with len > 0, caller must call enif_free(*data_out)
928+
*/
929+
static int read_length_prefixed_data(int fd, char **data_out, uint32_t *len_out, int timeout_ms) {
930+
uint32_t len;
931+
ssize_t n = read_with_timeout(fd, &len, sizeof(len), timeout_ms);
932+
if (n != sizeof(len)) {
933+
return -1;
934+
}
935+
936+
*data_out = NULL;
937+
*len_out = len;
938+
939+
if (len > 0) {
940+
*data_out = enif_alloc(len);
941+
if (*data_out == NULL) {
942+
return -2;
943+
}
944+
945+
n = read_with_timeout(fd, *data_out, len, timeout_ms);
946+
if (n != (ssize_t)len) {
947+
enif_free(*data_out);
948+
*data_out = NULL;
949+
return -1;
950+
}
951+
}
952+
953+
return 0;
954+
}
955+
874956
/** @} */
875957

876958
/* ============================================================================

c_src/py_thread_worker.c

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -332,9 +332,14 @@ static int thread_worker_spawn_handler(thread_worker_t *tw) {
332332
}
333333
enif_free_env(msg_env);
334334

335-
/* Read handler PID from pipe (Erlang writes it after spawning) */
335+
/* Read handler PID from pipe (Erlang writes it after spawning)
336+
* Use 10 second timeout for handler spawn */
336337
uint32_t response_len = 0;
337-
ssize_t n = read(tw->response_pipe[0], &response_len, sizeof(response_len));
338+
ssize_t n = read_with_timeout(tw->response_pipe[0], &response_len, sizeof(response_len), 10000);
339+
if (n <= 0) {
340+
/* Timeout or error - handler spawn failed */
341+
return -1;
342+
}
338343
if (n != sizeof(response_len) || response_len == 0) {
339344
/* Handler spawned successfully - pid info is in the length as a signal */
340345
tw->has_handler = true;
@@ -420,9 +425,9 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len,
420425
func_term,
421426
args_term);
422427

423-
ssize_t n;
424-
uint32_t response_len = 0;
425428
char *response_data = NULL;
429+
uint32_t response_len = 0;
430+
int read_result;
426431

427432
/* Send message to coordinator (can be done with GIL held) */
428433
if (!enif_send(NULL, &g_thread_coordinator_pid, msg_env, msg)) {
@@ -434,36 +439,29 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len,
434439

435440
/* Release GIL while waiting for response */
436441
Py_BEGIN_ALLOW_THREADS
437-
438-
/* Read response from pipe */
439-
n = read(tw->response_pipe[0], &response_len, sizeof(response_len));
440-
442+
/* Use 30 second timeout to prevent indefinite blocking */
443+
read_result = read_length_prefixed_data(
444+
tw->response_pipe[0], &response_data, &response_len, 30000);
441445
Py_END_ALLOW_THREADS
442446

443-
if (n != sizeof(response_len)) {
444-
PyErr_SetString(PyExc_RuntimeError, "Failed to read callback response length");
447+
if (read_result == -1) {
448+
if (errno == ETIMEDOUT) {
449+
PyErr_SetString(PyExc_TimeoutError, "Callback response timed out");
450+
} else {
451+
PyErr_SetString(PyExc_RuntimeError, "Failed to read callback response");
452+
}
445453
return NULL;
446454
}
447-
448-
response_data = enif_alloc(response_len);
449-
if (response_data == NULL) {
455+
if (read_result == -2) {
450456
PyErr_SetString(PyExc_MemoryError, "Failed to allocate response buffer");
451457
return NULL;
452458
}
453459

454-
Py_BEGIN_ALLOW_THREADS
455-
n = read(tw->response_pipe[0], response_data, response_len);
456-
Py_END_ALLOW_THREADS
457-
458-
if (n != (ssize_t)response_len) {
459-
enif_free(response_data);
460-
PyErr_SetString(PyExc_RuntimeError, "Failed to read callback response data");
461-
return NULL;
462-
}
463-
464460
/* Parse response using existing function */
465461
PyObject *result = parse_callback_response((unsigned char *)response_data, response_len);
466-
enif_free(response_data);
462+
if (response_data != NULL) {
463+
enif_free(response_data);
464+
}
467465

468466
return result;
469467
}

0 commit comments

Comments
 (0)