Skip to content

Commit be246a4

Browse files
committed
Add event loop pool with process affinity for parallel execution
Event loop worker pool inspired by libuv's "one loop per thread" model. Each loop has its own worker and maintains event ordering.

Process affinity: all tasks from the same Erlang process are routed to the same event loop (via PID hash), guaranteeing that timers and related async operations execute in order.

Changes:
- py_event_loop_pool.erl: rewrite with process affinity, persistent_term for O(1) access, distributed task API (create_task, run, spawn_task, await)
- py_event_loop.erl: export get_process_env/0 for pool use
- py_event_loop_pool_SUITE.erl: new test suite (9 tests)
- bench_event_loop_pool.erl: new benchmark

Configuration:
- event_loop_pool_size: number of loops (default: schedulers count)

Benchmarks (Python 3.14, 14 schedulers):
- Sequential (pool): 150k tasks/sec (vs 83k single loop)
- Concurrent (50 procs): 164k tasks/sec
- Fire-and-collect: 417k tasks/sec
1 parent da1eb98 commit be246a4

4 files changed

Lines changed: 472 additions & 52 deletions

File tree

examples/bench_event_loop_pool.erl

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#!/usr/bin/env escript
2+
%% -*- erlang -*-
3+
%%! -pa _build/default/lib/erlang_python/ebin
4+
5+
%%% @doc Benchmark for event loop pool parallel processing.
6+
%%%
7+
%%% Compares single event loop vs pool performance.
8+
%%%
9+
%%% Run with:
10+
%%% rebar3 compile && escript examples/bench_event_loop_pool.erl
11+
12+
-mode(compile).
13+
14+
%% Entry point: boot the Python runtime, confirm the loop pool is up,
%% print system details, then run each benchmark scenario in turn.
main(_Args) ->
    io:format("~n=== Event Loop Pool Benchmark ===~n~n"),

    {ok, _} = application:ensure_all_started(erlang_python),
    {ok, _} = py:start_contexts(),
    %% Give the Python contexts a moment to finish initializing.
    timer:sleep(500),

    print_system_info(),

    %% Abort early if the pool never came up.
    case py_event_loop_pool:get_loop() of
        {error, Reason} ->
            io:format("Pool not available: ~p~n", [Reason]),
            halt(1);
        {ok, _Loop} ->
            ok
    end,

    io:format("Pool Stats: ~p~n~n", [py_event_loop_pool:get_stats()]),

    %% Benchmark scenarios
    bench_single_vs_pool_sequential(1000),
    bench_pool_concurrent(20, 100),
    bench_pool_concurrent(50, 100),
    bench_pool_throughput(10000),

    io:format("=== Benchmark Complete ===~n"),
    halt(0).
42+
43+
%% Print OTP release, scheduler count, and the embedded Python version.
print_system_info() ->
    OtpRelease = erlang:system_info(otp_release),
    SchedulerCount = erlang:system_info(schedulers),
    {ok, PythonVersion} = py:version(),
    io:format("System Information:~n"),
    io:format(" Erlang/OTP: ~s~n", [OtpRelease]),
    io:format(" Schedulers: ~p~n", [SchedulerCount]),
    io:format(" Python: ~s~n~n", [PythonVersion]).
49+
50+
%% Compare single loop vs pool for sequential tasks issued by one caller.
%% With process affinity the pool should route all of this process's tasks
%% to one loop, so the two timings are expected to be comparable.
%%
%% Fix: the single-loop and pool timing loops were duplicated verbatim
%% except for the module; the loop body is now a shared helper that takes
%% the create/await API as funs (static funs, so xref/Dialyzer still see
%% the call targets).
bench_single_vs_pool_sequential(N) ->
    io:format("Benchmark: Sequential tasks (single caller)~n"),
    io:format(" Iterations: ~p~n", [N]),

    %% Single event loop
    {T1, _} = timer:tc(fun() ->
        run_sequential(fun py_event_loop:create_task/3,
                       fun py_event_loop:await/1, N)
    end),

    %% Pool (should be similar since same caller = same loop)
    {T2, _} = timer:tc(fun() ->
        run_sequential(fun py_event_loop_pool:create_task/3,
                       fun py_event_loop_pool:await/1, N)
    end),

    io:format(" py_event_loop: ~.2f ms (~p tasks/sec)~n",
              [T1/1000, round(N / (T1/1000000))]),
    io:format(" py_event_loop_pool: ~.2f ms (~p tasks/sec)~n~n",
              [T2/1000, round(N / (T2/1000000))]).

%% Submit and await N math:sqrt tasks one at a time via the given API funs.
run_sequential(CreateTask, Await, N) ->
    lists:foreach(fun(I) ->
        Ref = CreateTask(math, sqrt, [float(I)]),
        {ok, _} = Await(Ref)
    end, lists:seq(1, N)).
75+
76+
%% Pool with concurrent callers. Each caller process hashes to its own
%% loop, so the submitted work is expected to run in parallel.
bench_pool_concurrent(NumProcs, TasksPerProc) ->
    TotalTasks = NumProcs * TasksPerProc,
    io:format("Benchmark: Concurrent callers via pool~n"),
    io:format(" Processes: ~p, Tasks/process: ~p, Total: ~p~n",
              [NumProcs, TasksPerProc, TotalTasks]),

    Collector = self(),

    %% Each worker runs its tasks sequentially, then reports back.
    Worker = fun() ->
        RunOne = fun(I) ->
            TaskRef = py_event_loop_pool:create_task(math, sqrt, [float(I)]),
            {ok, _} = py_event_loop_pool:await(TaskRef)
        end,
        lists:foreach(RunOne, lists:seq(1, TasksPerProc)),
        Collector ! {done, self()}
    end,

    {Elapsed, _} = timer:tc(fun() ->
        Workers = [spawn_link(Worker) || _ <- lists:seq(1, NumProcs)],
        %% Wait for every worker to check in (linked, so a crash propagates).
        lists:foreach(fun(W) -> receive {done, W} -> ok end end, Workers)
    end),

    io:format(" Total time: ~.2f ms~n", [Elapsed/1000]),
    io:format(" Throughput: ~p tasks/sec~n~n", [round(TotalTasks / (Elapsed/1000000))]).
99+
100+
%% High-throughput test: enqueue every task up front, then await them all,
%% so submission overlaps with execution on the pool.
bench_pool_throughput(N) ->
    io:format("Benchmark: Pool throughput (fire-and-collect)~n"),
    io:format(" Tasks: ~p~n", [N]),

    {Elapsed, _} = timer:tc(fun() ->
        Submit = fun(I) ->
            py_event_loop_pool:create_task(math, sqrt, [float(I)])
        end,
        TaskRefs = lists:map(Submit, lists:seq(1, N)),
        lists:foreach(fun(Ref) -> py_event_loop_pool:await(Ref) end, TaskRefs)
    end),

    io:format(" Total time: ~.2f ms~n", [Elapsed/1000]),
    io:format(" Throughput: ~p tasks/sec~n~n", [round(N / (Elapsed/1000000))]).

src/py_event_loop.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
spawn_task/3, spawn_task/4,
3737
%% Per-process namespace API
3838
exec/1, exec/2,
39-
eval/1, eval/2
39+
eval/1, eval/2,
40+
%% Internal API (used by py_event_loop_pool)
41+
get_process_env/0
4042
]).
4143

4244
%% gen_server callbacks

0 commit comments

Comments
 (0)