22import queue
33import re
44import threading
5+ import os # NEU: Import für Umgebungsvariablen
56from datetime import datetime , timedelta , timezone
6- from typing import Any , Callable , List , Literal , Optional
7+ from typing import Any , Callable , List , Literal , Optional , Pattern
78
9+ from .constants import SDUINO_CMD_TIMEOUT
810from .exceptions import SignalduinoCommandTimeout , SignalduinoConnectionError
11+ from .mqtt import MqttPublisher # NEU: MQTT-Import
912from .parser import SignalParser
1013from .transport import BaseTransport
1114from .types import DecodedMessage , PendingResponse , QueuedCommand
@@ -26,6 +29,13 @@ def __init__(
2629 self .message_callback = message_callback
2730 self .logger = logger or logging .getLogger (__name__ )
2831
32+ # NEU: MQTT Publisher initialisieren
33+ self .mqtt_publisher : Optional [MqttPublisher ] = None
34+ if os .environ .get ("MQTT_HOST" ):
35+ # Nur initialisieren, wenn MQTT-Host konfiguriert ist
36+ self .mqtt_publisher = MqttPublisher (logger = self .logger )
37+ self .mqtt_publisher .register_command_callback (self ._handle_mqtt_command )
38+
2939 self ._reader_thread : Optional [threading .Thread ] = None
3040 self ._parser_thread : Optional [threading .Thread ] = None
3141 self ._writer_thread : Optional [threading .Thread ] = None
@@ -68,6 +78,10 @@ def disconnect(self) -> None:
6878 self .logger .info ("Disconnecting..." )
6979 self ._stop_event .set ()
7080
81+ # NEU: MQTT Publisher stoppen
82+ if self .mqtt_publisher :
83+ self .mqtt_publisher .stop ()
84+
7185 # Wake up threads that might be waiting on queues
7286 self ._raw_message_queue .put ("" )
7387 self ._write_queue .put (QueuedCommand ("" , 0 ))
@@ -113,6 +127,12 @@ def _parser_loop(self) -> None:
113127
114128 decoded_messages = self .parser .parse_line (raw_line )
115129 for message in decoded_messages :
130+ if self .mqtt_publisher :
131+ try :
132+ self .mqtt_publisher .publish (message )
133+ except Exception :
134+ self .logger .exception ("Error in MQTT publish" )
135+
116136 if self .message_callback :
117137 try :
118138 self .message_callback (message )
@@ -241,7 +261,11 @@ def send_message(self, message: str) -> None:
241261 self .send_command (message )
242262
243263 def send_command (
244- self , payload : str , expect_response : bool = False , timeout : float = 2.0
264+ self ,
265+ payload : str ,
266+ expect_response : bool = False ,
267+ timeout : float = 2.0 ,
268+ response_pattern : Optional [Pattern [str ]] = None ,
245269 ) -> Optional [str ]:
246270 """Queues a command and optionally waits for a specific response."""
247271 if not self .transport .is_open :
@@ -256,11 +280,16 @@ def send_command(
256280 def on_response (response : str ):
257281 response_queue .put (response )
258282
283+ if response_pattern is None :
284+ response_pattern = re .compile (
285+ f".*{ re .escape (payload )} .*|.*OK.*" , re .IGNORECASE
286+ )
287+
259288 command = QueuedCommand (
260289 payload = payload ,
261290 timeout = timeout ,
262291 expect_response = True ,
263- response_pattern = re . compile ( f".* { re . escape ( payload ) } .*|.*OK.*" , re . IGNORECASE ) ,
292+ response_pattern = response_pattern ,
264293 on_response = on_response ,
265294 description = payload ,
266295 )
@@ -270,4 +299,52 @@ def on_response(response: str):
270299 try :
271300 return response_queue .get (timeout = timeout )
272301 except queue .Empty :
273- raise SignalduinoCommandTimeout (f"Command '{ payload } ' timed out" )
302+ raise SignalduinoCommandTimeout (f"Command '{ payload } ' timed out" )
303+
304+ def _handle_mqtt_command (self , command : str , payload : str ) -> None :
305+ """Handles commands received via MQTT."""
306+ self .logger .info ("Handling MQTT command: %s (payload: %s)" , command , payload )
307+
308+ if command == "version" :
309+ try :
310+ # Send 'V' command and wait for response matching version pattern
311+ # Perl: 'V\s.*SIGNAL(?:duino|ESP|STM).*(?:\s\d\d:\d\d:\d\d)'
312+ version_pattern = re .compile (
313+ r"V\s.*SIGNAL(?:duino|ESP|STM).*" , re .IGNORECASE
314+ )
315+
316+ try :
317+ response = self .send_command (
318+ payload = "V" ,
319+ expect_response = True ,
320+ timeout = SDUINO_CMD_TIMEOUT ,
321+ response_pattern = version_pattern ,
322+ )
323+ self .logger .info ("Got version response: %s" , response )
324+ # Publish result back to MQTT
325+ # Topic: signalduino/messages/result/version
326+ # We need access to the client to publish ad-hoc messages or add a method to publisher
327+ if (
328+ self .mqtt_publisher
329+ and self .mqtt_publisher .client .is_connected ()
330+ ):
331+ result_topic = (
332+ f"{ self .mqtt_publisher .mqtt_topic } /result/{ command } "
333+ )
334+ self .mqtt_publisher .client .publish (result_topic , response )
335+
336+ except SignalduinoCommandTimeout :
337+ self .logger .error ("Timeout waiting for version response" )
338+ if (
339+ self .mqtt_publisher
340+ and self .mqtt_publisher .client .is_connected ()
341+ ):
342+ result_topic = (
343+ f"{ self .mqtt_publisher .mqtt_topic } /error/{ command } "
344+ )
345+ self .mqtt_publisher .client .publish (result_topic , "Timeout" )
346+
347+ except Exception as e :
348+ self .logger .error ("Error executing version command: %s" , e )
349+ else :
350+ self .logger .warning ("Unknown MQTT command: %s" , command )
0 commit comments