diff --git a/src/om1_speech/audio/audio_input_stream.py b/src/om1_speech/audio/audio_input_stream.py index fb836f0..cf1a370 100644 --- a/src/om1_speech/audio/audio_input_stream.py +++ b/src/om1_speech/audio/audio_input_stream.py @@ -5,9 +5,10 @@ import json import logging import queue +import struct import threading import time -from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union +from typing import Any, Callable, Generator, List, Optional, Tuple, Union import pyaudio import zenoh @@ -416,7 +417,7 @@ def fill_buffer_remote(self, data: str): self._rate = rate self._language_code = language_code - def generator(self) -> Generator[Dict[str, Union[bytes, int]], None, None]: + def generator(self) -> Generator[bytes, None, None]: """ Generates a stream of audio data chunk. @@ -437,17 +438,17 @@ def generator(self) -> Generator[Dict[str, Union[bytes, int]], None, None]: if self._is_tts_active: continue - data = [chunk] - - response = { - "audio": base64.b64encode(b"".join(data)).decode("utf-8"), + config = { "rate": self._rate, "language_code": self._language_code, "alternative_language_codes": self._alternative_language_codes, - "timestamp": int(time.time()), } + + config["timestamp"] = int(time.time() * 1000) + header = json.dumps(config).encode("utf-8") + response = struct.pack(">I", len(header)) + header + chunk for audio_callback in self._audio_data_callbacks: - audio_callback(json.dumps(response)) + audio_callback(response) yield response diff --git a/src/om1_speech/audio/audio_rtsp_input_stream.py b/src/om1_speech/audio/audio_rtsp_input_stream.py index ec9a3cd..ad37bb4 100644 --- a/src/om1_speech/audio/audio_rtsp_input_stream.py +++ b/src/om1_speech/audio/audio_rtsp_input_stream.py @@ -2,10 +2,11 @@ import json import logging import multiprocessing as mp +import struct import threading import time from queue import Empty, Full -from typing import Callable, Dict, Generator, List, Optional, Union +from typing import Callable, Generator, List, Optional, Union import av import numpy as np @@ -414,7 +415,7 @@ def fill_buffer_remote(self, data: str): self._rate = rate self._language_code = language_code - def generator(self) -> Generator[Dict[str, Union[bytes, int]], None, None]: + def generator(self) -> Generator[bytes, None, None]: """ Generates a stream of audio data chunk. @@ -435,17 +436,17 @@ def generator(self) -> Generator[Dict[str, Union[bytes, int]], None, None]: if self._is_tts_active: continue - data = [chunk] - - response = { - "audio": base64.b64encode(b"".join(data)).decode("utf-8"), + config = { "rate": self._rate, "language_code": self._language_code, "alternative_language_codes": self._alternative_language_codes, - "timestamp": int(time.time()), } + + config["timestamp"] = int(time.time() * 1000) + header = json.dumps(config).encode("utf-8") + response = struct.pack(">I", len(header)) + header + chunk for audio_callback in self._audio_data_callbacks: - audio_callback(json.dumps(response)) + audio_callback(response) yield response diff --git a/tests/om1_speech/audio/test_audio_input_stream.py b/tests/om1_speech/audio/test_audio_input_stream.py index f3adb9b..f202f0a 100644 --- a/tests/om1_speech/audio/test_audio_input_stream.py +++ b/tests/om1_speech/audio/test_audio_input_stream.py @@ -1,6 +1,7 @@ import base64 import json import queue +import struct import threading from unittest.mock import MagicMock, Mock, patch @@ -146,7 +147,7 @@ def collect_data(): audio_stream._buff.put(None) assert len(collected_chunks) > 0 - assert all(isinstance(data, dict) for data in collected_chunks) + assert all(isinstance(data, bytes) for data in collected_chunks) def test_stop(audio_stream, mock_pyaudio): @@ -182,16 +183,17 @@ def test_callback(data): # Process one chunk through generator next(stream.generator()) - # Verify callback was called with correct data - assert callback_data == json.dumps( - { - "audio": base64.b64encode(test_data).decode("utf-8"), - "rate": 16000, - "language_code": "en-US", - "alternative_language_codes": [], - "timestamp": fixed_time, - } - ) + # Verify callback was called with binary header format + assert isinstance(callback_data, bytes) + header_len = struct.unpack(">I", callback_data[:4])[0] + header = json.loads(callback_data[4 : 4 + header_len]) + audio = callback_data[4 + header_len :] + + assert header["rate"] == 16000 + assert header["language_code"] == "en-US" + assert header["alternative_language_codes"] == [] + assert header["timestamp"] == int(fixed_time * 1000) + assert audio == test_data stream.stop() @@ -258,10 +260,15 @@ def test_generator_with_alternative_languages(mock_pyaudio): generated = next(stream.generator()) - assert generated["language_code"] == "en-GB" - assert generated["alternative_language_codes"] == alt_langs - assert generated["audio"] == base64.b64encode(test_data).decode("utf-8") - assert generated["rate"] == 16000 + assert isinstance(generated, bytes) + header_len = struct.unpack(">I", generated[:4])[0] + header = json.loads(generated[4 : 4 + header_len]) + audio = generated[4 + header_len :] + + assert header["language_code"] == "en-GB" + assert header["alternative_language_codes"] == alt_langs + assert header["rate"] == 16000 + assert audio == test_data stream.stop() @@ -318,15 +325,15 @@ def test_callback(data): next(stream.generator()) - expected_data = json.dumps( - { - "audio": base64.b64encode(test_data).decode("utf-8"), - "rate": 16000, - "language_code": "es-MX", - "alternative_language_codes": alt_langs, - "timestamp": fixed_time, - } - ) - assert callback_data == expected_data + assert isinstance(callback_data, bytes) + header_len = struct.unpack(">I", callback_data[:4])[0] + header = json.loads(callback_data[4 : 4 + header_len]) + audio = callback_data[4 + header_len :] + + assert header["rate"] == 16000 + assert header["language_code"] == "es-MX" + assert header["alternative_language_codes"] == alt_langs + assert header["timestamp"] == int(fixed_time * 1000) + assert audio == test_data stream.stop()