1+ import json
2+ import logging
3+ import os
4+ from dataclasses import asdict
5+ from typing import Optional , Any , Callable
6+
7+ import paho .mqtt .client as mqtt
8+
9+ from .types import DecodedMessage , RawFrame
10+
11+ class MqttPublisher :
12+ """Publishes DecodedMessage objects to an MQTT server and listens for commands."""
13+
14+ def __init__ (self , logger : Optional [logging .Logger ] = None ) -> None :
15+ self .logger = logger or logging .getLogger (__name__ )
16+ self .client = mqtt .Client ()
17+ self .client .on_connect = self ._on_connect
18+ self .client .on_disconnect = self ._on_disconnect
19+
20+ self .mqtt_host = os .environ .get ("MQTT_HOST" , "localhost" )
21+ self .mqtt_port = int (os .environ .get ("MQTT_PORT" , 1883 ))
22+ self .mqtt_topic = os .environ .get ("MQTT_TOPIC" , "signalduino/messages" )
23+ self .mqtt_username = os .environ .get ("MQTT_USERNAME" )
24+ self .mqtt_password = os .environ .get ("MQTT_PASSWORD" )
25+
26+ if self .mqtt_username and self .mqtt_password :
27+ self .client .username_pw_set (self .mqtt_username , self .mqtt_password )
28+
29+ self .command_callback : Optional [Callable [[str , str ], None ]] = None
30+ self .client .on_message = self ._on_message
31+
32+ # Will connect on first publish attempt if not connected
33+
34+ def _on_connect (self , client : mqtt .Client , userdata : Any , flags : Any , rc : int ) -> None :
35+ if rc == 0 :
36+ self .logger .info ("Connected to MQTT broker %s:%s" , self .mqtt_host , self .mqtt_port )
37+ # Subscribe to command topic
38+ command_topic = f"{ self .mqtt_topic } /commands/#"
39+ self .client .subscribe (command_topic )
40+ self .logger .info ("Subscribed to %s" , command_topic )
41+ else :
42+ self .logger .error ("Failed to connect to MQTT broker. Result code: %s" , rc )
43+
44+ def _on_message (self , client : mqtt .Client , userdata : Any , msg : mqtt .MQTTMessage ) -> None :
45+ """Handles incoming MQTT messages."""
46+ try :
47+ payload = msg .payload .decode ("utf-8" )
48+ self .logger .debug ("Received MQTT message on %s: %s" , msg .topic , payload )
49+
50+ if self .command_callback :
51+ # Extract command from topic or payload
52+ # Topic structure: signalduino/messages/commands/<command>
53+ # Example: signalduino/messages/commands/version -> get version
54+
55+ parts = msg .topic .split ("/" )
56+ if "commands" in parts :
57+ cmd_index = parts .index ("commands" )
58+ if len (parts ) > cmd_index + 1 :
59+ command_name = parts [cmd_index + 1 ]
60+ self .command_callback (command_name , payload )
61+ else :
62+ self .logger .warning ("Received command on generic command topic without specific command: %s" , msg .topic )
63+
64+ except Exception :
65+ self .logger .exception ("Error processing incoming MQTT message" )
66+
67+ def _on_disconnect (self , client : mqtt .Client , userdata : Any , rc : int ) -> None :
68+ if rc != 0 :
69+ self .logger .warning ("Disconnected from MQTT broker with result code: %s. Attempting auto-reconnect." , rc )
70+ else :
71+ self .logger .info ("Disconnected from MQTT broker." )
72+
73+ def _connect_if_needed (self ) -> None :
74+ if not self .client .is_connected ():
75+ try :
76+ self .logger .debug ("Attempting to connect to MQTT broker..." )
77+ self .client .connect (self .mqtt_host , self .mqtt_port )
78+ self .client .loop_start () # Start a non-blocking loop
79+ except Exception :
80+ self .logger .error ("Could not connect to MQTT broker %s:%s" , self .mqtt_host , self .mqtt_port , exc_info = True )
81+
82+ @staticmethod
83+ def _message_to_json (message : DecodedMessage ) -> str :
84+ """Serializes a DecodedMessage to a JSON string."""
85+
86+ # DecodedMessage uses dataclasses, but RawFrame inside it also uses a dataclass.
87+ # We need a custom serializer to handle nested dataclasses like RawFrame.
88+ def _raw_frame_to_dict (raw_frame : RawFrame ) -> dict :
89+ return asdict (raw_frame )
90+
91+ message_dict = asdict (message )
92+
93+ # Convert RawFrame nested object to dict
94+ if "raw" in message_dict and isinstance (message_dict ["raw" ], RawFrame ):
95+ message_dict ["raw" ] = _raw_frame_to_dict (message_dict ["raw" ])
96+
97+ # Remove empty or non-useful fields for publication
98+ message_dict .pop ("raw" , None ) # Do not publish raw frame data by default
99+
100+ return json .dumps (message_dict , indent = 4 )
101+
102+ def publish (self , message : DecodedMessage ) -> None :
103+ """Publishes a DecodedMessage."""
104+ if not self .client .is_connected ():
105+ self ._connect_if_needed ()
106+
107+ if self .client .is_connected ():
108+ try :
109+ topic = f"{ self .mqtt_topic } /{ message .protocol_id } "
110+ payload = self ._message_to_json (message )
111+ self .client .publish (topic , payload )
112+ self .logger .debug ("Published message for protocol %s to %s" , message .protocol_id , topic )
113+ except Exception :
114+ self .logger .error ("Failed to publish message" , exc_info = True )
115+
116+ def register_command_callback (self , callback : Callable [[str , str ], None ]) -> None :
117+ """Registers a callback for incoming commands."""
118+ self .command_callback = callback
119+
120+ def stop (self ) -> None :
121+ """Stops the MQTT client and disconnects."""
122+ if self .client .is_connected ():
123+ self .logger .info ("Disconnecting from MQTT broker..." )
124+ self .client .loop_stop ()
125+ self .client .disconnect ()
126+
127+
0 commit comments