Skip to content

Commit 4384fe1

Browse files
committed
Use flag-based callback detection instead of exception type
Previously, erlang.call() raised SuspensionRequired and the executor detected it by checking the exception type. This broke when ASGI/WSGI frameworks caught the exception before it reached the executor. Now we set a thread-local flag (tl_pending_callback) that the executor checks FIRST, before looking at exception type. The exception is still raised to abort Python execution, but we don't care if Python code catches/re-raises it - the flag tells us a callback is pending. This makes erlang.call() work reliably from any Python context, including ASGI middleware that catches all exceptions.
1 parent da42abb commit 4384fe1

4 files changed

Lines changed: 134 additions & 98 deletions

File tree

c_src/py_callback.c

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -702,51 +702,44 @@ static PyObject *erlang_call_impl(PyObject *self, PyObject *args) {
702702
}
703703

704704
/*
705-
* Suspension is allowed - raise SuspensionRequired exception.
705+
* Flag-based suspension: set thread-local flag and raise exception.
706706
*
707-
* Unlike returning a marker tuple, raising an exception properly interrupts
708-
* Python execution even in the middle of an expression like:
709-
* erlang.call('foo', x) + 1
707+
* Unlike checking exception type (which fails if frameworks catch exceptions),
708+
* we set a thread-local flag that the C executor checks FIRST. This way:
709+
* 1. Python code can catch/re-raise the exception - we don't care
710+
* 2. The flag tells us a callback is pending
711+
* 3. Executor handles it before looking at exception type
710712
*
711-
* The executor (process_request) catches this exception and:
712-
* 1. Creates a suspended state resource
713-
* 2. Returns {suspended, CallbackId, StateRef, {Func, Args}} to Erlang
714-
* 3. The dirty scheduler is freed
715-
*
716-
* When Erlang calls resume_callback with the result:
717-
* 1. The result is stored in the suspended state
718-
* 2. A dirty NIF is scheduled to re-run the Python code
719-
* 3. On re-run, this function finds the cached result and returns it
713+
* The exception is just to abort Python execution cleanly.
720714
*/
721715
uint64_t callback_id = atomic_fetch_add(&g_callback_id_counter, 1);
722716

723-
/* Create exception args tuple: (callback_id, func_name, args) */
724-
PyObject *exc_args = PyTuple_New(3);
725-
if (exc_args == NULL) {
726-
Py_DECREF(call_args);
727-
return NULL;
728-
}
729-
730-
PyObject *callback_id_obj = PyLong_FromUnsignedLongLong(callback_id);
731-
PyObject *func_name_obj = PyUnicode_FromString(func_name);
717+
/* Set pending callback flag and store info */
718+
tl_pending_callback = true;
719+
tl_pending_callback_id = callback_id;
732720

733-
if (callback_id_obj == NULL || func_name_obj == NULL) {
734-
Py_XDECREF(callback_id_obj);
735-
Py_XDECREF(func_name_obj);
721+
/* Store function name (make a copy) */
722+
if (tl_pending_func_name != NULL) {
723+
enif_free(tl_pending_func_name);
724+
}
725+
tl_pending_func_name = enif_alloc(func_name_len + 1);
726+
if (tl_pending_func_name == NULL) {
727+
tl_pending_callback = false;
736728
Py_DECREF(call_args);
737-
Py_DECREF(exc_args);
729+
PyErr_SetString(PyExc_MemoryError, "Failed to allocate function name");
738730
return NULL;
739731
}
732+
memcpy(tl_pending_func_name, func_name, func_name_len);
733+
tl_pending_func_name[func_name_len] = '\0';
734+
tl_pending_func_name_len = func_name_len;
740735

741-
PyTuple_SET_ITEM(exc_args, 0, callback_id_obj); /* Takes ownership */
742-
PyTuple_SET_ITEM(exc_args, 1, func_name_obj); /* Takes ownership */
743-
PyTuple_SET_ITEM(exc_args, 2, call_args); /* Takes ownership */
736+
/* Store args (take ownership) */
737+
Py_XDECREF(tl_pending_args);
738+
tl_pending_args = call_args; /* Takes ownership, don't decref */
744739

745-
/* Raise the exception - Python will unwind the stack */
746-
PyErr_SetObject(SuspensionRequiredException, exc_args);
747-
Py_DECREF(exc_args); /* SetObject increfs, so we decref our reference */
748-
749-
return NULL; /* Signals exception was raised */
740+
/* Raise exception to abort Python execution */
741+
PyErr_SetString(SuspensionRequiredException, "callback pending");
742+
return NULL;
750743
}
751744

752745
/**

c_src/py_exec.c

Lines changed: 93 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -290,50 +290,65 @@ static void process_request(py_request_t *req) {
290290
if (check_timeout_error()) {
291291
PyErr_Clear();
292292
req->result = enif_make_tuple2(env, ATOM_ERROR, ATOM_TIMEOUT);
293-
} else if (is_suspension_exception()) {
293+
} else if (tl_pending_callback) {
294294
/*
295-
* Python code called erlang.call() which raised SuspensionRequired.
296-
* Create a suspended state and return {suspended, CallbackId, StateRef, {Func, Args}}
297-
* so Erlang can execute the callback and then resume.
295+
* Flag-based callback detection: check flag FIRST, not exception type.
296+
* This works even if Python code caught and re-raised the exception.
298297
*/
299-
PyObject *exc_args = get_suspension_args(); /* Clears exception */
298+
PyErr_Clear(); /* Clear whatever exception is set */
299+
300+
/* Build exc_args tuple from thread-local storage */
301+
PyObject *exc_args = PyTuple_New(3);
300302
if (exc_args == NULL) {
301-
req->result = make_error(env, "get_suspension_args_failed");
303+
tl_pending_callback = false;
304+
req->result = make_error(env, "alloc_exc_args_failed");
302305
} else {
303-
suspended_state_t *suspended = create_suspended_state(env, exc_args, req);
304-
if (suspended == NULL) {
306+
PyObject *callback_id_obj = PyLong_FromUnsignedLongLong(tl_pending_callback_id);
307+
PyObject *func_name_obj = PyUnicode_FromStringAndSize(
308+
tl_pending_func_name, tl_pending_func_name_len);
309+
310+
if (callback_id_obj == NULL || func_name_obj == NULL) {
311+
Py_XDECREF(callback_id_obj);
312+
Py_XDECREF(func_name_obj);
305313
Py_DECREF(exc_args);
306-
req->result = make_error(env, "create_suspended_state_failed");
314+
tl_pending_callback = false;
315+
req->result = make_error(env, "build_exc_args_failed");
307316
} else {
308-
/* Extract callback info from exception args */
309-
PyObject *callback_id_obj = PyTuple_GetItem(exc_args, 0);
310-
PyObject *func_name_obj = PyTuple_GetItem(exc_args, 1);
311-
PyObject *call_args_obj = PyTuple_GetItem(exc_args, 2);
317+
PyTuple_SET_ITEM(exc_args, 0, callback_id_obj);
318+
PyTuple_SET_ITEM(exc_args, 1, func_name_obj);
319+
Py_INCREF(tl_pending_args); /* Tuple takes ownership */
320+
PyTuple_SET_ITEM(exc_args, 2, tl_pending_args);
312321

313-
uint64_t callback_id = PyLong_AsUnsignedLongLong(callback_id_obj);
314-
Py_ssize_t fn_len;
315-
const char *fn = PyUnicode_AsUTF8AndSize(func_name_obj, &fn_len);
322+
suspended_state_t *suspended = create_suspended_state(env, exc_args, req);
323+
if (suspended == NULL) {
324+
Py_DECREF(exc_args);
325+
tl_pending_callback = false;
326+
req->result = make_error(env, "create_suspended_state_failed");
327+
} else {
328+
/* Create Erlang terms */
329+
ERL_NIF_TERM state_ref = enif_make_resource(env, suspended);
330+
enif_release_resource(suspended);
316331

317-
/* Create Erlang terms */
318-
ERL_NIF_TERM state_ref = enif_make_resource(env, suspended);
319-
enif_release_resource(suspended); /* Erlang now holds the reference */
332+
ERL_NIF_TERM callback_id_term = enif_make_uint64(env, tl_pending_callback_id);
320333

321-
ERL_NIF_TERM callback_id_term = enif_make_uint64(env, callback_id);
334+
ERL_NIF_TERM func_name_term;
335+
unsigned char *fn_buf = enif_make_new_binary(env, tl_pending_func_name_len, &func_name_term);
336+
memcpy(fn_buf, tl_pending_func_name, tl_pending_func_name_len);
322337

323-
ERL_NIF_TERM func_name_term;
324-
unsigned char *fn_buf = enif_make_new_binary(env, fn_len, &func_name_term);
325-
memcpy(fn_buf, fn, fn_len);
338+
ERL_NIF_TERM args_term = py_to_term(env, tl_pending_args);
326339

327-
ERL_NIF_TERM args_term = py_to_term(env, call_args_obj);
340+
Py_DECREF(exc_args);
328341

329-
Py_DECREF(exc_args);
342+
/* Clear pending state */
343+
tl_pending_callback = false;
330344

331-
/* Return {suspended, CallbackId, StateRef, {FuncName, Args}} */
332-
req->result = enif_make_tuple4(env,
333-
ATOM_SUSPENDED,
334-
callback_id_term,
335-
state_ref,
336-
enif_make_tuple2(env, func_name_term, args_term));
345+
/* Return {suspended, CallbackId, StateRef, {FuncName, Args}} */
346+
req->result = enif_make_tuple4(env,
347+
ATOM_SUSPENDED,
348+
callback_id_term,
349+
state_ref,
350+
enif_make_tuple2(env, func_name_term, args_term));
351+
}
337352
}
338353
}
339354
} else {
@@ -404,43 +419,57 @@ static void process_request(py_request_t *req) {
404419
if (check_timeout_error()) {
405420
PyErr_Clear();
406421
req->result = enif_make_tuple2(env, ATOM_ERROR, ATOM_TIMEOUT);
407-
} else if (is_suspension_exception()) {
408-
/* Handle suspension from erlang.call() in eval */
409-
PyObject *exc_args = get_suspension_args(); /* Clears exception */
422+
} else if (tl_pending_callback) {
423+
/* Flag-based callback detection for eval */
424+
PyErr_Clear();
425+
426+
PyObject *exc_args = PyTuple_New(3);
410427
if (exc_args == NULL) {
411-
req->result = make_error(env, "get_suspension_args_failed");
428+
tl_pending_callback = false;
429+
req->result = make_error(env, "alloc_exc_args_failed");
412430
} else {
413-
suspended_state_t *suspended = create_suspended_state(env, exc_args, req);
414-
if (suspended == NULL) {
415-
Py_DECREF(exc_args);
416-
req->result = make_error(env, "create_suspended_state_failed");
417-
} else {
418-
PyObject *callback_id_obj = PyTuple_GetItem(exc_args, 0);
419-
PyObject *func_name_obj = PyTuple_GetItem(exc_args, 1);
420-
PyObject *call_args_obj = PyTuple_GetItem(exc_args, 2);
421-
422-
uint64_t callback_id = PyLong_AsUnsignedLongLong(callback_id_obj);
423-
Py_ssize_t fn_len;
424-
const char *fn = PyUnicode_AsUTF8AndSize(func_name_obj, &fn_len);
425-
426-
ERL_NIF_TERM state_ref = enif_make_resource(env, suspended);
427-
enif_release_resource(suspended);
428-
429-
ERL_NIF_TERM callback_id_term = enif_make_uint64(env, callback_id);
430-
431-
ERL_NIF_TERM func_name_term;
432-
unsigned char *fn_buf = enif_make_new_binary(env, fn_len, &func_name_term);
433-
memcpy(fn_buf, fn, fn_len);
434-
435-
ERL_NIF_TERM args_term = py_to_term(env, call_args_obj);
431+
PyObject *callback_id_obj = PyLong_FromUnsignedLongLong(tl_pending_callback_id);
432+
PyObject *func_name_obj = PyUnicode_FromStringAndSize(
433+
tl_pending_func_name, tl_pending_func_name_len);
436434

435+
if (callback_id_obj == NULL || func_name_obj == NULL) {
436+
Py_XDECREF(callback_id_obj);
437+
Py_XDECREF(func_name_obj);
437438
Py_DECREF(exc_args);
438-
439-
req->result = enif_make_tuple4(env,
440-
ATOM_SUSPENDED,
441-
callback_id_term,
442-
state_ref,
443-
enif_make_tuple2(env, func_name_term, args_term));
439+
tl_pending_callback = false;
440+
req->result = make_error(env, "build_exc_args_failed");
441+
} else {
442+
PyTuple_SET_ITEM(exc_args, 0, callback_id_obj);
443+
PyTuple_SET_ITEM(exc_args, 1, func_name_obj);
444+
Py_INCREF(tl_pending_args);
445+
PyTuple_SET_ITEM(exc_args, 2, tl_pending_args);
446+
447+
suspended_state_t *suspended = create_suspended_state(env, exc_args, req);
448+
if (suspended == NULL) {
449+
Py_DECREF(exc_args);
450+
tl_pending_callback = false;
451+
req->result = make_error(env, "create_suspended_state_failed");
452+
} else {
453+
ERL_NIF_TERM state_ref = enif_make_resource(env, suspended);
454+
enif_release_resource(suspended);
455+
456+
ERL_NIF_TERM callback_id_term = enif_make_uint64(env, tl_pending_callback_id);
457+
458+
ERL_NIF_TERM func_name_term;
459+
unsigned char *fn_buf = enif_make_new_binary(env, tl_pending_func_name_len, &func_name_term);
460+
memcpy(fn_buf, tl_pending_func_name, tl_pending_func_name_len);
461+
462+
ERL_NIF_TERM args_term = py_to_term(env, tl_pending_args);
463+
464+
Py_DECREF(exc_args);
465+
tl_pending_callback = false;
466+
467+
req->result = enif_make_tuple4(env,
468+
ATOM_SUSPENDED,
469+
callback_id_term,
470+
state_ref,
471+
enif_make_tuple2(env, func_name_term, args_term));
472+
}
444473
}
445474
}
446475
} else {

c_src/py_nif.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ __thread ErlNifEnv *tl_callback_env = NULL;
8383
__thread suspended_state_t *tl_current_suspended = NULL;
8484
__thread bool tl_allow_suspension = false;
8585

86+
/* Thread-local pending callback state (flag-based detection, not exception-based) */
87+
__thread bool tl_pending_callback = false;
88+
__thread uint64_t tl_pending_callback_id = 0;
89+
__thread char *tl_pending_func_name = NULL;
90+
__thread size_t tl_pending_func_name_len = 0;
91+
__thread PyObject *tl_pending_args = NULL;
92+
8693
/* Thread-local timeout state */
8794
__thread uint64_t tl_timeout_deadline = 0;
8895
__thread bool tl_timeout_enabled = false;

c_src/py_nif.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,13 @@ extern __thread suspended_state_t *tl_current_suspended;
717717
/** @brief Flag: suspension is allowed in current context */
718718
extern __thread bool tl_allow_suspension;
719719

720+
/** @brief Flag: pending callback detected (checked before exception type) */
721+
extern __thread bool tl_pending_callback;
722+
extern __thread uint64_t tl_pending_callback_id;
723+
extern __thread char *tl_pending_func_name;
724+
extern __thread size_t tl_pending_func_name_len;
725+
extern __thread PyObject *tl_pending_args;
726+
720727
/** @brief Timeout deadline (nanoseconds, monotonic clock) */
721728
extern __thread uint64_t tl_timeout_deadline;
722729

0 commit comments

Comments
 (0)