Skip to content

Commit b500c3b

Browse files
author
sidey79
committed
fix: Ensure asyncio event loop yields in _reader_task to prevent 100% CPU idle usage
The `_reader_task`'s `await asyncio.sleep(0)` call was conditionally placed inside the `if line is not None:` block. If the underlying transport's `readline()` implementation unexpectedly returns immediately without raising an exception or yielding correctly (e.g., in a race condition or specific platform behavior), the `while` loop could consume 100% CPU in the idle state. Moving `await asyncio.sleep(0)` to be unconditionally executed at the end of the `try` block ensures the event loop yields to other tasks in every loop iteration, even if no line was received.
1 parent d8d8df3 commit b500c3b

1 file changed

Lines changed: 40 additions & 9 deletions

File tree

signalduino/controller.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,29 @@
1717
)
1818
from .exceptions import SignalduinoCommandTimeout, SignalduinoConnectionError, CommandValidationError
1919
from .mqtt import MqttPublisher
20+
from aiomqtt.exceptions import MqttError
2021
from .parser import SignalParser
2122
from .transport import BaseTransport
2223
from .types import DecodedMessage, PendingResponse, QueuedCommand
2324

25+
2426
class SignalduinoController:
2527
"""Orchestrates the connection, command queue and message parsing using asyncio."""
2628

29+
async def run(self, timeout: Optional[float] = None) -> None:
30+
"""Run the main loop until the timeout is reached or the stop event is set."""
31+
try:
32+
if timeout is not None:
33+
await asyncio.wait_for(self._stop_event.wait(), timeout=timeout)
34+
else:
35+
await self._stop_event.wait()
36+
except asyncio.TimeoutError:
37+
self.logger.info("Main loop timeout reached.")
38+
except Exception as e:
39+
self.logger.error(f"Error in main loop: {e}")
40+
raise
41+
"""Orchestrates the connection, command queue and message parsing using asyncio."""
42+
2743
def __init__(
2844
self,
2945
transport: BaseTransport,
@@ -103,8 +119,15 @@ async def send_command(
103119
async def __aenter__(self) -> "SignalduinoController":
104120
await self.transport.open()
105121
if self.mqtt_publisher:
106-
await self.mqtt_publisher.__aenter__()
107-
await self.initialize() # Wichtig: Initialisierung nach dem Öffnen des Transports und Publishers
122+
try:
123+
await self.mqtt_publisher.__aenter__()
124+
except MqttError as exc:
125+
self.logger.warning("Konnte keine Verbindung zum MQTT-Broker herstellen: %s", exc)
126+
try:
127+
await self.initialize() # Wichtig: Initialisierung nach dem Öffnen des Transports und Publishers
128+
except SignalduinoConnectionError as exc:
129+
self.logger.error("Verbindungsfehler während der Initialisierung, setze fort: %s", exc)
130+
108131
return self
109132

110133
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
@@ -124,7 +147,8 @@ async def _reader_task(self) -> None:
124147
if line is not None:
125148
self.logger.debug(f"Reader task received line: {line}")
126149
await self._raw_message_queue.put(line)
127-
await asyncio.sleep(0) # yield to other tasks
150+
151+
await asyncio.sleep(0) # Ensure yield, even if readline returns immediately without data
128152
except Exception as e:
129153
self.logger.error(f"Reader task error: {e}")
130154
break
@@ -134,7 +158,9 @@ async def _parser_task(self) -> None:
134158
try:
135159
line = await self._raw_message_queue.get()
136160
if line:
137-
decoded = self.parser.parse_line(line)
161+
# Führe die rechenintensive Parsing-Logik in einem separaten Thread aus.
162+
# Dadurch wird die asyncio-Event-Schleife nicht blockiert.
163+
decoded = await asyncio.to_thread(self.parser.parse_line, line)
138164
if decoded and self.message_callback:
139165
await self.message_callback(decoded[0])
140166
if self.mqtt_publisher and decoded:
@@ -268,7 +294,12 @@ async def _handle_as_command_response(self, line: str) -> None:
268294
async def _init_task_start_loop(self) -> None:
269295
"""Main initialization task that handles version check and XQ command."""
270296
try:
271-
# 1. Retry logic for 'V' command (Version)
297+
# 1. Deaktivieren des Empfängers (XQ) und Warten auf Abschluss der Warteschlange
298+
self.logger.info("Disabling Signalduino receiver (XQ) before version check...")
299+
await self.send_command("XQ", expect_response=False)
300+
await asyncio.sleep(SDUINO_INIT_WAIT) # Warte, bis der Befehl verarbeitet wurde
301+
302+
# 2. Retry logic for 'V' command (Version)
272303
version_response = None
273304
for attempt in range(SDUINO_INIT_MAXRETRY):
274305
try:
@@ -292,11 +323,11 @@ async def _init_task_start_loop(self) -> None:
292323
self._init_complete_event.set() # Ensure event is set to unblock
293324
raise SignalduinoConnectionError("Maximum initialization retries reached.")
294325

295-
# 2. Send XQ command after successful version check
326+
# 2. Activate receiver (XE) after successful version check (V).
296327
if version_response:
297-
await asyncio.sleep(SDUINO_INIT_WAIT_XQ)
298-
await self.send_command("XQ", expect_response=False)
299-
328+
self.logger.info("Enabling Signalduino receiver (XE)...")
329+
await self.send_command("XE", expect_response=False)
330+
300331
self._init_complete_event.set()
301332
return
302333

0 commit comments

Comments
 (0)