Skip to content

Commit 71764e3

Browse files
authored
Adds count to stream and streamset (#57)
1 parent 2130fcb commit 71764e3

2 files changed

Lines changed: 140 additions & 0 deletions

File tree

btrdb/stream.py

Lines changed: 74 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,42 @@ def exists(self):
128128
return False
129129
raise bte
130130

131+
def count(self, start=MINIMUM_TIME, end=MAXIMUM_TIME, pointwidth=62, version=0):
    """
    Return how many points the stream holds in a time window.

    The work is delegated to ``aligned_windows``: every StatPoint it
    returns carries a ``count`` and this helper adds those counts together.
    With the default arguments the result is the latest total number of
    points in the stream. Because ``aligned_windows`` is used, the start
    and end timestamps may be adjusted if they are not powers of 2; when
    querying a small window of time, a smaller pointwidth may be required
    for the count to be captured at the right granularity.

    Parameters
    ----------
    start : int or datetime like object, default: MINIMUM_TIME
        Beginning of the queried range, in nanoseconds. (see
        :func:`btrdb.utils.timez.to_nanoseconds` for valid input types)

    end : int or datetime like object, default: MAXIMUM_TIME
        End of the queried range, in nanoseconds. (see
        :func:`btrdb.utils.timez.to_nanoseconds` for valid input types)

    pointwidth : int, default: 62
        Exponent of the window size; each window spans 2**pointwidth ns.

    version : int, default: 0
        Stream version to run the query against.

    Returns
    -------
    int
        Sum of the ``count`` attribute over every StatPoint returned for
        the requested window and version.
    """
    windows = self.aligned_windows(start, end, pointwidth, version)
    # Each item unpacks to a StatPoint plus a second value that is not
    # needed for counting.
    return sum(stat_point.count for stat_point, _ in windows)
166+
131167
@property
132168
def btrdb(self):
133169
"""
@@ -772,6 +808,44 @@ def versions(self):
772808
"""
773809
return self._pinned_versions if self._pinned_versions else self._latest_versions()
774810

811+
def count(self):
    """
    Return the combined number of points across every stream in the set.

    The time range comes from the start and end filters (falling back to
    MINIMUM_TIME and MAXIMUM_TIME when a bound is not present), the window
    size from the ``pointwidth`` property (62 when it is None), and the
    per-stream version from the pinned versions (0 for any stream without
    a pinned version). By default this yields the latest total count of
    all points in the streams.

    Each stream's ``count`` helper sums the counts of the StatPoints
    returned by ``aligned_windows``. Because of this the start and end
    timestamps may be adjusted if they are not powers of 2; set the
    pointwidth property for smaller windows of time so the count
    granularity is captured appropriately.

    Parameters
    ----------
    None

    Returns
    -------
    int
        The total number of points in all streams for the active filters.
    """
    filter_params = self._params_from_filters()
    start = filter_params.get("start", MINIMUM_TIME)
    end = filter_params.get("end", MAXIMUM_TIME)

    pointwidth = 62 if self.pointwidth is None else self.pointwidth
    # An empty mapping makes every lookup below fall back to version 0.
    pinned = self._pinned_versions or {}

    return sum(
        stream.count(start, end, pointwidth, pinned.get(stream.uuid, 0))
        for stream in self._streams
    )
848+
775849
def earliest(self):
776850
"""
777851
Returns earliest points of data in streams using available filters.

tests/btrdb/test_stream.py

Lines changed: 66 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -478,6 +478,28 @@ def test_aligned_windows(self):
478478
)
479479

480480

481+
def test_count(self):
    """
    Stream.count should total the StatPoint counts from aligned windows
    """
    stream_id = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a')
    mock_endpoint = Mock(Endpoint)

    # Two endpoint responses, each a pair of stat points plus a version
    # number; the counts are 5 + 6 and 7 + 8, i.e. 26 overall.
    first_pair = (
        StatPointProto(time=1, min=2, mean=3, max=4, count=5, stddev=6),
        StatPointProto(time=2, min=3, mean=4, max=5, count=6, stddev=7),
    )
    second_pair = (
        StatPointProto(time=3, min=4, mean=5, max=6, count=7, stddev=8),
        StatPointProto(time=4, min=5, mean=6, max=7, count=8, stddev=9),
    )
    mock_endpoint.alignedWindows = Mock(
        return_value=[[first_pair, 42], [second_pair, 42]]
    )
    stream = Stream(btrdb=BTrDB(mock_endpoint), uuid=stream_id)

    # With no arguments the defaults must be forwarded to the endpoint.
    total = stream.count()
    assert total == 26
    stream._btrdb.ep.alignedWindows.assert_called_once_with(
        stream_id, MINIMUM_TIME, MAXIMUM_TIME, 62, 0
    )

    # Explicit arguments must be forwarded unchanged.
    stream.count(10, 1000, 48, 1200)
    stream._btrdb.ep.alignedWindows.assert_called_with(
        stream_id, 10, 1000, 48, 1200
    )
501+
502+
481503
##########################################################################
482504
## earliest/latest tests
483505
##########################################################################
@@ -1083,6 +1105,50 @@ def test_currently_out_of_range(self, mocked):
10831105
with pytest.raises(ValueError, match="current time is not included in filtered stream range"):
10841106
streams.filter(start=0, end=10).current()
10851107

1108+
def test_count(self):
    """
    StreamSet.count should add up the counts of all member streams
    """
    first_id = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a')
    second_id = uuid.UUID('4dadf38d-52a5-4b7a-ada9-a5d563f9538c')
    mock_endpoint = Mock(Endpoint)

    # Every stream gets the same response: counts 5 + 6 + 7 + 8 = 26,
    # so two streams are expected to total 52.
    first_pair = (
        StatPointProto(time=1, min=2, mean=3, max=4, count=5, stddev=6),
        StatPointProto(time=2, min=3, mean=4, max=5, count=6, stddev=7),
    )
    second_pair = (
        StatPointProto(time=3, min=4, mean=5, max=6, count=7, stddev=8),
        StatPointProto(time=4, min=5, mean=6, max=7, count=8, stddev=9),
    )
    mock_endpoint.alignedWindows = Mock(
        return_value=[[first_pair, 42], [second_pair, 42]]
    )

    members = [
        Stream(btrdb=BTrDB(mock_endpoint), uuid=first_id),
        Stream(btrdb=BTrDB(mock_endpoint), uuid=second_id),
    ]
    streams = StreamSet(members)

    total = streams.count()
    assert total == 52

    # Each stream must have been queried once with the default arguments.
    mock_endpoint.alignedWindows.assert_any_call(
        first_id, MINIMUM_TIME, MAXIMUM_TIME, 62, 0
    )
    mock_endpoint.alignedWindows.assert_any_call(
        second_id, MINIMUM_TIME, MAXIMUM_TIME, 62, 0
    )
1128+
1129+
1130+
def test_count_filtered(self):
    """
    StreamSet.count should honor time filters, pinned versions and pointwidth
    """
    first_id = uuid.UUID('0d22a53b-e2ef-4e0a-ab89-b2d48fb2592a')
    second_id = uuid.UUID('4dadf38d-52a5-4b7a-ada9-a5d563f9538c')

    # No windows are returned: only the forwarded arguments are checked.
    mock_endpoint = Mock(Endpoint)
    mock_endpoint.alignedWindows = Mock(return_value=[])

    members = [
        Stream(btrdb=BTrDB(mock_endpoint), uuid=first_id),
        Stream(btrdb=BTrDB(mock_endpoint), uuid=second_id),
    ]
    streams = StreamSet(members)

    # Narrow the time range, pin a version per stream, set the window size.
    streams = streams.filter(start=10, end=1000)
    streams.pin_versions({first_id: 42, second_id: 99})
    streams.pointwidth = 48

    streams.count()

    # Each stream is queried with the filtered range and its own version.
    mock_endpoint.alignedWindows.assert_any_call(first_id, 10, 1000, 48, 42)
    mock_endpoint.alignedWindows.assert_any_call(second_id, 10, 1000, 48, 99)
1150+
1151+
10861152

10871153
##########################################################################
10881154
## filter tests

0 commit comments

Comments
 (0)