diff --git a/.gitignore b/.gitignore index 4cacd56..3357dc9 100644 --- a/.gitignore +++ b/.gitignore @@ -62,6 +62,11 @@ data/ *.log .DS_Store syncpi.sh +RULES.md +PROBLEM_STATEMENT.md +PR_DESCRIPTION.md +.claude/CORE_FINDINGS.md +GITHUB_PR.md # Docker /data diff --git a/repeater/config_manager.py b/repeater/config_manager.py index 56cb05e..ba8baed 100644 --- a/repeater/config_manager.py +++ b/repeater/config_manager.py @@ -48,28 +48,6 @@ def _sync_repeater_handler_radio_config(self, radio_cfg: Dict[str, Any]) -> None } ) - def _kiss_transport_restart_required(self) -> bool: - radio = getattr(self.daemon, "radio", None) - kiss_cfg = self.config.get("kiss", {}) or {} - if radio is None or not kiss_cfg: - return False - - runtime_port = getattr(radio, "port", None) - runtime_baudrate = getattr(radio, "baudrate", None) - - configured_port = kiss_cfg.get("port") - configured_baudrate = kiss_cfg.get("baud_rate") - - if configured_port and runtime_port and str(configured_port) != str(runtime_port): - logger.info("KISS port change detected; service restart required") - return True - - if configured_baudrate and runtime_baudrate and int(configured_baudrate) != int(runtime_baudrate): - logger.info("KISS baud rate change detected; service restart required") - return True - - return False - def _apply_live_radio_config(self) -> bool: radio = getattr(self.daemon, "radio", None) if radio is None: @@ -79,64 +57,18 @@ def _apply_live_radio_config(self) -> bool: radio_cfg = self._get_live_radio_snapshot() try: - if hasattr(radio, "configure_radio"): - if hasattr(radio, "radio_config") and isinstance(radio.radio_config, dict): - radio.radio_config.update(radio_cfg) + if hasattr(radio, "radio_config") and isinstance(radio.radio_config, dict): + radio.radio_config.update(radio_cfg) - applied = radio.configure_radio( - frequency=radio_cfg["frequency"], - bandwidth=radio_cfg["bandwidth"], - spreading_factor=radio_cfg["spreading_factor"], - coding_rate=radio_cfg["coding_rate"], - ) - if not applied: - logger.warning("Live radio reconfiguration failed") - return False - else: - current_frequency = getattr(radio, "frequency", None) - current_bandwidth = getattr(radio, "bandwidth", None) - current_spreading_factor = getattr(radio, "spreading_factor", None) - current_coding_rate = getattr(radio, "coding_rate", None) - current_tx_power = getattr(radio, "tx_power", None) - - if ( - current_frequency != radio_cfg["frequency"] - and hasattr(radio, "set_frequency") - and not radio.set_frequency(radio_cfg["frequency"]) - ): - return False - - if ( - current_tx_power != radio_cfg["tx_power"] - and hasattr(radio, "set_tx_power") - and not radio.set_tx_power(radio_cfg["tx_power"]) - ): - return False - - coding_rate_changed = current_coding_rate != radio_cfg["coding_rate"] - if coding_rate_changed: - setattr(radio, "coding_rate", radio_cfg["coding_rate"]) - - if current_spreading_factor != radio_cfg["spreading_factor"]: - if not hasattr(radio, "set_spreading_factor"): - return False - if not radio.set_spreading_factor(radio_cfg["spreading_factor"]): - return False - - if current_bandwidth != radio_cfg["bandwidth"]: - if not hasattr(radio, "set_bandwidth"): - return False - if not radio.set_bandwidth(radio_cfg["bandwidth"]): - return False - elif coding_rate_changed: - if hasattr(radio, "set_bandwidth"): - if not radio.set_bandwidth(radio_cfg["bandwidth"]): - return False - elif hasattr(radio, "set_spreading_factor"): - if not radio.set_spreading_factor(radio_cfg["spreading_factor"]): - return False - else: - return False + applied = radio.configure_radio( + frequency=radio_cfg["frequency"], + bandwidth=radio_cfg["bandwidth"], + spreading_factor=radio_cfg["spreading_factor"], + coding_rate=radio_cfg["coding_rate"], + ) + if not applied: + logger.warning("Live radio reconfiguration failed") + return False self._sync_repeater_handler_radio_config(radio_cfg) logger.info("Applied live radio configuration to running daemon") @@ -226,7 +158,7 @@ def live_update_daemon(self, sections: Optional[List[str]] = None) -> bool: logger.info("Reloaded AdvertHelper config") # Re-apply dispatcher path hash mode when mesh section changed - if 'mesh' in sections and self.daemon and hasattr(self.daemon, 'dispatcher'): + if 'mesh' in sections and self.daemon and hasattr(self.daemon, 'dispatcher') and self.daemon.dispatcher: mesh_cfg = self.daemon.config.get("mesh", {}) path_hash_mode = mesh_cfg.get("path_hash_mode", 0) if path_hash_mode not in (0, 1, 2): @@ -237,12 +169,14 @@ def live_update_daemon(self, sections: Optional[List[str]] = None) -> bool: self.daemon.dispatcher.set_default_path_hash_mode(path_hash_mode) logger.info(f"Reloaded path hash mode: mesh.path_hash_mode={path_hash_mode}") - if 'radio_type' in sections: - logger.info("radio_type change detected; service restart required") - live_update_ok = False - - if 'kiss' in sections and self._kiss_transport_restart_required(): - live_update_ok = False + if 'radio_type' in sections or 'kiss' in sections: + radio_manager = getattr(self.daemon, "radio_manager", None) + if radio_manager: + logger.info("Radio transport config changed; triggering reconnect") + radio_manager.notify_config_changed() + else: + logger.info("Radio transport config changed; service restart required") + live_update_ok = False if 'radio' in sections: live_update_ok = self._apply_live_radio_config() and live_update_ok diff --git a/repeater/engine.py b/repeater/engine.py index 6bf0aaf..5bec50a 100644 --- a/repeater/engine.py +++ b/repeater/engine.py @@ -1392,6 +1392,16 @@ def reload_runtime_config(self): except Exception as e: logger.error(f"Error reloading runtime config: {e}") + async def stop(self) -> None: + """Awaitable teardown — cancels background task and waits for it to finish.""" + if self._background_task and not self._background_task.done(): + self._background_task.cancel() + try: + await self._background_task + except asyncio.CancelledError: + pass + logger.info("Engine stopped") + def cleanup(self): if self._background_task and not self._background_task.done(): self._background_task.cancel() diff --git a/repeater/main.py b/repeater/main.py index 7087738..1645737 100644 --- a/repeater/main.py +++ b/repeater/main.py @@ -8,7 +8,8 @@ import time from repeater.companion.utils import validate_companion_node_name, normalize_companion_identity_key -from repeater.config import get_radio_for_board, load_config, save_config +from repeater.config import load_config, save_config +from repeater.radio_manager import RadioManager from repeater.config_manager import ConfigManager from repeater.data_acquisition.glass_handler import GlassHandler from repeater.data_acquisition.gps_service import GPSService @@ -39,6 +40,7 @@ def __init__(self, config: dict, radio=None): self.dispatcher = None self.repeater_handler = None self.local_hash = None + self.local_hash_bytes = None self.local_identity = None self.identity_manager = None self.config_manager = None @@ -59,6 +61,8 @@ def __init__(self, config: dict, radio=None): self.companion_frame_servers: list = [] self._shutdown_started = False self._main_task = None + self.radio_manager = None + self._dispatcher_task = None log_level = config.get("logging", {}).get("level", "INFO") logging.basicConfig( @@ -96,47 +100,45 @@ async def initialize(self): logger.info(f"System Network IP: {self.network_ip}") #----------------------------------------------- - if self.radio is None: - radio_type = self.config.get("radio_type", "sx1262") - logger.info(f"Initializing radio hardware... (radio_type={radio_type})") - try: - self.radio = get_radio_for_board(self.config) + self.sensor_manager = SensorManager(self.config) + self.sensor_manager.start() + if self.sensor_manager.get_summary().get("loaded", 0): + logger.info("Sensor manager initialized") + else: + logger.info("No configured sensors loaded") - # KISS modem: schedule RX callbacks on the event loop for thread safety - if hasattr(self.radio, "set_event_loop"): - self.radio.set_event_loop(asyncio.get_running_loop()) + self.gps_service = GPSService( + self.config, + location_update_callback=self._update_repeater_location_from_gps, + ) + self.gps_service.start() + if self.config.get("gps", {}).get("enabled", False): + logger.info("GPS diagnostics initialized") + else: + logger.info("GPS diagnostics disabled") - if hasattr(self.radio, "set_custom_cad_thresholds"): - # Load CAD settings from config, with defaults - cad_config = self.config.get("radio", {}).get("cad", {}) - peak_threshold = cad_config.get("peak_threshold", 23) - min_threshold = cad_config.get("min_threshold", 11) + from pymc_core import LocalIdentity + identity_key = self.config.get("repeater", {}).get("identity_key") + if not identity_key: + logger.error("No identity key found in configuration. Cannot init repeater.") + raise RuntimeError("Identity key is required for repeater operation") - self.radio.set_custom_cad_thresholds(peak=peak_threshold, min_val=min_threshold) - logger.info( - f"CAD thresholds set from config: peak={peak_threshold}, min={min_threshold}" - ) - else: - logger.warning("Radio does not support CAD configuration") - - if hasattr(self.radio, "get_frequency"): - logger.info(f"Radio config - Freq: {self.radio.get_frequency():.1f}MHz") - if hasattr(self.radio, "get_spreading_factor"): - logger.info(f"Radio config - SF: {self.radio.get_spreading_factor()}") - if hasattr(self.radio, "get_bandwidth"): - logger.info(f"Radio config - BW: {self.radio.get_bandwidth()}kHz") - if hasattr(self.radio, "get_coding_rate"): - logger.info(f"Radio config - CR: {self.radio.get_coding_rate()}") - if hasattr(self.radio, "get_tx_power"): - logger.info(f"Radio config - TX Power: {self.radio.get_tx_power()}dBm") - - logger.info("Radio hardware initialized") - except Exception as e: - logger.error(f"Failed to initialize radio hardware: {e}") - raise RuntimeError("Repeater requires real LoRa hardware") from e + local_identity = LocalIdentity(seed=identity_key) + self.local_identity = local_identity + + pubkey = local_identity.get_public_key() + self.local_hash = pubkey[0] + self.local_hash_bytes = bytes(pubkey[:3]) + + logger.info(f"Local identity set: {local_identity.get_address_bytes().hex()}") + local_hash_hex = f"0x{self.local_hash:02x}" + logger.info(f"Local node hash (from identity): {local_hash_hex}") + + async def _on_radio_connected(self, radio) -> None: + """Called by RadioManager after hardware initialises. Sets up Dispatcher and all helpers.""" + self.radio = radio try: - from pymc_core import LocalIdentity from pymc_core.node.dispatcher import Dispatcher self.dispatcher = Dispatcher(self.radio) @@ -146,23 +148,7 @@ async def initialize(self): self.identity_manager = IdentityManager(self.config) logger.info("Identity manager initialized") - # Set up default repeater identity (not managed by identity manager) - identity_key = self.config.get("repeater", {}).get("identity_key") - if not identity_key: - logger.error("No identity key found in configuration. Cannot init repeater.") - raise RuntimeError("Identity key is required for repeater operation") - - local_identity = LocalIdentity(seed=identity_key) - self.local_identity = local_identity - self.dispatcher.local_identity = local_identity - - pubkey = local_identity.get_public_key() - self.local_hash = pubkey[0] - self.local_hash_bytes = bytes(pubkey[:3]) - - logger.info(f"Local identity set: {local_identity.get_address_bytes().hex()}") - local_hash_hex = f"0x{self.local_hash:02x}" - logger.info(f"Local node hash (from identity): {local_hash_hex}") + self.dispatcher.local_identity = self.local_identity # Load additional identities from config (e.g., room servers) await self._load_additional_identities() @@ -266,23 +252,6 @@ async def initialize(self): ) logger.info("Config manager initialized") - self.sensor_manager = SensorManager(self.config) - self.sensor_manager.start() - if self.sensor_manager.get_summary().get("loaded", 0): - logger.info("Sensor manager initialized") - else: - logger.info("No configured sensors loaded") - - self.gps_service = GPSService( - self.config, - location_update_callback=self._update_repeater_location_from_gps, - ) - self.gps_service.start() - if self.config.get("gps", {}).get("enabled", False): - logger.info("GPS diagnostics initialized") - else: - logger.info("GPS diagnostics disabled") - # Initialize text message helper with per-identity ACLs self.text_helper = TextHelper( identity_manager=self.identity_manager, @@ -381,6 +350,89 @@ async def initialize(self): logger.error(f"Failed to initialize dispatcher: {e}") raise + self._dispatcher_task = asyncio.create_task(self._run_dispatcher(), name="dispatcher") + + async def _run_dispatcher(self) -> None: + """Run dispatcher.run_forever() in a task; signal RadioManager when it exits.""" + try: + await self.dispatcher.run_forever() + except asyncio.CancelledError: + raise + except Exception as e: + logger.error(f"Dispatcher error: {e}") + finally: + if not self._shutdown_started and self.radio_manager: + self.radio_manager.signal_disconnected() + + async def _on_radio_disconnected(self) -> None: + """Called by RadioManager when the radio is lost mid-run. Tears down radio-dependent subsystems.""" + if self._dispatcher_task and not self._dispatcher_task.done(): + self._dispatcher_task.cancel() + try: + await self._dispatcher_task + except asyncio.CancelledError: + pass + self._dispatcher_task = None + + for frame_server in getattr(self, "companion_frame_servers", []): + try: + await frame_server.stop() + except Exception as e: + logger.warning(f"Companion frame server stop error: {e}") + self.companion_frame_servers = [] + + if hasattr(self, "companion_bridges"): + for bridge in self.companion_bridges.values(): + if hasattr(bridge, "stop"): + try: + await bridge.stop() + except Exception as e: + logger.warning(f"Companion bridge stop error: {e}") + self.companion_bridges = {} + + if self.router: + try: + await self.router.stop() + except Exception as e: + logger.warning(f"Error stopping router: {e}") + self.router = None + + if self.glass_handler: + try: + await self.glass_handler.stop() + except Exception as e: + logger.warning(f"Error stopping Glass handler: {e}") + self.glass_handler = None + + if self.repeater_handler: + try: + await self.repeater_handler.stop() + except Exception as e: + logger.warning(f"Error stopping engine: {e}") + try: + if self.repeater_handler.storage: + await asyncio.wait_for( + asyncio.to_thread(self.repeater_handler.storage.close), timeout=5 + ) + except asyncio.TimeoutError: + logger.warning("Timeout closing storage on disconnect") + except Exception as e: + logger.warning(f"Error closing storage on disconnect: {e}") + + self.dispatcher = None + self.repeater_handler = None + self.trace_helper = None + self.advert_helper = None + self.discovery_helper = None + self.login_helper = None + self.text_helper = None + self.path_helper = None + self.protocol_request_helper = None + self.config_manager = None + self.identity_manager = None + self.radio = None + logger.info("Radio-dependent subsystems torn down, awaiting reconnect") + async def _load_additional_identities(self): from pymc_core import LocalIdentity @@ -944,6 +996,9 @@ def get_stats(self) -> dict: if self.sensor_manager: stats["sensors"] = self.sensor_manager.get_summary() + if self.radio_manager: + stats["radio"] = self.radio_manager.get_status() + return stats async def _get_companion_stats(self, stats_type: int) -> dict: @@ -1117,7 +1172,6 @@ def _signal_shutdown(self, sig, loop): return logger.info(f"Received signal {sig.name}, shutting down...") loop.create_task(self._shutdown()) - # Cancel run() so dispatcher.run_forever() unwinds cleanly. if self._main_task and not self._main_task.done(): self._main_task.cancel() @@ -1127,6 +1181,21 @@ async def _shutdown(self): return self._shutdown_started = True + # Cancel dispatcher task before stopping RadioManager + if self._dispatcher_task and not self._dispatcher_task.done(): + self._dispatcher_task.cancel() + try: + await self._dispatcher_task + except asyncio.CancelledError: + pass + + # Stop RadioManager — cleans up radio hardware and cancels the retry loop + if self.radio_manager: + try: + await self.radio_manager.stop() + except Exception as e: + logger.warning(f"Error stopping radio manager: {e}") + # Stop companion frame servers first to close client sockets and child workers. for frame_server in getattr(self, "companion_frame_servers", []): try: @@ -1191,22 +1260,6 @@ async def _shutdown(self): except Exception as e: logger.warning(f"Error closing storage: {e}") - # Release radio resources - if self.radio and hasattr(self.radio, "cleanup"): - try: - self.radio.cleanup() - except Exception as e: - logger.warning(f"Error cleaning up radio: {e}") - - # Release CH341 USB device if in use - try: - if self.config.get("radio_type", "sx1262").lower() == "sx1262_ch341": - from pymc_core.hardware.ch341.ch341_async import CH341Async - - CH341Async.reset_instance() - except Exception as e: - logger.debug(f"CH341 reset skipped/failed: {e}") - # Do not force-stop the event loop here; asyncio.run() owns loop lifecycle. @staticmethod @@ -1243,7 +1296,7 @@ async def run(self): try: await self.initialize() - # Start HTTP stats server + # Start HTTP server unconditionally — radio connection happens asynchronously http_port = self.config.get("http", {}).get("port", 8000) http_host = self.config.get("http", {}).get("host", "0.0.0.0") @@ -1279,29 +1332,18 @@ async def run(self): except Exception as e: logger.error(f"Failed to start HTTP server: {e}") - # Run dispatcher (handles RX/TX via pymc_core) - try: - await self.dispatcher.run_forever() - except asyncio.CancelledError: - logger.info("Dispatcher loop cancelled for shutdown") - except KeyboardInterrupt: - logger.info("Shutting down...") - for frame_server in getattr(self, "companion_frame_servers", []): - try: - await frame_server.stop() - except Exception as e: - logger.debug(f"Companion frame server stop: {e}") - if hasattr(self, "companion_bridges"): - for bridge in self.companion_bridges.values(): - if hasattr(bridge, "stop"): - try: - await bridge.stop() - except Exception as e: - logger.debug(f"Companion bridge stop: {e}") - if self.router: - await self.router.stop() - if self.http_server: - self.http_server.stop() + # Start RadioManager — radio connection and retry loop runs as a background task + self.radio_manager = RadioManager( + get_config=lambda: self.config, + on_connected=self._on_radio_connected, + on_disconnected=self._on_radio_disconnected, + ) + self.radio_manager.start() + + # Block until a shutdown signal cancels this task + await asyncio.Event().wait() + except asyncio.CancelledError: + logger.info("Main task cancelled for shutdown") finally: await self._shutdown() diff --git a/repeater/radio_manager.py b/repeater/radio_manager.py new file mode 100644 index 0000000..1dfe3a2 --- /dev/null +++ b/repeater/radio_manager.py @@ -0,0 +1,253 @@ +import asyncio +import logging +import time +from typing import Any, Callable, Dict, Optional + +logger = logging.getLogger("RadioManager") + +# Retry delays in seconds: 5, 10, 30, 60, 60, 60, ... +_RETRY_DELAYS = [5, 10, 30, 60] + + +class RadioManager: + """ + Manages radio hardware lifecycle: connect, retry on failure, report status. + + Runs as a single asyncio task. The synchronous get_radio_for_board() call is + dispatched via run_in_executor so it never blocks the event loop — consistent + with engine._record_noise_floor_async(). + + on_connected(radio) is called (awaited) when hardware initialises successfully. + on_disconnected() is called (awaited) when the radio is lost mid-run so the + daemon can tear down the dispatcher and helpers before RadioManager retries. + + get_config is called on every attempt so that UI-driven config changes are + picked up without a service restart. + """ + + def __init__( + self, + get_config: Callable[[], Dict[str, Any]], + on_connected: Callable, + on_disconnected: Callable, + ) -> None: + self._get_config = get_config + self._on_connected = on_connected + self._on_disconnected = on_disconnected + + self._task: Optional[asyncio.Task] = None + self._stop_event = asyncio.Event() + self._retry_now_event = asyncio.Event() + self._disconnected_event = asyncio.Event() + + self._status: str = "stopped" + self._radio_type: Optional[str] = None + self._error: Optional[str] = None + self._connected_at: Optional[float] = None + self._last_error_at: Optional[float] = None + self._retry_count: int = 0 + self._retry_delay: int = 0 + self._current_radio: Any = None + + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + + def start(self) -> None: + """Spawn the connection loop as an asyncio task.""" + self._stop_event.clear() + self._task = asyncio.create_task(self._connect_loop(), name="radio-manager") + + async def stop(self) -> None: + """Signal the loop to stop, await task completion, and clean up hardware.""" + self._stop_event.set() + self._retry_now_event.set() # unblock any backoff wait + self._disconnected_event.set() # unblock any run_forever wait + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except (asyncio.CancelledError, Exception): + pass + self._cleanup_radio() + self._status = "stopped" + + def notify_config_changed(self) -> None: + """Reset backoff and retry immediately — call after a config save.""" + self._retry_count = 0 + self._retry_delay = 0 + self._retry_now_event.set() + if self._status == "connected": + self._disconnected_event.set() + + def signal_disconnected(self) -> None: + """ + Called by the daemon when dispatcher.run_forever() exits unexpectedly, + telling RadioManager the radio is gone and it should re-enter the retry loop. + """ + self._disconnected_event.set() + + def get_status(self) -> Dict[str, Any]: + return { + "status": self._status, + "type": self._radio_type, + "error": self._error, + "connected_at": self._connected_at, + "last_error_at": self._last_error_at, + "retry_count": self._retry_count, + "retry_delay_seconds": self._retry_delay, + } + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _cleanup_radio(self) -> None: + if self._current_radio: + try: + self._current_radio.cleanup() + except Exception as e: + logger.debug("Radio cleanup error: %s", e) + if self._radio_type == "sx1262_ch341": + try: + from pymc_core.hardware.ch341.ch341_async import CH341Async + CH341Async.reset_instance() + except Exception as e: + logger.debug("CH341 reset skipped/failed: %s", e) + self._current_radio = None + + async def _connect_loop(self) -> None: + from repeater.config import get_radio_for_board + + loop = asyncio.get_running_loop() + + while not self._stop_event.is_set(): + config = self._get_config() + self._radio_type = config.get("radio_type", "sx1262") + self._status = "connecting" + self._retry_now_event.clear() + + logger.info( + "Attempting radio connection (type=%s, attempt=%d)", + self._radio_type, + self._retry_count + 1, + ) + + try: + radio = await loop.run_in_executor(None, get_radio_for_board, config) + except asyncio.CancelledError: + raise + except Exception as e: + logger.error( + "Radio connection failed: %s. Retrying in %ds (attempt %d)", + e, + _RETRY_DELAYS[min(self._retry_count, len(_RETRY_DELAYS) - 1)], + self._retry_count + 1, + ) + await self._enter_backoff(str(e)) + continue + + self._current_radio = radio + self._apply_post_init_config(radio, config, loop) + + self._status = "connected" + self._connected_at = time.time() + self._error = None + self._retry_count = 0 + self._retry_delay = 0 + logger.info("Radio connected (type=%s)", self._radio_type) + + try: + await self._on_connected(radio) + except asyncio.CancelledError: + raise + except Exception as e: + logger.error("Daemon failed to initialise after radio connection: %s", e) + self._cleanup_radio() + await self._enter_backoff(str(e)) + continue + + # Wait until the radio dies, the daemon signals disconnect, or we are stopped + self._disconnected_event.clear() + await self._wait_for_disconnect(radio) + + if self._stop_event.is_set(): + break + + # Radio lost mid-run — notify daemon to tear down, then retry + logger.warning("Radio disconnected — notifying daemon, will retry") + + try: + await self._on_disconnected() + except asyncio.CancelledError: + raise + except Exception as e: + logger.warning("Daemon disconnect callback error: %s", e) + + self._cleanup_radio() + await self._enter_backoff("Radio disconnected") + + async def _enter_backoff(self, error: str = "") -> None: + """Record error state and wait out the current retry delay.""" + if error: + self._error = error + self._last_error_at = time.time() + self._status = "error" + delay = _RETRY_DELAYS[min(self._retry_count, len(_RETRY_DELAYS) - 1)] + self._retry_delay = delay + self._retry_count += 1 + await self._interruptible_wait(delay) + + def _apply_post_init_config(self, radio, config: dict, loop) -> None: + """Push event-loop reference and CAD thresholds into the radio, then log RF parameters.""" + if hasattr(radio, "set_event_loop"): + radio.set_event_loop(loop) + if hasattr(radio, "set_custom_cad_thresholds"): + cad_config = config.get("radio", {}).get("cad", {}) + radio.set_custom_cad_thresholds( + peak=cad_config.get("peak_threshold", 23), + min_val=cad_config.get("min_threshold", 11), + ) + if hasattr(radio, "get_frequency"): + logger.info("Radio config - Freq: %.1fMHz", radio.get_frequency()) + if hasattr(radio, "get_spreading_factor"): + logger.info("Radio config - SF: %s", radio.get_spreading_factor()) + if hasattr(radio, "get_bandwidth"): + logger.info("Radio config - BW: %skHz", radio.get_bandwidth()) + if hasattr(radio, "get_coding_rate"): + logger.info("Radio config - CR: %s", radio.get_coding_rate()) + if hasattr(radio, "get_tx_power"): + logger.info("Radio config - TX Power: %sdBm", radio.get_tx_power()) + + async def _interruptible_wait(self, delay: int) -> None: + """Wait for delay seconds, but return early if stop or retry-now is signalled.""" + try: + await asyncio.wait_for( + asyncio.shield(self._retry_now_event.wait()), + timeout=delay, + ) + self._retry_now_event.clear() + except asyncio.TimeoutError: + pass + + async def _wait_for_disconnect(self, radio) -> None: + """Block until radio dies, disconnected_event fires, or stop_event fires.""" + if hasattr(radio, "is_connected"): + while not self._stop_event.is_set() and not self._disconnected_event.is_set(): + if not radio.is_connected: + break + await asyncio.sleep(1.0) + else: + stop_wait = asyncio.create_task(self._stop_event.wait()) + disc_wait = asyncio.create_task(self._disconnected_event.wait()) + try: + done, pending = await asyncio.wait( + [stop_wait, disc_wait], + return_when=asyncio.FIRST_COMPLETED, + ) + for t in pending: + t.cancel() + except asyncio.CancelledError: + stop_wait.cancel() + disc_wait.cancel() + raise diff --git a/repeater/web/api_endpoints.py b/repeater/web/api_endpoints.py index ab40a5c..018bd6c 100644 --- a/repeater/web/api_endpoints.py +++ b/repeater/web/api_endpoints.py @@ -225,7 +225,7 @@ def _get_storage(self): not hasattr(self.daemon_instance, "repeater_handler") or not self.daemon_instance.repeater_handler ): - raise Exception("Repeater handler not initialized") + raise Exception("Radio not available — connecting or hardware unavailable") if ( not hasattr(self.daemon_instance.repeater_handler, "storage") diff --git a/tests/test_radio_manager.py b/tests/test_radio_manager.py new file mode 100644 index 0000000..79e4f55 --- /dev/null +++ b/tests/test_radio_manager.py @@ -0,0 +1,468 @@ +""" +Tests for RadioManager — asyncio radio lifecycle, retry, disconnect, and cleanup. + +Run with: + python -m pytest tests/test_radio_manager.py -v +""" + +import asyncio +import sys +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from repeater.radio_manager import RadioManager + + +# --------------------------------------------------------------------------- +# Radio mock factories +# --------------------------------------------------------------------------- + +class _SX1262Spec: + """Minimal spec for SX1262-style radio: no is_connected, event-driven disconnect.""" + def cleanup(self): pass + + +class _KISSSpec: + """Minimal spec for KISS-style radio: has is_connected bool, polled for disconnect.""" + is_connected: bool + def cleanup(self): pass + + +def _sx1262_radio(): + """MagicMock radio without is_connected — triggers the asyncio-event disconnect path.""" + return MagicMock(spec=_SX1262Spec) + + +def _kiss_radio(connected=True): + """MagicMock radio with is_connected — triggers the 1s-poll disconnect path.""" + r = MagicMock(spec=_KISSSpec) + r.is_connected = connected + return r + + +# --------------------------------------------------------------------------- +# Manager factory helper +# --------------------------------------------------------------------------- + +def _make_manager(radio_type="sx1262", on_connected=None, on_disconnected=None): + if on_connected is None: + on_connected = AsyncMock() + if on_disconnected is None: + on_disconnected = AsyncMock() + m = RadioManager( + get_config=lambda: {"radio_type": radio_type}, + on_connected=on_connected, + on_disconnected=on_disconnected, + ) + return m, on_connected, on_disconnected + + +# --------------------------------------------------------------------------- +# 1. Happy path and status shape +# --------------------------------------------------------------------------- + +class TestRadioManagerConnect(unittest.IsolatedAsyncioTestCase): + + async def test_connects_calls_on_connected_with_radio(self): + mock_radio = _sx1262_radio() + m, on_connected, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", return_value=mock_radio), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.05) + + on_connected.assert_called_once_with(mock_radio) + await m.stop() + + async def test_status_is_connected_after_success(self): + m, _, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", return_value=_sx1262_radio()), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.05) + + s = m.get_status() + self.assertEqual(s["status"], "connected") + self.assertEqual(s["type"], "sx1262") + self.assertIsNone(s["error"]) + self.assertIsNotNone(s["connected_at"]) + self.assertEqual(s["retry_count"], 0) + await m.stop() + + async def test_get_status_has_all_required_keys(self): + m, _, _ = _make_manager() + s = m.get_status() + for key in ("status", "type", "error", "connected_at", + "last_error_at", "retry_count", "retry_delay_seconds"): + self.assertIn(key, s) + self.assertEqual(s["status"], "stopped") + + async def test_get_config_called_fresh_on_every_attempt(self): + """get_config must be called before each connection attempt so UI config changes apply.""" + config_calls = 0 + + def counting_config(): + nonlocal config_calls + config_calls += 1 + return {"radio_type": "sx1262"} + + attempt = [0] + + def fail_twice_then_connect(config): + attempt[0] += 1 + if attempt[0] < 3: + raise RuntimeError("not ready") + return _sx1262_radio() + + m = RadioManager(counting_config, AsyncMock(), AsyncMock()) + + with patch("repeater.config.get_radio_for_board", side_effect=fail_twice_then_connect), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.1) + self.assertGreaterEqual(config_calls, 3) + await m.stop() + + +# --------------------------------------------------------------------------- +# 2. Retry and backoff +# --------------------------------------------------------------------------- + +class TestRadioManagerRetry(unittest.IsolatedAsyncioTestCase): + + async def test_connect_failure_sets_error_status(self): + m, on_connected, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", side_effect=RuntimeError("no hardware")), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.05) + + on_connected.assert_not_called() + s = m.get_status() + self.assertEqual(s["status"], "error") + self.assertEqual(s["error"], "no hardware") + self.assertIsNotNone(s["last_error_at"]) + await m.stop() + + async def test_retry_count_increments_on_repeated_failure(self): + m, _, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", side_effect=RuntimeError("fail")), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.1) + self.assertGreater(m.get_status()["retry_count"], 1) + await m.stop() + + async def test_backoff_delay_sequence_follows_retry_delays(self): + """Delays passed to _interruptible_wait must match _RETRY_DELAYS in order.""" + delays_used = [] + + async def capture_wait(self_inner, delay): + delays_used.append(delay) + + attempt = [0] + + def fail_three_times(config): + attempt[0] += 1 + if attempt[0] <= 3: + raise RuntimeError("fail") + return _sx1262_radio() + + m, _, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", side_effect=fail_three_times), \ + patch.object(RadioManager, "_interruptible_wait", capture_wait), \ + patch("repeater.radio_manager._RETRY_DELAYS", [5, 10, 30, 60]): + m.start() + await asyncio.sleep(0.1) + self.assertEqual(delays_used[:3], [5, 10, 30]) + await m.stop() + + async def test_retry_count_and_delay_reset_on_successful_connect(self): + """retry_count and retry_delay_seconds return to 0 once connected after failures.""" + attempt = [0] + + def fail_twice_then_connect(_): + attempt[0] += 1 + if attempt[0] <= 2: + raise RuntimeError("fail") + return _sx1262_radio() + + m, _, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", side_effect=fail_twice_then_connect), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.1) + + s = m.get_status() + self.assertEqual(s["status"], "connected") + self.assertEqual(s["retry_count"], 0) + self.assertEqual(s["retry_delay_seconds"], 0) + await m.stop() + + async def test_notify_config_changed_unblocks_backoff_immediately(self): + """notify_config_changed() during a 1-second backoff causes immediate retry.""" + attempt = [0] + + def fail_once_then_connect(config): + attempt[0] += 1 + if attempt[0] == 1: + raise RuntimeError("fail") + return _sx1262_radio() + + m, _, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", side_effect=fail_once_then_connect), \ + patch("repeater.radio_manager._RETRY_DELAYS", [1, 1, 1, 1]): + m.start() + await asyncio.sleep(0.05) # First attempt failed, now in 1s backoff + self.assertEqual(m.get_status()["status"], "error") + + m.notify_config_changed() + await asyncio.sleep(0.1) # Retry should complete well within 1s + + self.assertEqual(m.get_status()["status"], "connected") + self.assertEqual(m.get_status()["retry_count"], 0) + self.assertEqual(m.get_status()["retry_delay_seconds"], 0) + await m.stop() + + +# --------------------------------------------------------------------------- +# 3. Mid-run disconnect +# --------------------------------------------------------------------------- + +class TestRadioManagerDisconnect(unittest.IsolatedAsyncioTestCase): + + async def test_sx1262_disconnect_via_signal_calls_on_disconnected(self): + """signal_disconnected() mid-run triggers on_disconnected then reconnect.""" + m, on_connected, on_disconnected = _make_manager() + + with patch("repeater.config.get_radio_for_board", return_value=_sx1262_radio()), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.05) + self.assertEqual(m.get_status()["status"], "connected") + + m.signal_disconnected() + await asyncio.sleep(0.05) + + on_disconnected.assert_called_once() + await m.stop() + + async def test_kiss_disconnect_via_is_connected_flag(self): + """is_connected → False on KISS radio triggers on_disconnected and reconnect. + + is_connected is set False inside on_connected (before _wait_for_disconnect + checks it), so the poll detects it on the first check without a 1s delay. + """ + mock_radio = _kiss_radio(connected=True) + on_disconnected = AsyncMock() + connect_count = [0] + reconnected = asyncio.Event() + + async def on_connected_fn(radio): + radio.is_connected = True # Reset so second connect stays alive + connect_count[0] += 1 + if connect_count[0] == 1: + # Set False before _wait_for_disconnect polls — no 1s wait needed + radio.is_connected = False + else: + reconnected.set() + + m = RadioManager( + get_config=lambda: {"radio_type": "kiss"}, + on_connected=on_connected_fn, + on_disconnected=on_disconnected, + ) + + with patch("repeater.config.get_radio_for_board", return_value=mock_radio), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.wait_for(reconnected.wait(), timeout=2.0) + + on_disconnected.assert_called_once() + self.assertGreaterEqual(connect_count[0], 2) + await m.stop() + + async def test_on_disconnected_raises_is_swallowed(self): + """Exception from on_disconnected must not abort the reconnect cycle.""" + connect_count = [0] + reconnected = asyncio.Event() + + async def on_connected_fn(radio): + connect_count[0] += 1 + if connect_count[0] > 1: + reconnected.set() + + m = RadioManager( + get_config=lambda: {"radio_type": "sx1262"}, + on_connected=on_connected_fn, + on_disconnected=AsyncMock(side_effect=RuntimeError("teardown failed")), + ) + + with patch("repeater.config.get_radio_for_board", return_value=_sx1262_radio()), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.05) + self.assertEqual(connect_count[0], 1) + + m.signal_disconnected() + await asyncio.wait_for(reconnected.wait(), timeout=1.0) + + self.assertGreaterEqual(connect_count[0], 2) + await m.stop() + + async def test_notify_config_changed_while_connected_forces_reconnect(self): + """notify_config_changed() while connected triggers disconnect + reconnect cycle.""" + connect_count = [0] + reconnected = asyncio.Event() + + async def on_connected_fn(radio): + connect_count[0] += 1 + if connect_count[0] > 1: + reconnected.set() + + m = RadioManager( + get_config=lambda: {"radio_type": "sx1262"}, + on_connected=on_connected_fn, + on_disconnected=AsyncMock(), + ) + + with patch("repeater.config.get_radio_for_board", return_value=_sx1262_radio()), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.05) + self.assertEqual(connect_count[0], 1) + + m.notify_config_changed() + await asyncio.wait_for(reconnected.wait(), timeout=1.0) + + self.assertGreaterEqual(connect_count[0], 2) + await m.stop() + + +# --------------------------------------------------------------------------- +# 4. Shutdown +# --------------------------------------------------------------------------- + +class TestRadioManagerShutdown(unittest.IsolatedAsyncioTestCase): + + async def test_stop_during_backoff_exits_cleanly(self): + m, _, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", side_effect=RuntimeError("fail")), \ + patch("repeater.radio_manager._RETRY_DELAYS", [60, 60, 60, 60]): + m.start() + await asyncio.sleep(0.05) # Now in 60s backoff + await m.stop() + self.assertEqual(m.get_status()["status"], "stopped") + + async def test_stop_while_connected_calls_radio_cleanup(self): + mock_radio = _sx1262_radio() + m, _, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", return_value=mock_radio), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.05) + self.assertEqual(m.get_status()["status"], "connected") + + await m.stop() + + mock_radio.cleanup.assert_called_once() + self.assertEqual(m.get_status()["status"], "stopped") + + async def test_stop_before_start_is_safe(self): + m, _, _ = _make_manager() + await m.stop() # Must not raise + self.assertEqual(m.get_status()["status"], "stopped") + + +# --------------------------------------------------------------------------- +# 5. Error handling +# --------------------------------------------------------------------------- + +class TestRadioManagerErrorHandling(unittest.IsolatedAsyncioTestCase): + + async def test_on_connected_raises_triggers_retry(self): + """on_connected raising is treated as a connection error — manager retries.""" + connect_count = [0] + + async def failing_then_ok(radio): + connect_count[0] += 1 + if connect_count[0] == 1: + raise RuntimeError("daemon setup failed") + + m = RadioManager( + get_config=lambda: {"radio_type": "sx1262"}, + on_connected=failing_then_ok, + on_disconnected=AsyncMock(), + ) + + with patch("repeater.config.get_radio_for_board", return_value=_sx1262_radio()), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.1) + self.assertGreaterEqual(connect_count[0], 2) + self.assertEqual(m.get_status()["status"], "connected") + await m.stop() + + async def test_on_connected_raises_calls_cleanup_on_radio(self): + """Radio must be cleaned up when on_connected raises — no hardware leak.""" + mock_radio = _sx1262_radio() + + m = RadioManager( + get_config=lambda: {"radio_type": "sx1262"}, + on_connected=AsyncMock(side_effect=RuntimeError("fail")), + on_disconnected=AsyncMock(), + ) + + with patch("repeater.config.get_radio_for_board", return_value=mock_radio), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.05) + mock_radio.cleanup.assert_called() + await m.stop() + + async def test_radio_cleanup_exception_is_swallowed(self): + """Exception from radio.cleanup() must not crash the manager or stop().""" + mock_radio = _sx1262_radio() + mock_radio.cleanup.side_effect = RuntimeError("cleanup exploded") + m, _, _ = _make_manager() + + with patch("repeater.config.get_radio_for_board", return_value=mock_radio), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4): + m.start() + await asyncio.sleep(0.05) + await m.stop() # Must not raise + self.assertEqual(m.get_status()["status"], "stopped") + + async def test_ch341_reset_called_for_ch341_radio_type(self): + """CH341Async.reset_instance() is called when radio_type is sx1262_ch341.""" + mock_radio = _sx1262_radio() + mock_ch341_class = MagicMock() + mock_ch341_module = MagicMock() + mock_ch341_module.CH341Async = mock_ch341_class + + m = RadioManager( + get_config=lambda: {"radio_type": "sx1262_ch341"}, + on_connected=AsyncMock(), + on_disconnected=AsyncMock(), + ) + + with patch("repeater.config.get_radio_for_board", return_value=mock_radio), \ + patch("repeater.radio_manager._RETRY_DELAYS", [0] * 4), \ + patch.dict(sys.modules, {"pymc_core.hardware.ch341.ch341_async": mock_ch341_module}): + m.start() + await asyncio.sleep(0.05) + await m.stop() + mock_ch341_class.reset_instance.assert_called() + + +if __name__ == "__main__": + unittest.main()