2121from copy import deepcopy
2222from collections .abc import Sequence
2323
24+ from btrdb .utils .buffer import PointBuffer
2425from btrdb .point import RawPoint , StatPoint
2526from btrdb .transformers import StreamSetTransformer
26- from btrdb .utils .buffer import PointBuffer
27- from btrdb .utils .timez import currently_as_ns , to_nanoseconds
28- from btrdb .utils .conversion import AnnotationEncoder
2927from btrdb .exceptions import BTrDBError , InvalidOperation
28+ from btrdb .utils .timez import currently_as_ns , to_nanoseconds
29+ from btrdb .utils .conversion import AnnotationEncoder , AnnotationDecoder
3030
3131
3232##########################################################################
3939
# Resolve the type of compiled regular expressions. The private name
# ``re._pattern_type`` is tried first; when the interpreter does not provide
# it, fall back to the public ``re.Pattern``.
try:
    RE_PATTERN = re._pattern_type
except AttributeError:
    # A missing attribute is the only failure expected from this lookup;
    # catching anything broader would hide unrelated errors.
    RE_PATTERN = re.Pattern
4444
4545
@@ -74,29 +74,23 @@ def __init__(self, btrdb, uuid, **db_values):
7474 self ._btrdb = btrdb
7575 self ._uuid = uuid
7676
77-
def refresh_metadata(self):
    """
    Reload the locally cached metadata for this stream.

    Asks the server endpoint for the stream's info and stores the returned
    collection, property version, tags and annotations on this object, then
    marks the stream as known to exist. Annotation values are decoded from
    JSON with ``AnnotationDecoder``. This method requires a round trip to
    the server.
    """
    endpoint = self._btrdb.ep
    (
        self._collection,
        self._property_version,
        self._tags,
        self._annotations,
        _,
    ) = endpoint.streamInfo(self._uuid, False, True)
    self._known_to_exist = True

    # deserialize annotation values
    # NOTE(review): presumably AnnotationDecoder tolerates values that are
    # not valid JSON -- confirm, since no fallback is applied here.
    decoded = {}
    for name, payload in self._annotations.items():
        decoded[name] = json.loads(payload, cls=AnnotationDecoder)
    self._annotations = decoded
10094
10195 def exists (self ):
10296 """
@@ -128,6 +122,42 @@ def exists(self):
128122 return False
129123 raise bte
130124
def count(self, start=MINIMUM_TIME, end=MAXIMUM_TIME, pointwidth=62, version=0):
    """
    Compute the total number of points in the stream

    Requests ``aligned_windows`` for the given range, pointwidth and
    version, and adds up the ``count`` attribute of the first element of
    every pair it returns. With the default arguments the whole time range
    is queried at version 0.

    Because the result is built from aligned windows, the effective start
    and end may differ from the values passed in (presumably they are
    snapped to window boundaries -- confirm against ``aligned_windows``).
    For short time ranges a smaller pointwidth may be needed so that the
    windows are fine enough to reflect the requested range.

    Parameters
    ----------
    start : int or datetime like object, default: MINIMUM_TIME
        The start time in nanoseconds for the range to be queried. (see
        :func:`btrdb.utils.timez.to_nanoseconds` for valid input types)

    end : int or datetime like object, default: MAXIMUM_TIME
        The end time in nanoseconds for the range to be queried. (see
        :func:`btrdb.utils.timez.to_nanoseconds` for valid input types)

    pointwidth : int, default: 62
        Specify the number of ns between data points (2**pointwidth)

    version : int, default: 0
        Version of the stream to query

    Returns
    -------
    int
        The total number of points in the stream for the specified window.
    """
    windows = self.aligned_windows(start, end, pointwidth, version)
    # Each item is a pair; only the first element carries the count.
    return sum(stat.count for stat, _ in windows)
160+
131161 @property
132162 def btrdb (self ):
133163 """
@@ -396,9 +426,14 @@ def _update_tags_collection(self, tags, collection):
396426 )
397427
398428 def _update_annotations (self , annotations , encoder ):
399- serialized = dict (
400- [[k , json .dumps (v , cls = encoder )] for k , v in annotations .items ()]
401- )
429+ # make a copy of the annotations to prevent accidental mutable object mutation
430+ serialized = deepcopy (annotations )
431+ if encoder is not None :
432+ serialized = {
433+ k : json .dumps (v , cls = encoder , indent = None , allow_nan = True )
434+ for k , v in serialized .items ()
435+ }
436+
402437 self ._btrdb .ep .setStreamAnnotations (
403438 uu = self .uuid ,
404439 expected = self ._property_version ,
@@ -417,8 +452,9 @@ def update(self, tags=None, annotations=None, collection=None, encoder=Annotatio
417452 dict of annotation information for the stream.
418453 collection: str
419454 The collection prefix for a stream
420- encoder: json.JSONEncoder
421- JSON encoder to class to use for annotation serializations
455+ encoder: json.JSONEncoder or None
456+ JSON encoder class to use for annotation serializations, set to
457+ None to prevent JSON encoding of the annotations.
422458
423459 Returns
424460 -------
@@ -772,6 +808,44 @@ def versions(self):
772808 """
773809 return self ._pinned_versions if self ._pinned_versions else self ._latest_versions ()
774810
def count(self):
    """
    Compute the total number of points in the streams using filters.

    Calls ``count`` on every stream in this set and returns the sum. The
    time range comes from the current filter parameters, falling back to
    MINIMUM_TIME / MAXIMUM_TIME when no start or end is set. The pointwidth
    property is used when it is not None, otherwise 62. Each stream is
    queried at its pinned version when one is recorded for its uuid,
    otherwise at version 0.

    Because each per-stream count is built from ``aligned_windows``, the
    effective start and end may differ from the filter values; setting a
    smaller pointwidth gives finer granularity for short time ranges.

    Parameters
    ----------
    None

    Returns
    -------
    int
        The total number of points in all streams for the specified filters.
    """
    filters = self._params_from_filters()
    start = filters.get("start", MINIMUM_TIME)
    end = filters.get("end", MAXIMUM_TIME)

    # Fall back to the coarsest default width when none has been set.
    if self.pointwidth is None:
        pointwidth = 62
    else:
        pointwidth = self.pointwidth

    # An empty mapping makes every stream resolve to version 0 below.
    pinned = self._pinned_versions or {}

    return sum(
        stream.count(start, end, pointwidth, pinned.get(stream.uuid, 0))
        for stream in self._streams
    )
848+
775849 def earliest (self ):
776850 """
777851 Returns earliest points of data in streams using available filters.
0 commit comments