Skip to content

Commit c65e2ba

Browse files
committed
Add erlang.async_call() for asyncio-compatible callbacks
Implements a new async API that integrates properly with asyncio without raising exceptions for control flow. This is the recommended approach for ASGI/asyncio applications (FastAPI, Starlette, etc). Usage: result = await erlang.async_call('func', arg1, arg2) The implementation: - Creates an asyncio Future for each call - Sends request to Erlang via thread coordinator - Releases the dirty NIF thread while waiting - Uses a non-blocking pipe monitored by asyncio - Resolves the Future when Erlang responds This avoids the SuspensionRequired exception that conflicts with asyncio's Task error handling when middleware catches and re-raises exceptions.
1 parent cdb2307 commit c65e2ba

8 files changed

Lines changed: 574 additions & 6 deletions

File tree

c_src/py_callback.c

Lines changed: 390 additions & 0 deletions
Large diffs are not rendered by default.

c_src/py_nif.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1765,7 +1765,10 @@ static ErlNifFunc nif_funcs[] = {
17651765
/* Thread worker support (ThreadPoolExecutor) */
17661766
{"thread_worker_set_coordinator", 1, nif_thread_worker_set_coordinator, 0},
17671767
{"thread_worker_write", 2, nif_thread_worker_write, 0},
1768-
{"thread_worker_signal_ready", 1, nif_thread_worker_signal_ready, 0}
1768+
{"thread_worker_signal_ready", 1, nif_thread_worker_signal_ready, 0},
1769+
1770+
/* Async callback support (for erlang.async_call) */
1771+
{"async_callback_response", 3, nif_async_callback_response, 0}
17691772
};
17701773

17711774
ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload)

c_src/py_nif.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@
8989
#include <time.h>
9090
#include <math.h>
9191
#include <unistd.h>
92+
#include <fcntl.h>
93+
#include <errno.h>
9294
#include <pthread.h>
9395

9496
#if defined(__linux__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)

c_src/py_thread_worker.c

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,10 @@ static bool g_thread_worker_key_created = false;
104104
static __thread thread_worker_t *tl_thread_worker = NULL;
105105

106106
/** @brief PID of the Erlang thread coordinator process */
107-
static ErlNifPid g_thread_coordinator_pid;
107+
ErlNifPid g_thread_coordinator_pid;
108108

109109
/** @brief Flag: coordinator PID has been set */
110-
static bool g_has_thread_coordinator = false;
110+
bool g_has_thread_coordinator = false;
111111

112112
/* ============================================================================
113113
* Thread Worker Destructor (pthread_key cleanup)
@@ -550,3 +550,54 @@ static ERL_NIF_TERM nif_thread_worker_signal_ready(ErlNifEnv *env, int argc,
550550

551551
return ATOM_OK;
552552
}
553+
554+
/**
555+
* @brief NIF to write an async callback response
556+
*
557+
* Args: WriteFd, CallbackId, ResponseBinary
558+
* Called by Erlang to send the result of an async callback back to Python.
559+
* The response is written to the async callback pipe in the format:
560+
* callback_id (8 bytes) + response_len (4 bytes) + response_data
561+
*/
562+
static ERL_NIF_TERM nif_async_callback_response(ErlNifEnv *env, int argc,
563+
const ERL_NIF_TERM argv[]) {
564+
(void)argc;
565+
int fd;
566+
ErlNifUInt64 callback_id;
567+
ErlNifBinary response;
568+
569+
if (!enif_get_int(env, argv[0], &fd)) {
570+
return make_error(env, "invalid_fd");
571+
}
572+
573+
if (!enif_get_uint64(env, argv[1], &callback_id)) {
574+
return make_error(env, "invalid_callback_id");
575+
}
576+
577+
if (!enif_inspect_binary(env, argv[2], &response)) {
578+
return make_error(env, "invalid_response");
579+
}
580+
581+
/* Write callback_id (8 bytes) */
582+
ssize_t n = write(fd, &callback_id, sizeof(callback_id));
583+
if (n != sizeof(callback_id)) {
584+
return make_error(env, "write_callback_id_failed");
585+
}
586+
587+
/* Write response_len (4 bytes) */
588+
uint32_t len = (uint32_t)response.size;
589+
n = write(fd, &len, sizeof(len));
590+
if (n != sizeof(len)) {
591+
return make_error(env, "write_length_failed");
592+
}
593+
594+
/* Write response_data */
595+
if (len > 0) {
596+
n = write(fd, response.data, response.size);
597+
if (n != (ssize_t)response.size) {
598+
return make_error(env, "write_data_failed");
599+
}
600+
}
601+
602+
return ATOM_OK;
603+
}

src/py_nif.erl

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@
6363
%% Thread worker support (ThreadPoolExecutor)
6464
thread_worker_set_coordinator/1,
6565
thread_worker_write/2,
66-
thread_worker_signal_ready/1
66+
thread_worker_signal_ready/1,
67+
%% Async callback support (for erlang.async_call)
68+
async_callback_response/3
6769
]).
6870

6971
-on_load(load_nif/0).
@@ -382,3 +384,20 @@ thread_worker_write(_Fd, _Response) ->
382384
-spec thread_worker_signal_ready(integer()) -> ok | {error, term()}.
383385
thread_worker_signal_ready(_Fd) ->
384386
?NIF_STUB.
387+
388+
%%% ============================================================================
389+
%%% Async Callback Support (for erlang.async_call)
390+
%%% ============================================================================
391+
392+
%% @doc Write an async callback response to the async callback pipe.
393+
%% Fd is the write end of the async callback pipe.
394+
%% CallbackId is the unique identifier for this callback.
395+
%% Response is the result binary (status byte + encoded result).
396+
%%
397+
%% This is called when an async_callback message is processed.
398+
%% The response is written in the format:
399+
%% callback_id (8 bytes) + response_len (4 bytes) + response_data
400+
-spec async_callback_response(integer(), non_neg_integer(), binary()) ->
401+
ok | {error, term()}.
402+
async_callback_response(_Fd, _CallbackId, _Response) ->
403+
?NIF_STUB.

src/py_thread_handler.erl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ handle_info({thread_callback, WorkerId, CallbackId, FuncName, Args},
105105
end,
106106
{noreply, State};
107107

108+
%% Handle async callback request from Python async_call()
109+
%% Unlike thread_callback, this uses a global pipe for all async callbacks.
110+
%% Each response includes the callback_id so Python can match it to the right Future.
111+
handle_info({async_callback, CallbackId, FuncName, Args, WriteFd}, State) ->
112+
%% Spawn a process to handle this callback asynchronously
113+
%% This allows multiple async callbacks to be processed concurrently
114+
spawn_link(fun() ->
115+
handle_async_callback(WriteFd, CallbackId, FuncName, Args)
116+
end),
117+
{noreply, State};
118+
108119
%% Handle handler process exit
109120
handle_info({'EXIT', Pid, _Reason}, #state{handlers = Handlers} = State) ->
110121
%% Remove handler from map
@@ -172,6 +183,37 @@ handle_thread_callback(WriteFd, FuncName, Args) ->
172183
%% Write response to pipe
173184
py_nif:thread_worker_write(WriteFd, Response).
174185

186+
%% Execute an async callback and write response to the async callback pipe.
187+
%% Unlike handle_thread_callback, this includes the callback_id in the response
188+
%% so Python can match it to the correct Future.
189+
handle_async_callback(WriteFd, CallbackId, FuncName, Args) ->
190+
%% Convert Args from tuple to list if needed
191+
ArgsList = case Args of
192+
T when is_tuple(T) -> tuple_to_list(T);
193+
L when is_list(L) -> L;
194+
_ -> [Args]
195+
end,
196+
197+
%% Execute the registered function
198+
Response = case py_callback:execute(FuncName, ArgsList) of
199+
{ok, Result} ->
200+
%% Encode result as Python-parseable string
201+
%% Format: status_byte (0=ok) + python_repr
202+
ResultStr = term_to_python_repr(Result),
203+
<<0, ResultStr/binary>>;
204+
{error, {not_found, Name}} ->
205+
ErrMsg = iolist_to_binary(
206+
io_lib:format("Function '~s' not registered", [Name])),
207+
<<1, ErrMsg/binary>>;
208+
{error, {Class, Reason, _Stack}} ->
209+
ErrMsg = iolist_to_binary(
210+
io_lib:format("~p: ~p", [Class, Reason])),
211+
<<1, ErrMsg/binary>>
212+
end,
213+
214+
%% Write response to async callback pipe (includes callback_id)
215+
py_nif:async_callback_response(WriteFd, CallbackId, Response).
216+
175217
%%% ============================================================================
176218
%%% Term to Python repr conversion
177219
%%% (Same as py_worker.erl - could be factored out to py_util.erl)

test/py_reentrant_SUITE.erl

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
test_callback_with_complex_types/1,
2323
test_multiple_sequential_callbacks/1,
2424
test_call_from_non_worker_thread/1,
25-
test_callback_with_try_except/1
25+
test_callback_with_try_except/1,
26+
test_async_call/1
2627
]).
2728

2829
all() ->
@@ -34,7 +35,8 @@ all() ->
3435
test_callback_with_complex_types,
3536
test_multiple_sequential_callbacks,
3637
test_call_from_non_worker_thread,
37-
test_callback_with_try_except
38+
test_callback_with_try_except,
39+
test_async_call
3840
].
3941

4042
init_per_suite(Config) ->
@@ -58,6 +60,7 @@ end_per_testcase(_TestCase, _Config) ->
5860
catch py:unregister_function(add_ten),
5961
catch py:unregister_function(multiply_by_two),
6062
catch py:unregister_function(subtract_five),
63+
catch py:unregister_function(async_multiply),
6164
ok.
6265

6366
%%% ============================================================================
@@ -308,3 +311,27 @@ test_callback_with_try_except(_Config) ->
308311
%% Cleanup
309312
py:unregister_function(get_value),
310313
ok.
314+
315+
%% @doc Test erlang.async_call() for asyncio-compatible callbacks.
316+
%% This tests the new async API that doesn't raise exceptions for control flow,
317+
%% making it safe to use from ASGI/asyncio applications.
318+
test_async_call(_Config) ->
319+
%% Register an Erlang function
320+
py:register_function(async_multiply, fun([X, Y]) -> X * Y end),
321+
322+
%% Add test directory to Python path
323+
TestDir = code:lib_dir(erlang_python, test),
324+
ok = py:exec(iolist_to_binary(io_lib:format(
325+
"import sys; sys.path.insert(0, '~s')", [TestDir]))),
326+
327+
%% Test single async_call
328+
{ok, Result} = py:call(py_test_async, run_single_test, []),
329+
42 = Result,
330+
331+
%% Test concurrent async_calls
332+
{ok, ConcurrentResult} = py:call(py_test_async, run_concurrent_test, []),
333+
[6, 20, 42] = ConcurrentResult,
334+
335+
%% Cleanup
336+
py:unregister_function(async_multiply),
337+
ok.

test/py_test_async.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""Test module for erlang.async_call() functionality.
2+
3+
This module tests the async API that integrates with asyncio without
4+
raising exceptions for control flow.
5+
"""
6+
7+
import asyncio
8+
import erlang
9+
10+
11+
async def single_async_call():
12+
"""Test a single async_call."""
13+
result = await erlang.async_call('async_multiply', 6, 7)
14+
return result
15+
16+
17+
async def concurrent_async_calls():
18+
"""Test multiple concurrent async_calls using asyncio.gather."""
19+
results = await asyncio.gather(
20+
erlang.async_call('async_multiply', 2, 3),
21+
erlang.async_call('async_multiply', 4, 5),
22+
erlang.async_call('async_multiply', 6, 7)
23+
)
24+
return results
25+
26+
27+
def run_single_test():
28+
"""Run single async_call test."""
29+
return asyncio.run(single_async_call())
30+
31+
32+
def run_concurrent_test():
33+
"""Run concurrent async_calls test."""
34+
return asyncio.run(concurrent_async_calls())

0 commit comments

Comments
 (0)