Skip to content

Commit b9cf1e7

Browse files
committed
Fix test failures after API simplification to worker mode
Remove subinterpreter-specific tests that are no longer applicable:
- py_context_process_SUITE: Remove subinterp test group
- py_channel_SUITE: Remove subinterp_sync_receive_wait_test
- py_import_SUITE: Remove registry_applied_to_subinterp_test
- py_reactor_SUITE: Remove reactor_context_subinterp_isolation_test

Fix parallel pool crash by properly restoring main interpreter state:
- Save main_tstate before creating OWN_GIL subinterpreters
- Use PyEval_RestoreThread(main_tstate) instead of PyGILState_Ensure()
- Defer event loop initialization to avoid NULL env parameter

All 415 tests pass in both standard and parallel modes.
1 parent 2b04f5a commit b9cf1e7

7 files changed

Lines changed: 32 additions & 249 deletions

c_src/py_parallel_pool.c

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,10 @@ int parallel_pool_init(int size) {
8282
memset(g_parallel_pool, 0, sizeof(g_parallel_pool));
8383
atomic_store(&g_parallel_next_slot, 0);
8484

85-
/* Note: When creating OWN_GIL subinterpreters, the main GIL is released
86-
* after each Py_NewInterpreterFromConfig call and we need to re-acquire
87-
* it with PyGILState_Ensure() for the next iteration. */
85+
/* Save the main thread state. We need to restore this after creating
86+
* each OWN_GIL subinterpreter because Py_NewInterpreterFromConfig
87+
* releases the main GIL and switches to the new subinterpreter. */
88+
PyThreadState *main_tstate = PyThreadState_Get();
8889

8990
for (int i = 0; i < size; i++) {
9091
parallel_slot_t *slot = &g_parallel_pool[i];
@@ -122,8 +123,8 @@ int parallel_pool_init(int size) {
122123
}
123124
}
124125

125-
/* Try to restore main thread state */
126-
PyGILState_Ensure();
126+
/* Restore main thread state */
127+
PyEval_RestoreThread(main_tstate);
127128
return -1;
128129
}
129130

@@ -155,7 +156,8 @@ int parallel_pool_init(int size) {
155156
}
156157
}
157158

158-
PyGILState_Ensure();
159+
/* Restore main thread state */
160+
PyEval_RestoreThread(main_tstate);
159161
return -1;
160162
}
161163

@@ -189,23 +191,25 @@ int parallel_pool_init(int size) {
189191
}
190192
}
191193

192-
/* Initialize event loop for this subinterpreter */
193-
if (init_subinterpreter_event_loop(NULL) < 0) {
194-
fprintf(stderr, "parallel_pool_init: failed to init event loop in slot %d\n", i);
195-
log_and_clear_python_error("parallel_pool event_loop_init");
196-
/* Non-fatal - async features won't work */
197-
}
194+
/* Note: Event loop initialization is deferred until first use.
195+
* The init_subinterpreter_event_loop() function requires an ErlNifEnv
196+
* which we don't have during pool initialization. Event loops will be
197+
* created lazily when async operations are first performed. */
198198

199199
slot->initialized = true;
200200
atomic_store(&slot->active_count, 0);
201201
atomic_store(&slot->shutdown_requested, false);
202202

203203
/* Release this slot's GIL (save thread state).
204-
* After this, we need to re-acquire main GIL to create next subinterpreter. */
204+
* After this, we need to restore the main thread state to create the
205+
* next subinterpreter or to return to the caller. */
205206
PyEval_SaveThread();
206207

207-
/* Re-acquire main GIL for next iteration */
208-
PyGILState_Ensure();
208+
/* Restore main interpreter thread state for next iteration.
209+
* This is critical: PyGILState_Ensure() doesn't work correctly after
210+
* creating OWN_GIL subinterpreters because there's no thread state
211+
* associated with the calling thread anymore. */
212+
PyEval_RestoreThread(main_tstate);
209213
}
210214

211215
g_parallel_pool_size = size;

c_src/py_subinterp_pool.c

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,10 @@ int subinterp_pool_init(int size) {
187187
}
188188
}
189189

190-
/* Initialize event loop for this subinterpreter.
191-
* This enables asyncio support (sleep, timers, etc.) */
192-
if (init_subinterpreter_event_loop(NULL) < 0) {
193-
fprintf(stderr, "subinterp_pool_init: failed to init event loop in subinterp %d\n", i);
194-
log_and_clear_python_error("subinterp event_loop_init");
195-
/* Non-fatal - async features just won't work */
196-
}
190+
/* Note: Event loop initialization is deferred until first use.
191+
* The init_subinterpreter_event_loop() function requires an ErlNifEnv
192+
* which we don't have during pool initialization. Event loops will be
193+
* created lazily when async operations are first performed. */
197194

198195
slot->initialized = true;
199196

c_src/py_worker_pool.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -493,12 +493,10 @@ static void *py_pool_worker_thread(void *arg) {
493493

494494
worker->interp = PyThreadState_GetInterpreter(worker->tstate);
495495

496-
/* Initialize event loop for this subinterpreter */
497-
if (init_subinterpreter_event_loop(NULL) < 0) {
498-
gil_release(guard);
499-
worker->running = false;
500-
return NULL;
501-
}
496+
/* Note: Event loop initialization is deferred until first use.
497+
* The init_subinterpreter_event_loop() function requires an ErlNifEnv
498+
* which we don't have during worker initialization. Event loops will be
499+
* created lazily when async operations are first performed. */
502500

503501
/* Release main GIL - we now have our own */
504502
gil_release(guard);

test/py_channel_SUITE.erl

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
mixed_waiter_async_blocks_sync_test/1,
4343
%% Async receive with actual waiting
4444
async_receive_wait_e2e_test/1,
45-
%% Subinterpreter mode tests
46-
subinterp_sync_receive_wait_test/1,
4745
%% Close and drain tests
4846
close_drain_erlang_test/1,
4947
close_drain_python_sync_test/1,
@@ -80,8 +78,6 @@ all() -> [
8078
mixed_waiter_async_blocks_sync_test,
8179
%% Async receive with actual waiting
8280
async_receive_wait_e2e_test,
83-
%% Subinterpreter mode tests
84-
subinterp_sync_receive_wait_test,
8581
%% Close and drain tests
8682
close_drain_erlang_test,
8783
close_drain_python_sync_test,
@@ -551,71 +547,6 @@ async def receive_from_channel(ch_ref):
551547

552548
ok = py_channel:close(Ch).
553549

554-
%%% ============================================================================
555-
%%% Subinterpreter Mode Tests
556-
%%% ============================================================================
557-
558-
%% @doc Test sync blocking receive works with subinterpreter mode contexts
559-
%% This is a true e2e test: Python Channel.receive() blocks until Erlang sends data
560-
subinterp_sync_receive_wait_test(_Config) ->
561-
case py_nif:subinterp_supported() of
562-
false ->
563-
{skip, "Subinterpreters not supported (requires Python 3.12+)"};
564-
true ->
565-
do_subinterp_sync_receive_wait_test()
566-
end.
567-
568-
do_subinterp_sync_receive_wait_test() ->
569-
{ok, Ch} = py_channel:new(),
570-
Self = self(),
571-
572-
%% Create a context explicitly in subinterp mode
573-
CtxId = erlang:unique_integer([positive]),
574-
{ok, CtxPid} = py_context:start_link(CtxId, subinterp),
575-
576-
%% Import Channel class in the subinterp context
577-
ok = py_context:exec(CtxPid, <<"from erlang import Channel">>),
578-
579-
%% Test 1: Immediate receive with data already available
580-
ok = py_channel:send(Ch, <<"immediate_data">>),
581-
{ok, <<"immediate_data">>} = py_context:eval(CtxPid,
582-
<<"Channel(ch).receive()">>, #{<<"ch">> => Ch}),
583-
ct:pal("Subinterp immediate receive OK"),
584-
585-
%% Test 2: Blocking receive - spawn process to send data after delay
586-
spawn_link(fun() ->
587-
timer:sleep(100),
588-
ok = py_channel:send(Ch, <<"delayed_data">>),
589-
Self ! data_sent
590-
end),
591-
592-
%% This should block until the spawned process sends data
593-
{ok, <<"delayed_data">>} = py_context:eval(CtxPid,
594-
<<"Channel(ch).receive()">>, #{<<"ch">> => Ch}),
595-
receive data_sent -> ok after 1000 -> ok end,
596-
ct:pal("Subinterp blocking receive OK"),
597-
598-
%% Test 3: try_receive on empty channel returns None
599-
{ok, none} = py_context:eval(CtxPid,
600-
<<"Channel(ch).try_receive()">>, #{<<"ch">> => Ch}),
601-
ct:pal("Subinterp try_receive empty OK"),
602-
603-
%% Test 4: Channel close detected by receive
604-
ok = py_channel:close(Ch),
605-
ok = py_context:exec(CtxPid, <<"
606-
def test_closed(ch_ref):
607-
try:
608-
Channel(ch_ref).receive()
609-
return 'no_exception'
610-
except:
611-
return 'got_exception'
612-
">>),
613-
{ok, <<"got_exception">>} = py_context:eval(CtxPid,
614-
<<"test_closed(ch)">>, #{<<"ch">> => Ch}),
615-
ct:pal("Subinterp closed channel detected OK"),
616-
617-
py_context:stop(CtxPid).
618-
619550
%%% ============================================================================
620551
%%% Close and Drain Tests
621552
%%% ============================================================================

test/py_context_process_SUITE.erl

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@
3636

3737
all() ->
3838
[
39-
{group, worker},
40-
{group, subinterp}
39+
{group, worker}
4140
].
4241

4342
groups() ->
@@ -56,8 +55,7 @@ groups() ->
5655
test_context_type_conversions
5756
],
5857
[
59-
{worker, [sequence], Tests},
60-
{subinterp, [sequence], Tests}
58+
{worker, [sequence], Tests}
6159
].
6260

6361
init_per_suite(Config) ->
@@ -69,14 +67,7 @@ end_per_suite(_Config) ->
6967
ok.
7068

7169
init_per_group(worker, Config) ->
72-
[{context_type, worker} | Config];
73-
init_per_group(subinterp, Config) ->
74-
case py_nif:subinterp_supported() of
75-
true ->
76-
[{context_type, subinterp} | Config];
77-
false ->
78-
{skip, "Subinterpreters not supported (requires Python 3.12+)"}
79-
end.
70+
[{context_type, worker} | Config].
8071

8172
end_per_group(_Group, _Config) ->
8273
ok.

test/py_import_SUITE.erl

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
event_loop_pool_import_test/1,
3737
spawn_task_uses_import_test/1,
3838
subinterp_isolation_test/1,
39-
registry_applied_to_subinterp_test/1,
4039
%% sys.modules verification tests
4140
import_in_sys_modules_test/1,
4241
registry_import_in_sys_modules_test/1,
@@ -78,7 +77,6 @@ groups() ->
7877
event_loop_pool_import_test,
7978
spawn_task_uses_import_test,
8079
subinterp_isolation_test,
81-
registry_applied_to_subinterp_test,
8280
%% sys.modules verification tests
8381
import_in_sys_modules_test,
8482
registry_import_in_sys_modules_test,
@@ -589,34 +587,6 @@ subinterp_isolation_test(_Config) ->
589587
ct:pal("Subinterpreter isolation verified - different interpreters are isolated")
590588
end.
591589

592-
%% @doc Test that registry imports are applied to new subinterpreter contexts
593-
%%
594-
%% When py:import is called, it adds to the registry. New contexts should
595-
%% have these imports applied to their interpreter.
596-
registry_applied_to_subinterp_test(_Config) ->
597-
%% Skip if subinterpreters not supported
598-
case py_nif:subinterp_supported() of
599-
false ->
600-
{skip, "Subinterpreters not supported"};
601-
true ->
602-
%% Clear registry and add an import
603-
ok = py_import:clear_imports(),
604-
ok = py_import:ensure_imported(uuid),
605-
606-
%% Create a new subinterp context
607-
{ok, Ctx} = py_context:new(#{mode => subinterp}),
608-
609-
%% The uuid module should be available (applied from registry)
610-
{ok, Result} = py_context:call(Ctx, uuid, uuid4, [], #{}),
611-
?assert(is_binary(Result) orelse is_list(Result)),
612-
613-
%% Clean up
614-
py_context:destroy(Ctx),
615-
ok = py_import:clear_imports(),
616-
617-
ct:pal("Registry imports successfully applied to new subinterpreter")
618-
end.
619-
620590
%% ============================================================================
621591
%% sys.modules Verification Tests
622592
%% ============================================================================

0 commit comments

Comments
 (0)