Skip to content

Commit 015199f

Browse files
Tobias AhrensTobias Ahrens
authored andcommitted
fix problem of streaming errors causing a unsubscribe
This commit fixes a bug which caused a subscription to unsubscribe when a streaming error was received since a uninitialized value was accessed. In addition errors happening in the subscription handling are now bubbled up to the dataserver to show up in the logs.
1 parent 49fe5fb commit 015199f

3 files changed

Lines changed: 90 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
# Labone Python API Changelog
22

3-
## Version 3.1.1
3+
## Version 3.1.2
4+
* Fix bug which caused streaming errors to cancel the subscriptions
5+
* Raise severity of errors during subscriptions to `FAILED` to cause a data server
6+
log entry.
47

8+
## Version 3.1.1
59
* Add support for Python 3.13
610

711
## Version 3.1.0

src/labone/core/subscription.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,7 @@ def _distribute_to_data_queues(
496496
# error should not be raised here since this would disconnect the
497497
# subscription.
498498
logger.exception(err.args[0])
499+
return
499500
except ValueError as err: # pragma: no cover
500501
self._data_queues = [
501502
data_queue().disconnect() # type: ignore[union-attr] # supposed to throw
@@ -539,4 +540,4 @@ async def capnp_callback(
539540
list(map(self._distribute_to_data_queues, call_input.values))
540541
fulfiller.fulfill()
541542
except Exception as err: # noqa: BLE001
542-
fulfiller.reject(zhinst.comms.Fulfiller.DISCONNECTED, err.args[0])
543+
fulfiller.reject(zhinst.comms.Fulfiller.FAILED, err.args[0])

tests/core/test_subscription.py

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
"""Tests for the `labone.core.subscription` module."""
22

33
import asyncio
4+
import logging
5+
from unittest.mock import MagicMock
46

57
import pytest
68

7-
from labone.core import errors
9+
from labone.core import errors, hpk_schema
810
from labone.core.subscription import (
911
CircularDataQueue,
1012
DataQueue,
@@ -257,6 +259,86 @@ def test_streaming_handle_with_parser_callback():
257259
)
258260

259261

262+
@pytest.mark.asyncio
263+
async def test_capnp_callback(caplog):
264+
streaming_handle = StreamingHandle()
265+
queue = DataQueue(
266+
path="dummy",
267+
handle=streaming_handle,
268+
)
269+
call_param = hpk_schema.StreamingHandleSendValuesParams()
270+
values = call_param.init_values(2)
271+
272+
values[0].init_metadata(timestamp=0, path="dummy")
273+
values[0].init_value(int64=42)
274+
275+
values[1].init_metadata(timestamp=1, path="dummy")
276+
values[1].init_value(double=22.0)
277+
278+
fulfiller = MagicMock()
279+
with caplog.at_level(logging.ERROR):
280+
await streaming_handle.capnp_callback(0, 0, call_param, fulfiller)
281+
assert "" in caplog.text
282+
assert queue.qsize() == 2
283+
assert queue.get_nowait() == AnnotatedValue(value=42, path="dummy", timestamp=0)
284+
assert queue.get_nowait() == AnnotatedValue(value=22.0, path="dummy", timestamp=1)
285+
fulfiller.fulfill.assert_called_once()
286+
287+
288+
@pytest.mark.asyncio
289+
async def test_streaming_error(caplog):
290+
streaming_handle = StreamingHandle()
291+
queue = DataQueue(
292+
path="dummy",
293+
handle=streaming_handle,
294+
)
295+
call_param = hpk_schema.StreamingHandleSendValuesParams()
296+
values = call_param.init_values(1)
297+
values[0].init_metadata(timestamp=0, path="dummy")
298+
values[0].init_value().init_streamingError(
299+
code=1,
300+
message="test error",
301+
category="unknown",
302+
)
303+
fulfiller = MagicMock()
304+
with caplog.at_level(logging.ERROR):
305+
await streaming_handle.capnp_callback(0, 0, call_param, fulfiller)
306+
assert "test error" in caplog.text
307+
assert queue.qsize() == 0
308+
fulfiller.fulfill.assert_called_once()
309+
310+
311+
@pytest.mark.asyncio
312+
async def test_streaming_error_with_value(caplog):
313+
streaming_handle = StreamingHandle()
314+
queue = DataQueue(
315+
path="dummy",
316+
handle=streaming_handle,
317+
)
318+
call_param = hpk_schema.StreamingHandleSendValuesParams()
319+
values = call_param.init_values(2)
320+
321+
# Fist value is a streaming error
322+
values[0].init_metadata(timestamp=0, path="dummy")
323+
values[0].init_value().init_streamingError(
324+
code=1,
325+
message="test error",
326+
category="unknown",
327+
)
328+
329+
# Second value is a normal value
330+
values[1].init_metadata(timestamp=0, path="dummy")
331+
values[1].init_value(int64=42)
332+
333+
fulfiller = MagicMock()
334+
with caplog.at_level(logging.ERROR):
335+
await streaming_handle.capnp_callback(0, 0, call_param, fulfiller)
336+
assert "test error" in caplog.text
337+
assert queue.qsize() == 1
338+
assert queue.get_nowait() == AnnotatedValue(value=42, path="dummy", timestamp=0)
339+
fulfiller.fulfill.assert_called_once()
340+
341+
260342
@pytest.mark.asyncio
261343
async def test_distinct_data_queue_put_no_wait_new_value():
262344
subscription = FakeSubscription()

0 commit comments

Comments
 (0)