Skip to content

Commit 95fedda

Browse files
committed
Remove legacy py_event_router module
The py_event_worker now handles all event loop functionality including FD events, timers, and task processing. This simplifies the architecture by consolidating event handling into a single worker process. Changes: - Delete src/py_event_router.erl - Remove router_pid from py_event_loop state record - Remove set_shared_router NIF from py_nif.erl and C code - Simplify C code to always use worker_pid instead of router fallback - Update all tests to use py_event_worker - Update asyncio.md documentation
1 parent 458d659 commit 95fedda

10 files changed

Lines changed: 161 additions & 408 deletions

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@
4141
contexts uses `thread_worker_call()` rather than suspension/resume protocol;
4242
re-entrant calls to the same OWN_GIL context are not supported
4343

44+
### Changed
45+
46+
- **Removed py_event_router** - Removed legacy `py_event_router` module. The `py_event_worker`
47+
now handles all event loop functionality including FD events, timers, and task processing.
48+
This simplifies the architecture by consolidating event handling into a single worker process.
49+
The `py_nif:set_shared_router/1` function has been removed.
50+
4451
### Added
4552

4653
- **Event Loop Pool** - Pool of event loops for parallel Python coroutine execution

c_src/py_event_loop.c

Lines changed: 64 additions & 119 deletions
Large diffs are not rendered by default.

c_src/py_event_loop.h

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,10 @@ typedef struct {
264264
* - Synchronization primitives
265265
*/
266266
typedef struct erlang_event_loop {
267-
/** @brief PID of the py_event_router gen_server (legacy) */
267+
/** @brief Legacy field - kept for binary compatibility */
268268
ErlNifPid router_pid;
269269

270-
/** @brief Whether router_pid has been set */
270+
/** @brief Legacy field - kept for binary compatibility */
271271
bool has_router;
272272

273273
/** @brief PID of the py_event_worker gen_server (scalable I/O model) */
@@ -594,9 +594,9 @@ ERL_NIF_TERM nif_get_pending(ErlNifEnv *env, int argc,
594594
const ERL_NIF_TERM argv[]);
595595

596596
/**
597-
* @brief Dispatch a callback from the router
597+
* @brief Dispatch a callback from the worker
598598
*
599-
* Called by py_event_router when an event occurs.
599+
* Called by py_event_worker when an event occurs.
600600
*
601601
* NIF: dispatch_callback(LoopRef, CallbackId, Type) -> ok
602602
*/
@@ -978,14 +978,6 @@ int py_event_loop_init_python(ErlNifEnv *env, erlang_event_loop_t *loop);
978978
ERL_NIF_TERM nif_set_python_event_loop(ErlNifEnv *env, int argc,
979979
const ERL_NIF_TERM argv[]);
980980

981-
/**
982-
* @brief Set the shared router PID for per-loop created loops
983-
*
984-
* NIF: set_shared_router(RouterPid) -> ok | {error, Reason}
985-
*/
986-
ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc,
987-
const ERL_NIF_TERM argv[]);
988-
989981
/**
990982
* @brief Set the shared worker PID for task_ready notifications
991983
*

c_src/py_nif.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6568,7 +6568,6 @@ static ErlNifFunc nif_funcs[] = {
65686568
/* Python event loop integration */
65696569
{"set_python_event_loop", 1, nif_set_python_event_loop, 0},
65706570
{"set_isolation_mode", 1, nif_set_isolation_mode, 0},
6571-
{"set_shared_router", 1, nif_set_shared_router, 0},
65726571
{"set_shared_worker", 1, nif_set_shared_worker, 0},
65736572

65746573
/* ASGI optimizations */

docs/asyncio.md

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ erlang.run(main())
4747
│ │ _run_once() │ └────────────────────────────────────┘ │
4848
│ │ │ │ │
4949
│ │ ▼ │ ┌────────────────────────────────────┐ │
50-
│ │ process pending │ │ py_event_router │ │
50+
│ │ process pending │ │ │ │
5151
│ │ callbacks │ │ │ │
52-
│ └──────────────────┘ │ Routes events to correct loop │ │
53-
│ │ based on resource backref │ │
52+
│ └──────────────────┘ │ │ │
53+
│ │ │ │
5454
│ ┌──────────────────┐ └────────────────────────────────────┘ │
5555
│ │ asyncio (via │ │
5656
│ │ erlang.run()) │ ┌────────────────────────────────────┐ │
@@ -68,8 +68,7 @@ erlang.run(main())
6868
| Component | Role |
6969
|-----------|------|
7070
| `ErlangEventLoop` | Python asyncio event loop using Erlang for I/O and timers |
71-
| `py_event_worker` | Erlang gen_server managing FDs and timers for a Python context |
72-
| `py_event_router` | Routes timer/FD events to the correct event loop instance |
71+
| `py_event_worker` | Erlang gen_server handling FDs, timers, and task processing |
7372
| `erlang.run()` | Entry point to run asyncio code with the Erlang event loop |
7473

7574
## Usage Patterns
@@ -541,13 +540,13 @@ py_nif:close_test_fd(Fd).
541540

542541
## Integration with Erlang
543542

544-
The event loop integrates with Erlang's message passing system through a router process:
543+
The event loop integrates with Erlang's message passing system through a worker process:
545544

546545
```erlang
547-
%% Start the event router
546+
%% Start the event worker
548547
{ok, LoopRef} = py_nif:event_loop_new(),
549-
{ok, RouterPid} = py_event_router:start_link(LoopRef),
550-
ok = py_nif:event_loop_set_router(LoopRef, RouterPid).
548+
{ok, WorkerPid} = py_event_worker:start_link(<<"worker">>, LoopRef),
549+
ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid).
551550
```
552551

553552
Events are delivered as Erlang messages, enabling the event loop to participate in BEAM's supervision trees and distributed computing capabilities.
@@ -598,27 +597,28 @@ t2.join()
598597

599598
### Internal Architecture
600599

601-
A shared router process handles timer and FD events for all loops:
600+
Each event loop has an associated worker process that handles timer and FD events:
602601

603602
```
604603
┌─────────────────────────────────────────────────────────────────┐
605-
py_event_router (shared)
604+
py_event_worker
606605
│ │
607606
│ Receives: │
608607
│ - Timer expirations from erlang:send_after │
609608
│ - FD ready events from enif_select │
609+
│ - task_ready messages for processing tasks │
610610
│ │
611-
│ Dispatches to correct loop via resource backref
611+
│ Dispatches events to the loop's pending queue
612612
└─────────────────────────────────────────────────────────────────┘
613-
▲ ▲
614-
│ │
615-
┌────┴────┐ ┌────┴────┐ ┌────────┐
616-
│ Loop A │ │ Loop B │ Loop C
617-
│ pending │ │ pending │ pending │
618-
└─────────┘ └─────────┘ └─────────┘
613+
614+
615+
┌───────────┐
616+
Loop
617+
pending
618+
───────────┘
619619
```
620620

621-
Each loop has its own pending queue, ensuring callbacks are processed only by the loop that scheduled them. The shared router dispatches timer and FD events to the correct loop based on the capsule backref.
621+
Each loop has its own pending queue, ensuring callbacks are processed only by the loop that scheduled them. The worker dispatches timer, FD events, and tasks to the correct loop.
622622

623623
## Erlang Timer Integration
624624

src/py_event_loop.erl

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@
5454
-record(state, {
5555
loop_ref :: reference() | undefined,
5656
worker_pid :: pid() | undefined,
57-
worker_id :: binary(),
58-
router_pid :: pid() | undefined
57+
worker_id :: binary()
5958
}).
6059

6160
%% ============================================================================
@@ -326,19 +325,14 @@ init([]) ->
326325
%% Set global shared worker for dispatch_timer task_ready notifications
327326
ok = py_nif:set_shared_worker(WorkerPid),
328327

329-
%% Also start legacy router for backward compatibility
330-
{ok, RouterPid} = py_event_router:start_link(LoopRef),
331-
ok = py_nif:set_shared_router(RouterPid),
332-
333328
%% Make the event loop available to Python
334329
ok = py_nif:set_python_event_loop(LoopRef),
335330
%% Set ErlangEventLoop as the default asyncio policy
336331
ok = set_default_policy(),
337332
{ok, #state{
338333
loop_ref = LoopRef,
339334
worker_pid = WorkerPid,
340-
worker_id = WorkerId,
341-
router_pid = RouterPid
335+
worker_id = WorkerId
342336
}};
343337
{error, Reason} ->
344338
{stop, {event_loop_init_failed, Reason}}
@@ -392,14 +386,11 @@ handle_call(get_loop, _From, #state{loop_ref = undefined} = State) ->
392386
ok = py_nif:event_loop_set_worker(LoopRef, WorkerPid),
393387
ok = py_nif:event_loop_set_id(LoopRef, WorkerId),
394388
ok = py_nif:set_shared_worker(WorkerPid),
395-
{ok, RouterPid} = py_event_router:start_link(LoopRef),
396-
ok = py_nif:set_shared_router(RouterPid),
397389
ok = py_nif:set_python_event_loop(LoopRef),
398390
NewState = State#state{
399391
loop_ref = LoopRef,
400392
worker_pid = WorkerPid,
401-
worker_id = WorkerId,
402-
router_pid = RouterPid
393+
worker_id = WorkerId
403394
},
404395
{reply, {ok, LoopRef}, NewState};
405396
{error, _} = Error ->
@@ -418,19 +409,14 @@ handle_cast(_Msg, State) ->
418409
handle_info(_Info, State) ->
419410
{noreply, State}.
420411

421-
terminate(_Reason, #state{loop_ref = LoopRef, worker_pid = WorkerPid, router_pid = RouterPid}) ->
412+
terminate(_Reason, #state{loop_ref = LoopRef, worker_pid = WorkerPid}) ->
422413
%% Reset asyncio policy back to default before destroying the loop
423414
reset_default_policy(),
424-
%% Clean up worker (scalable I/O model)
415+
%% Clean up worker
425416
case WorkerPid of
426417
undefined -> ok;
427418
WPid -> py_event_worker:stop(WPid)
428419
end,
429-
%% Clean up legacy router
430-
case RouterPid of
431-
undefined -> ok;
432-
RPid -> py_event_router:stop(RPid)
433-
end,
434420
%% Clean up event loop
435421
case LoopRef of
436422
undefined -> ok;

src/py_event_router.erl

Lines changed: 0 additions & 168 deletions
This file was deleted.

0 commit comments

Comments
 (0)