Skip to content

Commit e5b31d8

Browse files
Tobias AhrensTobias Ahrens
authored andcommitted
Register queue on streaming handle before sending the subscribe request
This commit creates the subscription queue befor sending out the subscribe request. This ensures that if the data server sends an update event before the subscribe request returns, the data is still processed.
1 parent abde7fb commit e5b31d8

2 files changed

Lines changed: 15 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Labone Python API Changelog
22

3+
## Version 3.2.1
4+
* Fix bug that caused subscriptions to potentially miss value updates after the subscription was registered but before the subscribe functions returned.
5+
36
## Version 3.2.0
47
* `subscribe` accepts keyword arguments, which are forwarded to the data-server.
58
This allows to configure the subscription to the data-server.

src/labone/core/session.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -853,23 +853,28 @@ async def subscribe(
853853
},
854854
}
855855
if get_initial_value:
856-
_, initial_value = await asyncio.gather(
857-
self._session.subscribe(subscription=subscription),
858-
self.get(path),
859-
)
860856
new_queue_type = queue_type or DataQueue
861857
queue = new_queue_type(
862858
path=path,
863859
handle=streaming_handle,
864860
)
865-
queue.put_nowait(initial_value)
861+
_, initial_value = await asyncio.gather(
862+
self._session.subscribe(subscription=subscription),
863+
self.get(path),
864+
)
865+
# If the queue already has received a update event we do not
866+
# need to put the initial value in the queue. As it may break the
867+
# order.
868+
if queue.empty():
869+
queue.put_nowait(initial_value)
866870
return queue
867-
await self._session.subscribe(subscription=subscription)
868871
new_queue_type = queue_type or DataQueue
869-
return new_queue_type(
872+
queue = new_queue_type(
870873
path=path,
871874
handle=streaming_handle,
872875
)
876+
await self._session.subscribe(subscription=subscription)
877+
return queue
873878

874879
async def wait_for_state_change(
875880
self,

0 commit comments

Comments
 (0)