Skip to content

Commit 3c4b90e

Browse files
authored
Merge pull request #40 from benoitc/fix/channel-create-task-notification
Fix channel notification for create_task async receive
2 parents d412a4f + 1d20a9a commit 3c4b90e

4 files changed

Lines changed: 150 additions & 5 deletions

File tree

c_src/py_event_loop.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3700,6 +3700,25 @@ bool event_loop_add_pending(erlang_event_loop_t *loop, event_type_t type,
37003700
}
37013701

37023702
pthread_mutex_unlock(&loop->mutex);
3703+
3704+
/*
3705+
* Also send task_ready to the worker if one exists.
3706+
* This is needed for create_task: Python is not waiting on the condition
3707+
* variable, so we need to notify the Erlang worker to call process_ready_tasks.
3708+
*
3709+
* Uses the same coalescing logic as submit_task to avoid message floods.
3710+
*/
3711+
if (loop->has_worker) {
3712+
if (!atomic_exchange(&loop->task_wake_pending, true)) {
3713+
ErlNifEnv *msg_env = enif_alloc_env();
3714+
if (msg_env != NULL) {
3715+
ERL_NIF_TERM msg = enif_make_atom(msg_env, "task_ready");
3716+
enif_send(NULL, &loop->worker_pid, msg_env, msg);
3717+
enif_free_env(msg_env);
3718+
}
3719+
}
3720+
}
3721+
37033722
return true;
37043723
}
37053724

priv/_erlang_impl/_byte_channel.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ async def process_bytes_async(channel_ref):
5454
ch.send_bytes(b"HTTP/1.1 200 OK\\r\\n")
5555
"""
5656

57+
from typing import Optional
58+
5759
__all__ = ['ByteChannel', 'ByteChannelClosed']
5860

5961

@@ -102,7 +104,7 @@ def send_bytes(self, data: bytes) -> bool:
102104
# Use _channel_send which sends raw bytes
103105
return erlang._channel_send(self._ref, bytes(data))
104106

105-
def try_receive_bytes(self) -> bytes | None:
107+
def try_receive_bytes(self) -> Optional[bytes]:
106108
"""Try to receive bytes without blocking.
107109
108110
Returns:

test/py_byte_channel_SUITE.erl

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
%% Close and drain tests
4242
close_drain_bytes_erlang_test/1,
4343
close_drain_bytes_python_sync_test/1,
44-
close_drain_bytes_python_async_test/1
44+
close_drain_bytes_python_async_test/1,
45+
close_drain_bytes_create_task_async_test/1
4546
]).
4647

4748
all() -> [
@@ -72,7 +73,8 @@ all() -> [
7273
%% Close and drain tests
7374
close_drain_bytes_erlang_test,
7475
close_drain_bytes_python_sync_test,
75-
close_drain_bytes_python_async_test
76+
close_drain_bytes_python_async_test,
77+
close_drain_bytes_create_task_async_test
7678
].
7779

7880
init_per_suite(Config) ->
@@ -530,3 +532,63 @@ async def async_drain_byte_channel(ch_ref):
530532
py:eval(Ctx, <<"erlang.run(async_drain_byte_channel(ch))">>, #{<<"ch">> => Ch}),
531533

532534
ct:pal("Python async byte channel close+drain test passed").
535+
536+
%% @doc Test async drain with create_task when data arrives after task starts
537+
%% This tests the notification callback path: task registers waiter, then data arrives
538+
close_drain_bytes_create_task_async_test(_Config) ->
539+
{ok, Ch} = py_byte_channel:new(),
540+
541+
%% Define the async drain task
542+
Ctx = py:context(1),
543+
ok = py:exec(Ctx, <<"
544+
import erlang
545+
from erlang import ByteChannel
546+
547+
async def drain_task(ch_ref, reply_pid):
548+
'''Task that drains byte channel and sends results back.'''
549+
try:
550+
ch = ByteChannel(ch_ref)
551+
chunks = []
552+
async for chunk in ch:
553+
chunks.append(chunk)
554+
erlang.send(reply_pid, ('result', chunks))
555+
except Exception as e:
556+
erlang.send(reply_pid, ('error', str(e)))
557+
">>),
558+
559+
%% Create the task BEFORE sending any data
560+
%% This forces the task to register a waiter and wait for notifications
561+
TaskRef = py_event_loop:create_task(
562+
"__main__", "drain_task", [Ch, self()]),
563+
564+
%% Give the task time to start and register waiter
565+
timer:sleep(100),
566+
567+
%% Now send data - should trigger notification callback
568+
ok = py_byte_channel:send(Ch, <<"chunk1">>),
569+
ok = py_byte_channel:send(Ch, <<"chunk2">>),
570+
ok = py_byte_channel:send(Ch, <<"chunk3">>),
571+
572+
%% Close the channel to signal end of stream
573+
ok = py_byte_channel:close(Ch),
574+
575+
%% Wait for result from the task
576+
receive
577+
{<<"result">>, [<<"chunk1">>, <<"chunk2">>, <<"chunk3">>]} ->
578+
ct:pal("create_task async drain with delayed data OK");
579+
{<<"result">>, Other} ->
580+
ct:pal("Unexpected result: ~p", [Other]),
581+
ct:fail({unexpected_result, Other});
582+
{<<"error">>, ErrMsg} ->
583+
ct:pal("Task error: ~p", [ErrMsg]),
584+
ct:fail({task_error, ErrMsg})
585+
after 5000 ->
586+
ct:fail("Timeout waiting for drain task result")
587+
end,
588+
589+
%% Wait for task to complete
590+
case py_event_loop:await(TaskRef, 5000) of
591+
{ok, _} -> ok;
592+
{error, AwaitErr} ->
593+
ct:pal("Await error: ~p", [AwaitErr])
594+
end.

test/py_channel_SUITE.erl

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@
4747
%% Close and drain tests
4848
close_drain_erlang_test/1,
4949
close_drain_python_sync_test/1,
50-
close_drain_python_async_test/1
50+
close_drain_python_async_test/1,
51+
close_drain_create_task_async_test/1
5152
]).
5253

5354
all() -> [
@@ -84,7 +85,8 @@ all() -> [
8485
%% Close and drain tests
8586
close_drain_erlang_test,
8687
close_drain_python_sync_test,
87-
close_drain_python_async_test
88+
close_drain_python_async_test,
89+
close_drain_create_task_async_test
8890
].
8991

9092
init_per_suite(Config) ->
@@ -707,3 +709,63 @@ async def async_drain_channel(ch_ref):
707709
py:eval(Ctx, <<"erlang.run(async_drain_channel(ch))">>, #{<<"ch">> => Ch}),
708710

709711
ct:pal("Python async close+drain test passed").
712+
713+
%% @doc Test async drain with create_task when data arrives after task starts
714+
%% This tests the notification callback path: task registers waiter, then data arrives
715+
close_drain_create_task_async_test(_Config) ->
716+
{ok, Ch} = py_channel:new(),
717+
718+
%% Define the async drain task
719+
Ctx = py:context(1),
720+
ok = py:exec(Ctx, <<"
721+
import erlang
722+
from erlang import Channel
723+
724+
async def drain_task(ch_ref, reply_pid):
725+
'''Task that drains channel and sends results back.'''
726+
try:
727+
ch = Channel(ch_ref)
728+
messages = []
729+
async for msg in ch:
730+
messages.append(msg)
731+
erlang.send(reply_pid, ('result', messages))
732+
except Exception as e:
733+
erlang.send(reply_pid, ('error', str(e)))
734+
">>),
735+
736+
%% Create the task BEFORE sending any data
737+
%% This forces the task to register a waiter and wait for notifications
738+
TaskRef = py_event_loop:create_task(
739+
"__main__", "drain_task", [Ch, self()]),
740+
741+
%% Give the task time to start and register waiter
742+
timer:sleep(100),
743+
744+
%% Now send data - should trigger notification callback
745+
ok = py_channel:send(Ch, <<"msg1">>),
746+
ok = py_channel:send(Ch, <<"msg2">>),
747+
ok = py_channel:send(Ch, <<"msg3">>),
748+
749+
%% Close the channel to signal end of stream
750+
ok = py_channel:close(Ch),
751+
752+
%% Wait for result from the task
753+
receive
754+
{<<"result">>, [<<"msg1">>, <<"msg2">>, <<"msg3">>]} ->
755+
ct:pal("create_task async drain with delayed data OK");
756+
{<<"result">>, Other} ->
757+
ct:pal("Unexpected result: ~p", [Other]),
758+
ct:fail({unexpected_result, Other});
759+
{<<"error">>, ErrMsg} ->
760+
ct:pal("Task error: ~p", [ErrMsg]),
761+
ct:fail({task_error, ErrMsg})
762+
after 5000 ->
763+
ct:fail("Timeout waiting for drain task result")
764+
end,
765+
766+
%% Wait for task to complete
767+
case py_event_loop:await(TaskRef, 5000) of
768+
{ok, _} -> ok;
769+
{error, AwaitErr} ->
770+
ct:pal("Await error: ~p", [AwaitErr])
771+
end.

0 commit comments

Comments
 (0)