-
Notifications
You must be signed in to change notification settings - Fork 114
Expand file tree
/
Copy pathtest_apm.py
More file actions
80 lines (64 loc) · 2.89 KB
/
test_apm.py
File metadata and controls
80 lines (64 loc) · 2.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
import wave
import numpy as np
from livekit.rtc import AudioProcessingModule, AudioFrame
# Absolute path to the fixtures/ directory that sits next to this test file;
# holds the capture/render WAV inputs and receives the processed output.
FIXTURES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fixtures")
def test_audio_processing():
    """Run the capture/render WAV fixtures through the APM and save the result.

    Streams two mono 48 kHz fixtures (near-end capture and far-end render)
    through ``AudioProcessingModule`` in 10 ms chunks with echo cancellation,
    noise suppression, high-pass filtering and AGC enabled, then writes the
    processed capture audio to ``test_processed.wav`` in the fixtures dir.
    """
    sample_rate = 48000
    num_channels = 1
    # WebRTC-style APMs consume 10 ms blocks: 480 samples at 48 kHz.
    frames_per_chunk = sample_rate // 100

    capture_wav = os.path.join(FIXTURES_DIR, "test_echo_capture.wav")
    render_wav = os.path.join(FIXTURES_DIR, "test_echo_render.wav")
    output_wav = os.path.join(FIXTURES_DIR, "test_processed.wav")

    # Initialize APM with echo cancellation enabled
    apm = AudioProcessingModule(
        echo_cancellation=True,
        noise_suppression=True,
        high_pass_filter=True,
        auto_gain_control=True,
    )
    # NOTE(review): reaches into a private attribute purely as a debug aid;
    # kept to preserve the script's output.
    print("APM Internal Handle:", apm._ffi_handle)

    with (
        wave.open(capture_wav, "rb") as wf_in_cap,
        wave.open(render_wav, "rb") as wf_in_rend,
        wave.open(output_wav, "wb") as wf_out,
    ):
        assert wf_in_cap.getnchannels() == num_channels, "Capture file must be mono."
        assert wf_in_rend.getnchannels() == num_channels, "Render file must be mono."
        assert wf_in_cap.getframerate() == sample_rate, "Capture file must be 48 kHz."
        assert wf_in_rend.getframerate() == sample_rate, "Render file must be 48 kHz."

        sampwidth = wf_in_cap.getsampwidth()
        # Fix: the int16 decode below silently corrupts audio for any other
        # sample width, so validate it up front on both input files.
        assert sampwidth == 2, "Capture file must be 16-bit PCM."
        assert wf_in_rend.getsampwidth() == 2, "Render file must be 16-bit PCM."

        wf_out.setnchannels(num_channels)
        wf_out.setsampwidth(sampwidth)
        wf_out.setframerate(sample_rate)

        while True:
            capture_bytes = wf_in_cap.readframes(frames_per_chunk)
            render_bytes = wf_in_rend.readframes(frames_per_chunk)
            # Stop only once BOTH streams are exhausted so a longer render
            # (or capture) tail is still processed against padded silence.
            if not capture_bytes and not render_bytes:
                break

            # Convert bytes to numpy arrays
            capture_data = np.frombuffer(capture_bytes, dtype=np.int16)
            render_data = np.frombuffer(render_bytes, dtype=np.int16)

            # Pad the final short chunk with zeros so every AudioFrame carries
            # exactly one full 10 ms block.
            if len(capture_data) < frames_per_chunk:
                capture_data = np.pad(capture_data, (0, frames_per_chunk - len(capture_data)))
            if len(render_data) < frames_per_chunk:
                render_data = np.pad(render_data, (0, frames_per_chunk - len(render_data)))

            capture_frame = AudioFrame(
                data=capture_data.tobytes(),
                sample_rate=sample_rate,
                num_channels=num_channels,
                samples_per_channel=frames_per_chunk,
            )
            render_frame = AudioFrame(
                data=render_data.tobytes(),
                sample_rate=sample_rate,
                num_channels=num_channels,
                samples_per_channel=frames_per_chunk,
            )

            # Feed the far-end (render) audio first so the echo canceller has
            # its reference signal, then process the near-end capture frame.
            apm.process_reverse_stream(render_frame)
            apm.process_stream(capture_frame)

            wf_out.writeframes(capture_frame.data.tobytes())

    print("Done! Processed audio saved to:", output_wav)