Skip to content

Commit eb4f27a

Browse files
committed
Add ThreadPoolExecutor support for erlang.call()
Python threads spawned via concurrent.futures.ThreadPoolExecutor can now call erlang.call() without blocking. Each spawned thread lazily acquires a dedicated "thread worker" channel with its own Erlang handler process. Key components: - c_src/py_thread_worker.c: Thread worker pool with pipe-based IPC - src/py_thread_handler.erl: Coordinator and per-thread handlers - Modified erlang_call_impl() to detect spawned threads - Automatic cleanup via pthread_key_t destructor on thread exit Test suite: test/py_thread_callback_SUITE.erl (6 tests)
1 parent 79f670d commit eb4f27a

10 files changed

Lines changed: 1090 additions & 19 deletions

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,15 @@
44

55
### Added
66

7+
- **ThreadPoolExecutor Support** - Python threads spawned via `concurrent.futures.ThreadPoolExecutor`
8+
can now call `erlang.call()` without blocking
9+
- Each spawned thread lazily acquires a dedicated "thread worker" channel
10+
- One lightweight Erlang process per Python thread handles callbacks
11+
- Automatic cleanup when Python thread exits via `pthread_key_t` destructor
12+
- New module: `py_thread_handler.erl` - Coordinator and per-thread handlers
13+
- New C file: `py_thread_worker.c` - Thread worker pool management
14+
- New test suite: `test/py_thread_callback_SUITE.erl`
15+
716
- **Reentrant Callbacks** - Python→Erlang→Python callback chains without deadlocks
817
- Exception-based suspension mechanism interrupts Python execution cleanly
918
- Callbacks execute in separate processes to prevent worker pool exhaustion

c_src/py_callback.c

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -540,11 +540,42 @@ static PyObject *ErlangFunction_New(PyObject *name) {
540540
static PyObject *erlang_call_impl(PyObject *self, PyObject *args) {
541541
(void)self;
542542

543+
/*
544+
* Check if this is a call from an executor thread (normal path) or
545+
* from a spawned thread like ThreadPoolExecutor (thread worker path).
546+
*/
543547
if (tl_current_worker == NULL || !tl_current_worker->has_callback_handler) {
544-
PyErr_SetString(PyExc_RuntimeError,
545-
"erlang.call() must be called from worker thread. "
546-
"Use execute_async() for concurrent calls from other threads.");
547-
return NULL;
548+
/*
549+
* Not an executor thread - try thread worker path.
550+
* This enables ThreadPoolExecutor threads to call erlang.call().
551+
*/
552+
Py_ssize_t nargs = PyTuple_Size(args);
553+
if (nargs < 1) {
554+
PyErr_SetString(PyExc_TypeError, "erlang.call requires at least a function name");
555+
return NULL;
556+
}
557+
558+
PyObject *name_obj = PyTuple_GetItem(args, 0);
559+
if (!PyUnicode_Check(name_obj)) {
560+
PyErr_SetString(PyExc_TypeError, "Function name must be a string");
561+
return NULL;
562+
}
563+
const char *func_name = PyUnicode_AsUTF8(name_obj);
564+
if (func_name == NULL) {
565+
return NULL;
566+
}
567+
size_t func_name_len = strlen(func_name);
568+
569+
/* Build args list (remaining args) */
570+
PyObject *call_args = PyTuple_GetSlice(args, 1, nargs);
571+
if (call_args == NULL) {
572+
return NULL;
573+
}
574+
575+
/* Use thread worker call */
576+
PyObject *result = thread_worker_call(func_name, func_name_len, call_args);
577+
Py_DECREF(call_args);
578+
return result;
548579
}
549580

550581
Py_ssize_t nargs = PyTuple_Size(args);

c_src/py_nif.c

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ ERL_NIF_TERM ATOM_SUSPENDED;
113113
#include "py_convert.c"
114114
#include "py_exec.c"
115115
#include "py_callback.c"
116+
#include "py_thread_worker.c"
116117

117118
/* ============================================================================
118119
* Resource callbacks
@@ -388,6 +389,11 @@ static ERL_NIF_TERM nif_py_init(ErlNifEnv *env, int argc, const ERL_NIF_TERM arg
388389
return make_error(env, "executor_start_failed");
389390
}
390391

392+
/* Initialize thread worker system for ThreadPoolExecutor support */
393+
if (thread_worker_init() < 0) {
394+
/* Non-fatal - thread worker support just won't be available */
395+
}
396+
391397
return ATOM_OK;
392398
}
393399

@@ -399,6 +405,9 @@ static ERL_NIF_TERM nif_finalize(ErlNifEnv *env, int argc, const ERL_NIF_TERM ar
399405
return ATOM_OK;
400406
}
401407

408+
/* Clean up thread worker system */
409+
thread_worker_cleanup();
410+
402411
/* Stop executors based on mode */
403412
switch (g_execution_mode) {
404413
case PY_MODE_FREE_THREADED:
@@ -1744,7 +1753,12 @@ static ErlNifFunc nif_funcs[] = {
17441753

17451754
/* Execution mode info */
17461755
{"execution_mode", 0, nif_execution_mode, 0},
1747-
{"num_executors", 0, nif_num_executors, 0}
1756+
{"num_executors", 0, nif_num_executors, 0},
1757+
1758+
/* Thread worker support (ThreadPoolExecutor) */
1759+
{"thread_worker_set_coordinator", 1, nif_thread_worker_set_coordinator, 0},
1760+
{"thread_worker_write", 2, nif_thread_worker_write, 0},
1761+
{"thread_worker_signal_ready", 1, nif_thread_worker_signal_ready, 0}
17481762
};
17491763

17501764
ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload)

c_src/py_nif.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,4 +1151,55 @@ static void detect_execution_mode(void);
11511151

11521152
/** @} */
11531153

1154+
/* ============================================================================
1155+
* Thread Worker Functions (py_thread_worker.c)
1156+
* ============================================================================ */
1157+
1158+
/**
1159+
* @defgroup thread_worker Thread Worker Support
1160+
* @brief Functions for ThreadPoolExecutor thread support
1161+
* @{
1162+
*/
1163+
1164+
/**
1165+
* @brief Initialize the thread worker system
1166+
*
1167+
* Creates pthread key for automatic cleanup on thread exit.
1168+
* Called during NIF initialization.
1169+
*
1170+
* @return 0 on success, -1 on failure
1171+
*/
1172+
static int thread_worker_init(void);
1173+
1174+
/**
1175+
* @brief Clean up the thread worker system
1176+
*
1177+
* Releases all workers in the pool and destroys the pthread key.
1178+
* Called during NIF unload.
1179+
*/
1180+
static void thread_worker_cleanup(void);
1181+
1182+
/**
1183+
* @brief Set the thread coordinator PID
1184+
*
1185+
* @param pid PID of the coordinator process
1186+
*/
1187+
static void thread_worker_set_coordinator(ErlNifPid pid);
1188+
1189+
/**
1190+
* @brief Execute an erlang.call() from a spawned thread
1191+
*
1192+
* Called when a Python thread that is NOT an executor thread
1193+
* tries to call erlang.call().
1194+
*
1195+
* @param func_name Name of the Erlang function to call
1196+
* @param func_name_len Length of function name
1197+
* @param call_args Python tuple of arguments
1198+
* @return Python result object, or NULL with exception set
1199+
*/
1200+
static PyObject *thread_worker_call(const char *func_name, size_t func_name_len,
1201+
PyObject *call_args);
1202+
1203+
/** @} */
1204+
11541205
#endif /* PY_NIF_H */

0 commit comments

Comments
 (0)