Skip to content

Commit 8964ba5

Browse files
committed
feat: mqtt hartbeat and more options
1 parent 6bdd287 commit 8964ba5

5 files changed

Lines changed: 163 additions & 108 deletions

File tree

signalduino/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
SDUINO_WRITEQUEUE_NEXT = 0.3
1111
SDUINO_WRITEQUEUE_TIMEOUT = 2
1212

13+
SDUINO_STATUS_HEARTBEAT_INTERVAL = 600.0 # 10 minutes
14+
1315
SDUINO_DISPATCH_VERBOSE = 5
1416
SDUINO_MC_DISPATCH_VERBOSE = 5
1517
SDUINO_MC_DISPATCH_LOG_ID = "12.1"

signalduino/controller.py

Lines changed: 127 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1+
import json # NEU: Import für JSON-Serialisierung
12
import logging
23
import queue
34
import re
45
import threading
56
import os # NEU: Import für Umgebungsvariablen
67
from datetime import datetime, timedelta, timezone
7-
from typing import Any, Callable, List, Literal, Optional, Pattern
8+
from typing import Any, Callable, List, Optional, Pattern
89

10+
from .commands import SignalduinoCommands # NEU: Import für Befehle
911
from .constants import (
1012
SDUINO_CMD_TIMEOUT,
1113
SDUINO_INIT_MAXRETRY,
1214
SDUINO_INIT_WAIT,
1315
SDUINO_INIT_WAIT_XQ,
16+
SDUINO_STATUS_HEARTBEAT_INTERVAL, # NEU: Heartbeat-Konstante
1417
)
1518
from .exceptions import SignalduinoCommandTimeout, SignalduinoConnectionError
1619
from .mqtt import MqttPublisher # NEU: MQTT-Import
@@ -30,6 +33,7 @@ def __init__(
3033
logger: Optional[logging.Logger] = None,
3134
) -> None:
3235
self.transport = transport
36+
self.commands = SignalduinoCommands(self.send_command) # NEU: Befehlsklasse initialisieren
3337
self.parser = parser or SignalParser()
3438
self.message_callback = message_callback
3539
self.logger = logger or logging.getLogger(__name__)
@@ -45,6 +49,8 @@ def __init__(
4549
self._parser_thread: Optional[threading.Thread] = None
4650
self._writer_thread: Optional[threading.Thread] = None
4751

52+
self._heartbeat_timer: Optional[threading.Timer] = None # NEU: Heartbeat Timer initialisieren
53+
4854
self._stop_event = threading.Event()
4955
self._raw_message_queue: queue.Queue[str] = queue.Queue()
5056
self._write_queue: queue.Queue[QueuedCommand] = queue.Queue()
@@ -89,6 +95,10 @@ def disconnect(self) -> None:
8995
# NEU: MQTT Publisher stoppen
9096
if self.mqtt_publisher:
9197
self.mqtt_publisher.stop()
98+
99+
if self._heartbeat_timer: # NEU: Heartbeat Timer stoppen
100+
self._heartbeat_timer.cancel()
101+
self._heartbeat_timer = None
92102

93103
# Wake up threads that might be waiting on queues
94104
self._raw_message_queue.put("")
@@ -119,7 +129,7 @@ def initialize(self) -> None:
119129
def _send_xq(self) -> None:
120130
try:
121131
self.logger.debug("Sending XQ to disable receiver during init")
122-
self.send_command("XQ", expect_response=False)
132+
self.commands.disable_receiver()
123133
except Exception as e:
124134
self.logger.warning("Failed to send XQ: %s", e)
125135

@@ -142,15 +152,8 @@ def _start_init(self) -> None:
142152

143153
response = None
144154
try:
145-
# Perl Regex: 'V\s.*SIGNAL(?:duino|ESP|STM).*(?:\s\d\d:\d\d:\d\d)'
146-
version_pattern = re.compile(r"V\s.*SIGNAL(?:duino|ESP|STM).*", re.IGNORECASE)
147-
# Use a short timeout here to speed up failed attempts
148-
response = self.send_command(
149-
"V",
150-
expect_response=True,
151-
timeout=2.0, # Shorter timeout for retries
152-
response_pattern=version_pattern,
153-
)
155+
# Use commands class for version check
156+
response = self.commands.get_version(timeout=2.0) # Shorter timeout for retries
154157
except Exception as e:
155158
self.logger.debug("StartInit: Exception during version check: %s", e)
156159

@@ -161,18 +164,28 @@ def _check_version_resp(self, msg: Optional[str]) -> None:
161164
self.logger.info("Initialized %s", msg.strip())
162165
self.init_reset_flag = False
163166
self.init_retry_count = 0
167+
self.init_version_response = msg # Speichern der Version
168+
169+
# NEU: Versionsmeldung per MQTT veröffentlichen (Schritt 5)
170+
if self.mqtt_publisher:
171+
# Topic: <mqtt_topic>/status/version
172+
self.mqtt_publisher.publish_simple("status/version", msg.strip(), retain=True)
164173

165174
# Enable Receiver XE
166175
try:
167176
self.logger.info("Enabling receiver (XE)")
168-
self.send_command("XE", expect_response=False)
177+
self.commands.enable_receiver()
169178
except Exception as e:
170179
self.logger.warning("Failed to enable receiver: %s", e)
171180

172181
# Check for CC1101
173182
if "cc1101" in msg.lower():
174183
self.logger.info("CC1101 detected")
175184
# Here we could query ccconf and ccpatable like in Perl
185+
186+
# NEU: Starte Heartbeat-Timer
187+
self._start_heartbeat_timer()
188+
176189
else:
177190
self.logger.warning("StartInit: No valid version response.")
178191
self.init_retry_count += 1
@@ -321,49 +334,6 @@ def send_raw_command(self, command: str, expect_response: bool = False, timeout:
321334
"""Queues a raw command and optionally waits for a specific response."""
322335
return self.send_command(payload=command, expect_response=expect_response, timeout=timeout)
323336

324-
def set_message_type_enabled(
325-
self, message_type: Literal["MS", "MU", "MC"], enabled: bool
326-
) -> None:
327-
"""Enables or disables a specific message type in the firmware."""
328-
if message_type not in {"MS", "MU", "MC"}:
329-
raise ValueError(f"Invalid message type: {message_type}")
330-
331-
verb = "E" if enabled else "D"
332-
noun = message_type[-1] # S, U, or C
333-
command = f"C{verb}{noun}"
334-
self.send_command(command)
335-
336-
def _send_cc1101_command(self, command: str, value: Any) -> None:
337-
"""Helper to send a CC1101-specific command."""
338-
full_command = f"{command}{value}"
339-
self.send_command(full_command)
340-
341-
def set_bwidth(self, bwidth: int) -> None:
342-
"""Set the CC1101 bandwidth."""
343-
self._send_cc1101_command("C10", bwidth)
344-
345-
def set_rampl(self, rampl: int) -> None:
346-
"""Set the CC1101 rAmpl."""
347-
self._send_cc1101_command("W1D", rampl)
348-
349-
def set_sens(self, sens: int) -> None:
350-
"""Set the CC1101 sensitivity."""
351-
self._send_cc1101_command("W1F", sens)
352-
353-
def set_patable(self, patable: str) -> None:
354-
"""Set the CC1101 PA table."""
355-
self._send_cc1101_command("x", patable)
356-
357-
def set_freq(self, freq: float) -> None:
358-
"""Set the CC1101 frequency."""
359-
# This is a simplified version. The Perl code has complex logic here.
360-
command = f"W0F{int(freq):02X}" # Example, not fully correct
361-
self.send_command(command)
362-
363-
def send_message(self, message: str) -> None:
364-
"""Sends a pre-encoded message string."""
365-
self.send_command(message)
366-
367337
def send_command(
368338
self,
369339
payload: str,
@@ -419,50 +389,112 @@ def on_response(response: str):
419389
)
420390
raise SignalduinoCommandTimeout(f"Command '{payload}' timed out") from None
421391

392+
def _start_heartbeat_timer(self) -> None:
393+
"""Schedules the periodic status heartbeat."""
394+
if not self.mqtt_publisher:
395+
return
396+
397+
if self._heartbeat_timer:
398+
self._heartbeat_timer.cancel()
399+
400+
self._heartbeat_timer = threading.Timer(
401+
SDUINO_STATUS_HEARTBEAT_INTERVAL,
402+
self._publish_status_heartbeat
403+
)
404+
self._heartbeat_timer.name = "sd-heartbeat"
405+
self._heartbeat_timer.start()
406+
self.logger.info("Heartbeat timer started, interval: %d seconds.", SDUINO_STATUS_HEARTBEAT_INTERVAL)
407+
408+
def _publish_status_heartbeat(self) -> None:
409+
"""Publishes the current device status."""
410+
if not self.mqtt_publisher or not self.mqtt_publisher.is_connected():
411+
self.logger.warning("Cannot publish heartbeat; publisher not connected.")
412+
self._start_heartbeat_timer() # Try again later
413+
return
414+
415+
try:
416+
# 1. Heartbeat/Alive message (Retain: True)
417+
self.mqtt_publisher.publish_simple("status/alive", "online", retain=True)
418+
self.logger.debug("Published heartbeat status.")
419+
420+
# 2. Status data (version, ram, uptime)
421+
# Fetch data from device (non-blocking call, runs in timer thread)
422+
status_data = {}
423+
424+
# Version (if not already known from init)
425+
if self.init_version_response:
426+
status_data["version"] = self.init_version_response.strip()
427+
428+
# Free RAM
429+
try:
430+
ram_resp = self.commands.get_free_ram()
431+
# Format: R: 1234
432+
if ":" in ram_resp:
433+
status_data["free_ram"] = ram_resp.split(":")[-1].strip()
434+
else:
435+
status_data["free_ram"] = ram_resp.strip()
436+
except Exception as e:
437+
self.logger.warning("Could not get free RAM for heartbeat: %s", e)
438+
status_data["free_ram"] = "error"
439+
440+
# Uptime
441+
try:
442+
uptime_resp = self.commands.get_uptime()
443+
# Format: t: 1234
444+
if ":" in uptime_resp:
445+
status_data["uptime"] = uptime_resp.split(":")[-1].strip()
446+
else:
447+
status_data["uptime"] = uptime_resp.strip()
448+
except Exception as e:
449+
self.logger.warning("Could not get uptime for heartbeat: %s", e)
450+
status_data["uptime"] = "error"
451+
452+
# Publish all collected data to a single status/data topic
453+
if status_data:
454+
# Publish as JSON for structured data
455+
payload = json.dumps(status_data)
456+
self.mqtt_publisher.publish_simple("status/data", payload)
457+
458+
except Exception as e:
459+
self.logger.error("Error during status heartbeat: %s", e)
460+
461+
# Reschedule for next run
462+
self._start_heartbeat_timer()
463+
422464
def _handle_mqtt_command(self, command: str, payload: str) -> None:
423465
"""Handles commands received via MQTT."""
424466
self.logger.info("Handling MQTT command: %s (payload: %s)", command, payload)
425-
426-
if command == "version":
427-
try:
428-
# Send 'V' command and wait for response matching version pattern
429-
# Perl: 'V\s.*SIGNAL(?:duino|ESP|STM).*(?:\s\d\d:\d\d:\d\d)'
430-
version_pattern = re.compile(
431-
r"V\s.*SIGNAL(?:duino|ESP|STM).*", re.IGNORECASE
432-
)
433467

434-
try:
435-
response = self.send_command(
436-
payload="V",
437-
expect_response=True,
438-
timeout=SDUINO_CMD_TIMEOUT,
439-
response_pattern=version_pattern,
440-
)
441-
self.logger.info("Got version response: %s", response)
442-
# Publish result back to MQTT
443-
# Topic: signalduino/messages/result/version
444-
# We need access to the client to publish ad-hoc messages or add a method to publisher
445-
if (
446-
self.mqtt_publisher
447-
and self.mqtt_publisher.client.is_connected()
448-
):
449-
result_topic = (
450-
f"{self.mqtt_publisher.mqtt_topic}/result/{command}"
451-
)
452-
self.mqtt_publisher.client.publish(result_topic, response)
453-
454-
except SignalduinoCommandTimeout:
455-
self.logger.error("Timeout waiting for version response")
456-
if (
457-
self.mqtt_publisher
458-
and self.mqtt_publisher.client.is_connected()
459-
):
460-
result_topic = (
461-
f"{self.mqtt_publisher.mqtt_topic}/error/{command}"
462-
)
463-
self.mqtt_publisher.client.publish(result_topic, "Timeout")
468+
if not self.mqtt_publisher or not self.mqtt_publisher.is_connected():
469+
self.logger.warning("Cannot handle MQTT command; publisher not connected.")
470+
return
471+
472+
command_mapping = {
473+
"version": self.commands.get_version,
474+
"help": self.commands.get_help,
475+
"free_ram": self.commands.get_free_ram,
476+
"uptime": self.commands.get_uptime,
477+
}
464478

479+
if command in command_mapping:
480+
try:
481+
# Execute the corresponding command method
482+
response = command_mapping[command]()
483+
484+
self.logger.info("Got response for %s: %s", command, response)
485+
486+
# Publish result back to MQTT
487+
# Topic: <mqtt_topic>/result/<command>
488+
self.mqtt_publisher.publish_simple(f"result/{command}", response)
489+
490+
except SignalduinoCommandTimeout:
491+
self.logger.error("Timeout waiting for command response: %s", command)
492+
self.mqtt_publisher.publish_simple(f"error/{command}", "Timeout")
493+
465494
except Exception as e:
466-
self.logger.error("Error executing version command: %s", e)
495+
self.logger.error("Error executing command %s: %s", command, e)
496+
self.mqtt_publisher.publish_simple(f"error/{command}", f"Error: {e}")
497+
467498
else:
468-
self.logger.warning("Unknown MQTT command: %s", command)
499+
self.logger.warning("Unknown MQTT command: %s", command)
500+
self.mqtt_publisher.publish_simple(f"error/{command}", "Unknown command")

signalduino/mqtt.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,21 @@
77
import paho.mqtt.client as mqtt
88

99
from .types import DecodedMessage, RawFrame
10+
from .persistence import get_or_create_client_id
1011

1112
class MqttPublisher:
1213
"""Publishes DecodedMessage objects to an MQTT server and listens for commands."""
1314

1415
def __init__(self, logger: Optional[logging.Logger] = None) -> None:
1516
self.logger = logger or logging.getLogger(__name__)
16-
self.client = mqtt.Client()
17+
client_id = get_or_create_client_id()
18+
self.client = mqtt.Client(client_id=client_id)
1719
self.client.on_connect = self._on_connect
1820
self.client.on_disconnect = self._on_disconnect
1921

2022
self.mqtt_host = os.environ.get("MQTT_HOST", "localhost")
2123
self.mqtt_port = int(os.environ.get("MQTT_PORT", 1883))
22-
self.mqtt_topic = os.environ.get("MQTT_TOPIC", "signalduino/messages")
24+
self.mqtt_topic = os.environ.get("MQTT_TOPIC", "signalduino")
2325
self.mqtt_username = os.environ.get("MQTT_USERNAME")
2426
self.mqtt_password = os.environ.get("MQTT_PASSWORD")
2527

@@ -49,8 +51,8 @@ def _on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage)
4951

5052
if self.command_callback:
5153
# Extract command from topic or payload
52-
# Topic structure: signalduino/messages/commands/<command>
53-
# Example: signalduino/messages/commands/version -> get version
54+
# Topic structure: signalduino/commands/<command>
55+
# Example: signalduino/commands/version -> get version
5456

5557
parts = msg.topic.split("/")
5658
if "commands" in parts:
@@ -99,14 +101,31 @@ def _raw_frame_to_dict(raw_frame: RawFrame) -> dict:
99101

100102
return json.dumps(message_dict, indent=4)
101103

104+
def is_connected(self) -> bool:
105+
"""Checks if the client is connected."""
106+
return self.client.is_connected()
107+
108+
def publish_simple(self, subtopic: str, payload: str, retain: bool = False) -> None:
109+
"""Publishes a simple string payload to a subtopic of the main topic."""
110+
if not self.is_connected():
111+
self._connect_if_needed()
112+
113+
if self.is_connected():
114+
try:
115+
topic = f"{self.mqtt_topic}/{subtopic}"
116+
self.client.publish(topic, payload, retain=retain)
117+
self.logger.debug("Published simple message to %s: %s", topic, payload)
118+
except Exception:
119+
self.logger.error("Failed to publish simple message to %s", subtopic, exc_info=True)
120+
102121
def publish(self, message: DecodedMessage) -> None:
103122
"""Publishes a DecodedMessage."""
104-
if not self.client.is_connected():
123+
if not self.is_connected():
105124
self._connect_if_needed()
106125

107-
if self.client.is_connected():
126+
if self.is_connected():
108127
try:
109-
topic = f"{self.mqtt_topic}/{message.protocol_id}"
128+
topic = f"{self.mqtt_topic}/messages"
110129
payload = self._message_to_json(message)
111130
self.client.publish(topic, payload)
112131
self.logger.debug("Published message for protocol %s to %s", message.protocol_id, topic)

0 commit comments

Comments
 (0)