Skip to content

Commit 54e90f2

Browse files
committed
Add threading.Thread support and documentation
Verify and document that erlang.call() works from any Python thread, not just ThreadPoolExecutor workers. The thread worker mechanism already supported this, but it wasn't documented or tested. - Add tests for simple threading.Thread calling erlang.call() - Update C code comments to mention all supported thread types - Update CHANGELOG to reflect broader thread support - Add docs/threading.md with usage examples and architecture
1 parent eb4f27a commit 54e90f2

5 files changed

Lines changed: 265 additions & 13 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44

55
### Added
66

7-
- **ThreadPoolExecutor Support** - Python threads spawned via `concurrent.futures.ThreadPoolExecutor`
8-
can now call `erlang.call()` without blocking
7+
- **Python Thread Support** - Any spawned Python thread can now call `erlang.call()` without blocking
8+
- Supports `threading.Thread`, `concurrent.futures.ThreadPoolExecutor`, and any other Python threads
99
- Each spawned thread lazily acquires a dedicated "thread worker" channel
1010
- One lightweight Erlang process per Python thread handles callbacks
1111
- Automatic cleanup when Python thread exits via `pthread_key_t` destructor
1212
- New module: `py_thread_handler.erl` - Coordinator and per-thread handlers
1313
- New C file: `py_thread_worker.c` - Thread worker pool management
1414
- New test suite: `test/py_thread_callback_SUITE.erl`
15+
- New documentation: `docs/threading.md` - Threading support guide
1516

1617
- **Reentrant Callbacks** - Python→Erlang→Python callback chains without deadlocks
1718
- Exception-based suspension mechanism interrupts Python execution cleanly

c_src/py_callback.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -542,12 +542,15 @@ static PyObject *erlang_call_impl(PyObject *self, PyObject *args) {
542542

543543
/*
544544
* Check if this is a call from an executor thread (normal path) or
545-
* from a spawned thread like ThreadPoolExecutor (thread worker path).
545+
* from a spawned thread (thread worker path).
546546
*/
547547
if (tl_current_worker == NULL || !tl_current_worker->has_callback_handler) {
548548
/*
549-
* Not an executor thread - try thread worker path.
550-
* This enables ThreadPoolExecutor threads to call erlang.call().
549+
* Not an executor thread - use thread worker path.
550+
* This enables any spawned Python thread to call erlang.call():
551+
* - threading.Thread instances
552+
* - concurrent.futures.ThreadPoolExecutor workers
553+
* - Any other Python threads
551554
*/
552555
Py_ssize_t nargs = PyTuple_Size(args);
553556
if (nargs < 1) {

c_src/py_thread_worker.c

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616

1717
/**
1818
* @file py_thread_worker.c
19-
* @brief Thread worker pool for ThreadPoolExecutor support
19+
* @brief Thread worker pool for Python thread support
2020
* @author Benoit Chesneau
2121
*
22-
* This module enables Python threads spawned via concurrent.futures.ThreadPoolExecutor
23-
* to call erlang.call() without blocking. Each spawned thread lazily acquires a
24-
* dedicated "thread worker" channel for communicating with Erlang.
22+
* This module enables any spawned Python thread to call erlang.call() without
23+
* blocking. Supported thread types include:
24+
* - threading.Thread instances
25+
* - concurrent.futures.ThreadPoolExecutor workers
26+
* - Any other Python threads
27+
*
28+
* Each spawned thread lazily acquires a dedicated "thread worker" channel
29+
* for communicating with Erlang.
2530
*
2631
* @par Architecture
2732
*

docs/threading.md

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# Threading Support
2+
3+
erlang_python supports calling `erlang.call()` from Python threads, enabling
4+
concurrent Erlang callbacks from multithreaded Python code.
5+
6+
## Supported Thread Types
7+
8+
- `threading.Thread` - Standard Python threads
9+
- `concurrent.futures.ThreadPoolExecutor` - Thread pool workers
10+
- Any other spawned Python threads
11+
12+
## Usage
13+
14+
### With threading.Thread
15+
16+
```python
17+
import threading
18+
import erlang
19+
20+
def worker():
21+
result = erlang.call('my_function', arg1, arg2)
22+
print(f"Got: {result}")
23+
24+
thread = threading.Thread(target=worker)
25+
thread.start()
26+
thread.join()
27+
```
28+
29+
### With ThreadPoolExecutor
30+
31+
```python
32+
from concurrent.futures import ThreadPoolExecutor
33+
import erlang
34+
35+
def call_erlang(x):
36+
return erlang.call('double_it', x)
37+
38+
with ThreadPoolExecutor(max_workers=4) as executor:
39+
results = list(executor.map(call_erlang, range(10)))
40+
```
41+
42+
### Multiple Calls from Same Thread
43+
44+
A single thread can make multiple sequential `erlang.call()` invocations:
45+
46+
```python
47+
import threading
48+
import erlang
49+
50+
def worker():
51+
# Chain multiple Erlang calls
52+
x = erlang.call('add_one', 0)
53+
x = erlang.call('add_one', x)
54+
x = erlang.call('add_one', x)
55+
return x # Returns 3
56+
57+
thread = threading.Thread(target=worker)
58+
thread.start()
59+
thread.join()
60+
```
61+
62+
## Architecture
63+
64+
Each Python thread that calls `erlang.call()` gets:
65+
66+
1. **A dedicated response pipe** - For receiving results from Erlang
67+
2. **A lightweight Erlang process** - Handles callbacks for that thread
68+
3. **Automatic cleanup** - Resources released when the thread exits
69+
70+
This allows multiple Python threads to make concurrent Erlang calls without
71+
blocking each other.
72+
73+
```
74+
┌─────────────────────────────────────────────────────────────────┐
75+
│ Python Process │
76+
├─────────────────────────────────────────────────────────────────┤
77+
│ Thread 1 Thread 2 Thread 3 │
78+
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
79+
│ │ Worker │ │ Worker │ │ Worker │ │
80+
│ │ Pipe │ │ Pipe │ │ Pipe │ │
81+
│ └────┬────┘ └────┬────┘ └────┬────┘ │
82+
│ │ │ │ │
83+
└───────┼──────────────────┼──────────────────┼───────────────────┘
84+
│ │ │
85+
▼ ▼ ▼
86+
┌─────────────────────────────────────────────────────────────────┐
87+
│ Erlang VM (BEAM) │
88+
├─────────────────────────────────────────────────────────────────┤
89+
│ Handler 1 Handler 2 Handler 3 │
90+
│ (process) (process) (process) │
91+
│ │ │ │ │
92+
│ └──────────────────┼──────────────────┘ │
93+
│ │ │
94+
│ ┌──────▼──────┐ │
95+
│ │ Coordinator │ │
96+
│ │ (process) │ │
97+
│ └─────────────┘ │
98+
└─────────────────────────────────────────────────────────────────┘
99+
```
100+
101+
## Thread Safety
102+
103+
- **Isolated state** - Each thread has its own worker channel, no locks needed
104+
- **Concurrent calls** - Multiple threads can call `erlang.call()` simultaneously
105+
- **Correct delivery** - Results are delivered to the correct thread via per-thread pipes
106+
107+
## Lifecycle
108+
109+
1. **First call** - When a Python thread first calls `erlang.call()`:
110+
- A thread worker is acquired from the pool (or created if none available)
111+
- An Erlang handler process is spawned for this worker
112+
- The worker is associated with the thread via `pthread_key_t`
113+
114+
2. **Subsequent calls** - The same worker is reused for all calls from that thread
115+
116+
3. **Thread exit** - When the Python thread terminates:
117+
- The `pthread_key_t` destructor is invoked automatically
118+
- The worker is released back to the pool
119+
- The Erlang handler process continues to exist for potential reuse
120+
121+
## Performance Considerations
122+
123+
- **Worker reuse** - Workers are pooled and reused across thread lifetimes
124+
- **Lazy initialization** - Handlers are only spawned when first needed
125+
- **No GIL blocking** - The GIL is released while waiting for Erlang responses
126+
- **Lightweight processes** - Each Erlang handler is a lightweight BEAM process
127+
128+
## Limitations
129+
130+
- Threads must be Python threads (not C threads that don't hold the GIL)
131+
- The thread coordinator must be started (happens automatically with application start)
132+
133+
## Registering Erlang Functions
134+
135+
Before Python threads can call Erlang functions, register them:
136+
137+
```erlang
138+
%% In Erlang
139+
py:register_function(double_it, fun([X]) -> X * 2 end).
140+
py:register_function(add_one, fun([X]) -> X + 1 end).
141+
```
142+
143+
Then call from Python threads:
144+
145+
```python
146+
import threading
147+
import erlang
148+
149+
def worker(n):
150+
return erlang.call('double_it', n)
151+
152+
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
153+
for t in threads:
154+
t.start()
155+
for t in threads:
156+
t.join()
157+
```
158+
159+
## Error Handling
160+
161+
Errors from Erlang functions are raised as Python exceptions:
162+
163+
```python
164+
import threading
165+
import erlang
166+
167+
def worker():
168+
try:
169+
result = erlang.call('maybe_fail', 42)
170+
except RuntimeError as e:
171+
print(f"Erlang error: {e}")
172+
173+
thread = threading.Thread(target=worker)
174+
thread.start()
175+
thread.join()
176+
```

test/py_thread_callback_SUITE.erl

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
%%% @doc Common Test suite for Python ThreadPoolExecutor callback support.
1+
%%% @doc Common Test suite for Python thread callback support.
22
%%%
3-
%%% Tests that Python threads spawned via concurrent.futures.ThreadPoolExecutor
3+
%%% Tests that Python threads (both threading.Thread and ThreadPoolExecutor)
44
%%% can call erlang.call() without blocking.
55
-module(py_thread_callback_SUITE).
66

@@ -20,7 +20,10 @@
2020
threadpool_multiple_calls_test/1,
2121
threadpool_nested_callback_test/1,
2222
threadpool_error_handling_test/1,
23-
threadpool_thread_reuse_test/1
23+
threadpool_thread_reuse_test/1,
24+
simple_thread_basic_test/1,
25+
simple_thread_multiple_calls_test/1,
26+
simple_thread_concurrent_test/1
2427
]).
2528

2629
all() ->
@@ -30,7 +33,10 @@ all() ->
3033
threadpool_multiple_calls_test,
3134
threadpool_nested_callback_test,
3235
threadpool_error_handling_test,
33-
threadpool_thread_reuse_test
36+
threadpool_thread_reuse_test,
37+
simple_thread_basic_test,
38+
simple_thread_multiple_calls_test,
39+
simple_thread_concurrent_test
3440
].
3541

3642
init_per_suite(Config) ->
@@ -132,3 +138,64 @@ threadpool_thread_reuse_test(_Config) ->
132138
{ok, ThreadCount} = py:eval(Code),
133139
true = (ThreadCount =< 2),
134140
ok.
141+
142+
%%% ============================================================================
143+
%%% Simple threading.Thread Test Cases
144+
%%% ============================================================================
145+
146+
%% @doc Simple thread calling erlang.call()
147+
simple_thread_basic_test(_Config) ->
148+
py:register_function(double_it, fun([X]) -> X * 2 end),
149+
150+
%% Create a threading.Thread subclass that stores its result
151+
Code = <<"
152+
(lambda: (
153+
__import__('threading').Thread(target=lambda: setattr(__import__('sys').modules[__name__], '_result', __import__('erlang').call('double_it', 5))).start() or
154+
__import__('time').sleep(0.1) or
155+
getattr(__import__('sys').modules[__name__], '_result', None)
156+
))()
157+
">>,
158+
{ok, Result} = py:eval(Code),
159+
10 = Result,
160+
ok.
161+
162+
%% @doc Same simple thread makes multiple erlang.call() invocations
163+
simple_thread_multiple_calls_test(_Config) ->
164+
py:register_function(add_one, fun([X]) -> X + 1 end),
165+
166+
%% Thread that makes 3 sequential calls: add_one(add_one(add_one(0)))
167+
Code = <<"
168+
(lambda: (
169+
__import__('threading').Thread(target=lambda: setattr(
170+
__import__('sys').modules[__name__], '_result',
171+
__import__('erlang').call('add_one',
172+
__import__('erlang').call('add_one',
173+
__import__('erlang').call('add_one', 0)))
174+
)).start() or
175+
__import__('time').sleep(0.1) or
176+
getattr(__import__('sys').modules[__name__], '_result', None)
177+
))()
178+
">>,
179+
{ok, Result} = py:eval(Code),
180+
3 = Result,
181+
ok.
182+
183+
%% @doc Multiple simple threads calling erlang.call() concurrently
184+
simple_thread_concurrent_test(_Config) ->
185+
py:register_function(double_it, fun([X]) -> X * 2 end),
186+
187+
%% Create 5 threads, each calling double_it with different values
188+
Code = <<"
189+
(lambda: (
190+
(threads := [__import__('threading').Thread(target=lambda x=x: setattr(t, 'result', __import__('erlang').call('double_it', x))) for x in range(5) for t in [type('T', (), {'result': None})()]]),
191+
[setattr(threads[i], 'obj', type('T', (), {'result': None})()) for i in range(5)],
192+
(workers := []),
193+
[workers.append(type('Worker', (__import__('threading').Thread,), {'result': None, 'run': lambda self, x=x: setattr(self, 'result', __import__('erlang').call('double_it', x))})()) for x in range(5)],
194+
[w.start() for w in workers],
195+
[w.join() for w in workers],
196+
[w.result for w in workers]
197+
)[-1])()
198+
">>,
199+
{ok, Results} = py:eval(Code),
200+
[0, 2, 4, 6, 8] = Results,
201+
ok.

0 commit comments

Comments
 (0)