forked from tuxu/python-samplerate
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_asyncio_performance.py
More file actions
384 lines (310 loc) · 13.1 KB
/
test_asyncio_performance.py
File metadata and controls
384 lines (310 loc) · 13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
"""
Test asyncio performance with resampling operations.
This demonstrates that CPU-bound resampling operations should use
executor-based async execution to avoid blocking the event loop,
and validates that GIL release allows true parallelism when using
ThreadPoolExecutor.
Event Loop Testing:
- Tests run with all available event loop implementations on the platform
- Windows: Tests with default asyncio and winloop (if installed)
- Unix/Linux/macOS: Tests with default asyncio and uvloop (if installed)
- Use the event_loop fixture to access the current loop type being tested
"""
import asyncio
import sys
import time
import numpy as np
import pytest
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import samplerate
def get_available_loop_types():
"""
Get list of available event loop types.
Returns:
List of available loop types: always includes "default",
plus "uvloop" (Unix only) and/or "winloop" (Windows only) if available.
"""
available = ["default"]
# uvloop only works on Unix-like systems
if sys.platform != 'win32':
try:
import uvloop
available.append("uvloop")
except ImportError:
pass
# winloop only works on Windows
if sys.platform == 'win32':
try:
import winloop
available.append("winloop")
except ImportError:
pass
return available
# Get available loop types for parameterization
AVAILABLE_LOOP_TYPES = get_available_loop_types()
@pytest.fixture(params=AVAILABLE_LOOP_TYPES)
def event_loop_policy(request):
"""
Pytest fixture that provides different event loop policies.
This allows pytest-asyncio to use uvloop, winloop, or default asyncio
based on what's available on the platform.
"""
loop_type = request.param
if loop_type == "uvloop":
import uvloop
policy = uvloop.EventLoopPolicy()
elif loop_type == "winloop":
import winloop
policy = winloop.EventLoopPolicy()
else:
policy = asyncio.DefaultEventLoopPolicy()
# Store loop type for test output
policy.loop_type_name = loop_type
return policy
@pytest.fixture
def event_loop(event_loop_policy):
"""
Override pytest-asyncio's event_loop fixture to use our custom policy.
"""
asyncio.set_event_loop_policy(event_loop_policy)
loop = event_loop_policy.new_event_loop()
# Store loop type name on the loop for access in tests
loop.loop_type_name = event_loop_policy.loop_type_name
yield loop
loop.close()
asyncio.set_event_loop_policy(None)
async def resample_async(data, ratio, converter_type, executor=None):
"""Asynchronously resample data using an executor."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
executor,
samplerate.resample,
data,
ratio,
converter_type
)
async def resampler_process_async(data, ratio, converter_type, channels, executor=None):
"""Asynchronously resample using Resampler.process()."""
def _process():
resampler = samplerate.Resampler(converter_type, channels)
return resampler.process(data, ratio, end_of_input=True)
loop = asyncio.get_event_loop()
return await loop.run_in_executor(executor, _process)
@pytest.mark.asyncio
@pytest.mark.parametrize("num_concurrent", [2, 4, 8])
@pytest.mark.parametrize("converter_type", ["sinc_fastest", "sinc_medium", "sinc_best"])
async def test_asyncio_threadpool_parallel(event_loop, num_concurrent, converter_type):
"""Test async execution with ThreadPoolExecutor shows parallel speedup."""
loop_type = event_loop.loop_type_name
# Create test data
fs = 44100
duration = 5.0
ratio = 2.0
num_samples = int(fs * duration)
data = np.random.randn(num_samples).astype(np.float32)
# Sequential baseline - run tasks one at a time
start = time.perf_counter()
for _ in range(num_concurrent):
samplerate.resample(data, ratio, converter_type)
sequential_time = time.perf_counter() - start
# Concurrent execution with ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=num_concurrent)
try:
start = time.perf_counter()
tasks = [
resample_async(data, ratio, converter_type, executor)
for _ in range(num_concurrent)
]
await asyncio.gather(*tasks)
parallel_time = time.perf_counter() - start
finally:
executor.shutdown(wait=True)
speedup = sequential_time / parallel_time
expected_speedup = 1.3 if num_concurrent == 2 else 1.5
print(f"\n{loop_type} loop - {converter_type} async with ThreadPoolExecutor ({num_concurrent} concurrent):")
print(f" Sequential: {sequential_time:.4f}s")
print(f" Parallel: {parallel_time:.4f}s")
print(f" Speedup: {speedup:.2f}x")
assert speedup >= expected_speedup, (
f"Async with ThreadPoolExecutor should show speedup due to GIL release. "
f"Expected {expected_speedup}x, got {speedup:.2f}x"
)
@pytest.mark.asyncio
@pytest.mark.parametrize("converter_type", ["sinc_fastest"])
async def test_asyncio_no_executor_blocks(event_loop, converter_type):
"""Test that running CPU-bound work without executor blocks the event loop."""
loop_type = event_loop.loop_type_name
# This test demonstrates the WRONG way - blocking the event loop
fs = 44100
duration = 1.0
ratio = 2.0
num_samples = int(fs * duration)
data = np.random.randn(num_samples).astype(np.float32)
# Run two tasks "concurrently" but without executor (blocks event loop)
async def blocking_resample():
# This blocks the event loop!
return samplerate.resample(data, ratio, converter_type)
start = time.perf_counter()
task1 = asyncio.create_task(blocking_resample())
task2 = asyncio.create_task(blocking_resample())
await asyncio.gather(task1, task2)
blocking_time = time.perf_counter() - start
# Run with executor (proper async)
executor = ThreadPoolExecutor(max_workers=2)
try:
start = time.perf_counter()
tasks = [
resample_async(data, ratio, converter_type, executor)
for _ in range(2)
]
await asyncio.gather(*tasks)
executor_time = time.perf_counter() - start
finally:
executor.shutdown(wait=True)
print(f"\n{loop_type} loop - {converter_type} blocking vs executor:")
print(f" Without executor (blocks loop): {blocking_time:.4f}s")
print(f" With ThreadPoolExecutor: {executor_time:.4f}s")
print(f" Improvement: {blocking_time/executor_time:.2f}x")
# Executor should be significantly faster (at least 1.3x due to parallelism)
assert executor_time < blocking_time * 0.77, (
"ThreadPoolExecutor should be faster than blocking the event loop"
)
@pytest.mark.asyncio
@pytest.mark.parametrize("num_concurrent", [2, 4])
async def test_asyncio_processpool_comparison(event_loop, num_concurrent):
"""Compare ThreadPoolExecutor vs ProcessPoolExecutor for CPU-bound work."""
loop_type = event_loop.loop_type_name
# Note: ProcessPoolExecutor should be slower due to pickling overhead
# for the large numpy arrays, even though it avoids GIL entirely
fs = 44100
duration = 2.0 # Shorter for process pool (slower due to overhead)
ratio = 2.0
converter_type = "sinc_fastest"
num_samples = int(fs * duration)
data = np.random.randn(num_samples).astype(np.float32)
# ThreadPoolExecutor (benefits from GIL release)
thread_executor = ThreadPoolExecutor(max_workers=num_concurrent)
try:
start = time.perf_counter()
tasks = [
resample_async(data, ratio, converter_type, thread_executor)
for _ in range(num_concurrent)
]
await asyncio.gather(*tasks)
thread_time = time.perf_counter() - start
finally:
thread_executor.shutdown(wait=True)
# ProcessPoolExecutor (no GIL but pickling overhead)
process_executor = ProcessPoolExecutor(max_workers=num_concurrent)
try:
start = time.perf_counter()
tasks = [
resample_async(data, ratio, converter_type, process_executor)
for _ in range(num_concurrent)
]
await asyncio.gather(*tasks)
process_time = time.perf_counter() - start
finally:
process_executor.shutdown(wait=True)
print(f"\n{loop_type} loop - {num_concurrent} concurrent tasks - ThreadPool vs ProcessPool:")
print(f" ThreadPoolExecutor: {thread_time:.4f}s")
print(f" ProcessPoolExecutor: {process_time:.4f}s")
print(f" Ratio: {process_time/thread_time:.2f}x")
# ThreadPool should be faster or comparable due to no pickling overhead
# and GIL being properly released
print(f" → ThreadPool is {'faster' if thread_time < process_time else 'slower'}")
print(f" (GIL release makes ThreadPool competitive with ProcessPool)")
@pytest.mark.asyncio
async def test_asyncio_mixed_workload(event_loop):
"""Test mixing I/O and CPU-bound operations in async context."""
loop_type = event_loop.loop_type_name
fs = 44100
duration = 1.0
ratio = 2.0
converter_type = "sinc_fastest"
num_samples = int(fs * duration)
data = np.random.randn(num_samples).astype(np.float32)
async def io_task(delay):
"""Simulate I/O operation."""
await asyncio.sleep(delay)
return f"I/O completed after {delay}s"
# Mix CPU-bound resampling with I/O tasks
executor = ThreadPoolExecutor(max_workers=2)
try:
start = time.perf_counter()
results = await asyncio.gather(
io_task(0.1), # I/O task 1
resample_async(data, ratio, converter_type, executor), # CPU task 1
io_task(0.2), # I/O task 2
resample_async(data, ratio, converter_type, executor), # CPU task 2
io_task(0.15), # I/O task 3
)
total_time = time.perf_counter() - start
finally:
executor.shutdown(wait=True)
print(f"\n{loop_type} loop - Mixed I/O and CPU workload:")
print(f" Total time: {total_time:.4f}s")
print(f" Tasks completed: {len(results)}")
# Should complete faster than sequential execution
# I/O: 0.1 + 0.2 + 0.15 = 0.45s
# CPU: ~0.05s * 2 = ~0.1s
# Sequential would be ~0.55s, parallel should be ~0.2-0.25s
assert total_time < 0.35, (
f"Mixed workload should complete faster than 0.35s, got {total_time:.4f}s"
)
@pytest.mark.asyncio
async def test_asyncio_performance_report():
"""Generate comprehensive async performance report."""
print("\n" + "="*70)
print("Asyncio Performance Report")
print("="*70)
converters = ["sinc_fastest", "sinc_medium", "sinc_best"]
concurrent_counts = [1, 2, 4]
fs = 44100
duration = 5.0
ratio = 2.0
num_samples = int(fs * duration)
data = np.random.randn(num_samples).astype(np.float32)
print(f"\nTest Configuration:")
print(f" Sample rate: {fs} Hz")
print(f" Duration: {duration} seconds ({num_samples} samples)")
print(f" Conversion ratio: {ratio}x")
print(f" Executor: ThreadPoolExecutor")
for converter in converters:
print(f"\n{'-'*70}")
print(f"Converter: {converter}")
print(f"{'-'*70}")
baseline_time = None
for num_concurrent in concurrent_counts:
if num_concurrent == 1:
# Single task baseline
executor = ThreadPoolExecutor(max_workers=1)
try:
start = time.perf_counter()
await resample_async(data, ratio, converter, executor)
baseline_time = time.perf_counter() - start
finally:
executor.shutdown(wait=True)
print(f" 1 concurrent task (baseline):")
print(f" Execution time: {baseline_time:.4f}s")
else:
# Multiple concurrent tasks
executor = ThreadPoolExecutor(max_workers=num_concurrent)
try:
start = time.perf_counter()
tasks = [
resample_async(data, ratio, converter, executor)
for _ in range(num_concurrent)
]
await asyncio.gather(*tasks)
parallel_time = time.perf_counter() - start
finally:
executor.shutdown(wait=True)
sequential_time = baseline_time * num_concurrent
speedup = sequential_time / parallel_time
efficiency = (speedup / num_concurrent) * 100
print(f" {num_concurrent} concurrent tasks:")
print(f" Parallel execution time: {parallel_time:.4f}s")
print(f" Equivalent sequential time: {sequential_time:.4f}s ({num_concurrent} × {baseline_time:.4f}s)")
print(f" Speedup: {speedup:.2f}x")
print(f" Parallel efficiency: {efficiency:.1f}%")