66from __future__ import annotations
77
88import logging
9+ from functools import partial
910from types import TracebackType
1011
1112try :
1213 from typing import Self # type: ignore # Python 3.11
1314except ImportError :
1415 from typing_extensions import Self
16+
1517from typing import Any , AsyncGenerator , Type
1618from uuid import UUID
1719
2123
2224from async_event_bus import event_bus
2325from data_types import DataEvent
26+ from database .models import TinkerforgeSensorConfigModel
2427from errors import ConfigurationError
2528from helper_functions import call_safely , context , create_device_function
2629from sensors .drivers .shared import DeviceStatus
@@ -125,7 +128,10 @@ def stream_data(self) -> AsyncGenerator[DataEvent, None]:
125128 self ._stream_config_updates (sensor )
126129 | pipe .switchmap (
127130 lambda config : stream .chain (
131+ # First inject the initial config into the stream,
128132 stream .just (config ),
133+ # then listen for remove events and map those to None (no config) to unregister
134+ # the sensor
129135 stream .iterate (event_bus .subscribe (f"nodes/by_uuid/{ config ['uuid' ]} /remove" ))[:1 ]
130136 | pipe .map (lambda x : None ),
131137 )
@@ -152,7 +158,22 @@ def stream_data(self) -> AsyncGenerator[DataEvent, None]:
152158
153159 return data_stream
154160
155- def _create_config (self , config : dict [str , Any ] | None ) -> dict [str , Any ] | None :
161+ def _create_config (self , config : dict [str , Any ] | None ) -> dict [str , tuple [partial , float ] | Any ] | None :
162+ """
163+ Maps the 'on_connect' key of the config to functions provided by the Tinkerforge device.
164+ Parameters
165+ ----------
166+ config: dict or None
167+ A dict that contains the key 'on_connect' which defines functions calls to made on the self.device object.
168+ If None is given, None will be returned.
169+
170+ Returns
171+ -------
172+ dict or None
173+ A dict containing a key called 'on_connect' that holds a tuple[partial, float], with partials that run on
174+ the self.device object with a timeout in seconds (float).
175+ Returns None if the config input parameter is None as well.
176+ """
156177 if config is None :
157178 return None
158179 try :
@@ -164,7 +185,7 @@ def _create_config(self, config: dict[str, Any] | None) -> dict[str, Any] | None
164185
165186 return config
166187
167- def _read_sensor ( # pylint: disable=too-many-arguments
188+ def _read_sensor ( # pylint: disable=too-many-arguments,too-many-positional-arguments
168189 self , source_uuid : UUID , sid : int , unit : str , topic : str , callback_config : AdvancedCallbackConfiguration
169190 ) -> AsyncGenerator [DataEvent , None ]:
170191 monitor_stream = (
@@ -191,6 +212,23 @@ def _read_sensor( # pylint: disable=too-many-arguments
191212 def _parse_callback_configuration (
192213 sid : str | int , config : dict [str , Any ]
193214 ) -> tuple [int , str , str , AdvancedCallbackConfiguration ]:
215+ """
216+
217+ Parameters
218+ ----------
219+ sid: int or str
220+ The secondary id, naming the sensor of the node. Each node may have multiple sensors with a separate sid.
221+ config: dict
222+ A dictionary containing the keys 'interval', 'trigger_only_on_change'. 'unit', and 'topic'
223+
224+ Returns
225+ -------
226+ tuple of int, str, str, AdvancedCallbackConfiguration:
227+ The sid is an int specifying the secondary id of the sensor in the tree below the node
228+ The unit is a str naming the unit of measurement of this sensor
229+ The topic is a str defining where to publish the data
230+ The callback_config is an AdvancedCallbackConfiguration that specifies the configuration if this sensor
231+ """
194232 sid = int (sid )
195233 callback_config = AdvancedCallbackConfiguration (
196234 period = config ["interval" ],
@@ -219,26 +257,43 @@ async def _set_callback_configuration(self, sid: int, unit: str, topic: str, con
219257 return stream .empty ()
220258 return stream .just ((sid , unit , topic , remote_callback_config ))
221259
222- def _configure_and_stream (self , config : dict [str , Any ] | None ) -> AsyncGenerator [DataEvent , None ]:
260+ @staticmethod
261+ @staticmethod
262+ def _configure (config : TinkerforgeSensorConfigModel ):
263+ """
264+ Configure the sensor by calling all 'on_connect' functions set in the config.
265+
266+ Parameters
267+ ----------
268+ config: dict[str, tuple[partial, float]]
269+ A config dict, that contains the key 'on_connect', which holds the function and a timeout when calling it.
270+ """
271+ return (
272+ stream .iterate (config ["on_connect" ])
273+ | pipe .starmap (lambda func , timeout : stream .just (func ()) | pipe .timeout (timeout ))
274+ | pipe .concat (task_limit = 1 )
275+ | pipe .filter (lambda result : False )
276+ )
277+
278+ def _configure_and_stream (
279+ self ,
280+ config : TinkerforgeSensorConfigModel | None ,
281+ ) -> AsyncGenerator [DataEvent , None ]:
223282 if config is None :
224283 return stream .empty ()
225284 try :
226285 # Run all config steps in order (concat) and one at a time (task_limit=1). Drop the output. There is
227286 # nothing to compare them to (filter => false), then read all sensors of the bricklet and process them in
228287 # parallel (flatten).
229- config_stream = stream .chain (
230- stream .iterate (config ["on_connect" ])
231- | pipe .starmap (lambda func , timeout : stream .just (func ()) | pipe .timeout (timeout ))
232- | pipe .concat (task_limit = 1 )
233- | pipe .filter (lambda result : False ),
288+ data_stream = stream .chain (
289+ self ._configure (config ),
234290 stream .iterate (config ["config" ].items ())
235291 | pipe .starmap (self ._parse_callback_configuration )
236292 | pipe .starmap (self ._set_callback_configuration )
237293 | pipe .flatten ()
238- | pipe .map (lambda args : self ._read_sensor (config ["uuid" ], * args ))
239- | pipe .flatten (),
294+ | pipe .flatmap (lambda args : self ._read_sensor (config ["uuid" ], * args )),
240295 )
241- return config_stream
296+ return data_stream
242297 except NotConnectedError :
243298 # Do not log it
244299 raise
0 commit comments