Skip to content

Commit 1509fcb

Browse files
committed
Fix nested suspensions during callback replay
When Python code makes multiple erlang.call() invocations within a single function, the second call would fail with unexpected_suspension during replay. This happened because the resume code only handled a single callback result, not multiple sequential callbacks. Changes: - Add create_suspended_state_from_existing() to create new suspended state by copying original request data from existing state - Add make_suspended_term() helper to build the suspension tuple - Update nif_resume_callback_dirty to handle nested suspensions by creating new suspended state and returning {suspended, ...} instead of {error, unexpected_suspension} This enables patterns like: def process(): result1 = erlang.call('func1', x) # First callback result2 = erlang.call('func2', y) # Second callback - now works! return result1 + result2
1 parent 47406f1 commit 1509fcb

1 file changed

Lines changed: 195 additions & 6 deletions

File tree

c_src/py_callback.c

Lines changed: 195 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,165 @@ static suspended_state_t *create_suspended_state(ErlNifEnv *env, PyObject *exc_a
255255
return state;
256256
}
257257

258+
/**
259+
* Create a new suspended state from an existing one (for nested suspensions).
260+
* Used when a second erlang.call() is made during replay.
261+
*/
262+
static suspended_state_t *create_suspended_state_from_existing(
263+
ErlNifEnv *env, PyObject *exc_args, suspended_state_t *existing) {
264+
265+
if (!PyTuple_Check(exc_args) || PyTuple_Size(exc_args) != 3) {
266+
return NULL;
267+
}
268+
269+
PyObject *callback_id_obj = PyTuple_GetItem(exc_args, 0);
270+
PyObject *func_name_obj = PyTuple_GetItem(exc_args, 1);
271+
PyObject *callback_args = PyTuple_GetItem(exc_args, 2);
272+
273+
if (!PyLong_Check(callback_id_obj) || !PyUnicode_Check(func_name_obj)) {
274+
return NULL;
275+
}
276+
277+
/* Allocate the new suspended state resource */
278+
suspended_state_t *state = enif_alloc_resource(
279+
SUSPENDED_STATE_RESOURCE_TYPE, sizeof(suspended_state_t));
280+
if (state == NULL) {
281+
return NULL;
282+
}
283+
284+
/* Initialize the state */
285+
memset(state, 0, sizeof(suspended_state_t));
286+
state->worker = existing->worker; /* Same worker */
287+
state->callback_id = PyLong_AsUnsignedLongLong(callback_id_obj);
288+
289+
/* Copy callback function name */
290+
Py_ssize_t len;
291+
const char *func_name = PyUnicode_AsUTF8AndSize(func_name_obj, &len);
292+
if (func_name == NULL) {
293+
enif_release_resource(state);
294+
return NULL;
295+
}
296+
state->callback_func_name = enif_alloc(len + 1);
297+
if (state->callback_func_name == NULL) {
298+
enif_release_resource(state);
299+
return NULL;
300+
}
301+
memcpy(state->callback_func_name, func_name, len);
302+
state->callback_func_name[len] = '\0';
303+
state->callback_func_len = len;
304+
305+
/* Store reference to callback args */
306+
Py_INCREF(callback_args);
307+
state->callback_args = callback_args;
308+
309+
/* Copy original request context from existing state */
310+
state->request_type = existing->request_type;
311+
state->orig_timeout_ms = existing->orig_timeout_ms;
312+
313+
/* Create environment to hold copied terms */
314+
state->orig_env = enif_alloc_env();
315+
if (state->orig_env == NULL) {
316+
Py_DECREF(callback_args);
317+
state->callback_args = NULL;
318+
enif_free(state->callback_func_name);
319+
state->callback_func_name = NULL;
320+
enif_release_resource(state);
321+
return NULL;
322+
}
323+
324+
/* Copy request-specific data from existing state */
325+
if (existing->request_type == PY_REQ_CALL) {
326+
/* Copy module binary */
327+
if (!enif_alloc_binary(existing->orig_module.size, &state->orig_module)) {
328+
Py_DECREF(callback_args);
329+
state->callback_args = NULL;
330+
enif_free(state->callback_func_name);
331+
state->callback_func_name = NULL;
332+
enif_free_env(state->orig_env);
333+
state->orig_env = NULL;
334+
enif_release_resource(state);
335+
return NULL;
336+
}
337+
memcpy(state->orig_module.data, existing->orig_module.data, existing->orig_module.size);
338+
339+
/* Copy function binary */
340+
if (!enif_alloc_binary(existing->orig_func.size, &state->orig_func)) {
341+
enif_release_binary(&state->orig_module);
342+
Py_DECREF(callback_args);
343+
state->callback_args = NULL;
344+
enif_free(state->callback_func_name);
345+
state->callback_func_name = NULL;
346+
enif_free_env(state->orig_env);
347+
state->orig_env = NULL;
348+
enif_release_resource(state);
349+
return NULL;
350+
}
351+
memcpy(state->orig_func.data, existing->orig_func.data, existing->orig_func.size);
352+
353+
/* Copy args and kwargs to our environment */
354+
state->orig_args = enif_make_copy(state->orig_env, existing->orig_args);
355+
state->orig_kwargs = enif_make_copy(state->orig_env, existing->orig_kwargs);
356+
} else if (existing->request_type == PY_REQ_EVAL) {
357+
/* Copy code binary */
358+
if (!enif_alloc_binary(existing->orig_code.size, &state->orig_code)) {
359+
Py_DECREF(callback_args);
360+
state->callback_args = NULL;
361+
enif_free(state->callback_func_name);
362+
state->callback_func_name = NULL;
363+
enif_free_env(state->orig_env);
364+
state->orig_env = NULL;
365+
enif_release_resource(state);
366+
return NULL;
367+
}
368+
memcpy(state->orig_code.data, existing->orig_code.data, existing->orig_code.size);
369+
370+
/* Copy locals */
371+
state->orig_locals = enif_make_copy(state->orig_env, existing->orig_locals);
372+
}
373+
374+
/* Initialize synchronization primitives */
375+
pthread_mutex_init(&state->mutex, NULL);
376+
pthread_cond_init(&state->cond, NULL);
377+
378+
state->result_data = NULL;
379+
state->result_len = 0;
380+
state->has_result = false;
381+
state->is_error = false;
382+
383+
return state;
384+
}
385+
386+
/**
387+
* Helper to build {suspended, CallbackId, StateRef, {FuncName, Args}} term.
388+
*/
389+
static ERL_NIF_TERM make_suspended_term(ErlNifEnv *env, suspended_state_t *suspended,
390+
PyObject *exc_args) {
391+
PyObject *callback_id_obj = PyTuple_GetItem(exc_args, 0);
392+
PyObject *func_name_obj = PyTuple_GetItem(exc_args, 1);
393+
PyObject *call_args_obj = PyTuple_GetItem(exc_args, 2);
394+
395+
uint64_t callback_id = PyLong_AsUnsignedLongLong(callback_id_obj);
396+
Py_ssize_t fn_len;
397+
const char *fn = PyUnicode_AsUTF8AndSize(func_name_obj, &fn_len);
398+
399+
ERL_NIF_TERM state_ref = enif_make_resource(env, suspended);
400+
enif_release_resource(suspended); /* Erlang now holds the reference */
401+
402+
ERL_NIF_TERM callback_id_term = enif_make_uint64(env, callback_id);
403+
404+
ERL_NIF_TERM func_name_term;
405+
unsigned char *fn_buf = enif_make_new_binary(env, fn_len, &func_name_term);
406+
memcpy(fn_buf, fn, fn_len);
407+
408+
ERL_NIF_TERM args_term = py_to_term(env, call_args_obj);
409+
410+
return enif_make_tuple4(env,
411+
ATOM_SUSPENDED,
412+
callback_id_term,
413+
state_ref,
414+
enif_make_tuple2(env, func_name_term, args_term));
415+
}
416+
258417
/**
259418
* Helper to parse callback response data into a Python object.
260419
* Response format: status_byte (0=ok, 1=error) + python_repr_string
@@ -994,9 +1153,24 @@ static ERL_NIF_TERM nif_resume_callback_dirty(ErlNifEnv *env, int argc, const ER
9941153

9951154
if (py_result == NULL) {
9961155
if (is_suspension_exception()) {
997-
/* Another suspension - shouldn't happen if cache worked, but handle it */
998-
PyErr_Clear();
999-
result = make_error(env, "unexpected_suspension");
1156+
/*
1157+
* Another suspension during replay - Python made a second erlang.call().
1158+
* Create a new suspended state and return {suspended, ...} so Erlang
1159+
* can handle this callback and resume again.
1160+
*/
1161+
PyObject *exc_args = get_suspension_args(); /* Clears exception */
1162+
if (exc_args == NULL) {
1163+
result = make_error(env, "get_suspension_args_failed");
1164+
} else {
1165+
suspended_state_t *new_suspended = create_suspended_state_from_existing(env, exc_args, state);
1166+
if (new_suspended == NULL) {
1167+
Py_DECREF(exc_args);
1168+
result = make_error(env, "create_nested_suspended_state_failed");
1169+
} else {
1170+
result = make_suspended_term(env, new_suspended, exc_args);
1171+
Py_DECREF(exc_args);
1172+
}
1173+
}
10001174
} else {
10011175
result = make_py_error(env);
10021176
}
@@ -1044,9 +1218,24 @@ static ERL_NIF_TERM nif_resume_callback_dirty(ErlNifEnv *env, int argc, const ER
10441218

10451219
if (py_result == NULL) {
10461220
if (is_suspension_exception()) {
1047-
/* Another suspension - shouldn't happen if cache worked, but handle it */
1048-
PyErr_Clear();
1049-
result = make_error(env, "unexpected_suspension");
1221+
/*
1222+
* Another suspension during replay - Python made a second erlang.call().
1223+
* Create a new suspended state and return {suspended, ...} so Erlang
1224+
* can handle this callback and resume again.
1225+
*/
1226+
PyObject *exc_args = get_suspension_args(); /* Clears exception */
1227+
if (exc_args == NULL) {
1228+
result = make_error(env, "get_suspension_args_failed");
1229+
} else {
1230+
suspended_state_t *new_suspended = create_suspended_state_from_existing(env, exc_args, state);
1231+
if (new_suspended == NULL) {
1232+
Py_DECREF(exc_args);
1233+
result = make_error(env, "create_nested_suspended_state_failed");
1234+
} else {
1235+
result = make_suspended_term(env, new_suspended, exc_args);
1236+
Py_DECREF(exc_args);
1237+
}
1238+
}
10501239
} else {
10511240
result = make_py_error(env);
10521241
}

0 commit comments

Comments
 (0)