3434##########################################################################
3535
3636INSERT_BATCH_SIZE = 5000
37+ MINIMUM_TIME = - (16 << 56 )
38+ MAXIMUM_TIME = (48 << 56 ) - 1
3739
3840try :
3941 RE_PATTERN = re ._pattern_type
@@ -196,12 +198,14 @@ def collection(self):
196198
197199 def earliest (self , version = 0 ):
198200 """
199- Returns the first point of data in the stream. Returns None if error
201+ Returns the first point of data in the stream. Returns None if error
200202 encountered during lookup or no values in stream.
201203
202204 Parameters
203205 ----------
204- None
206+ version : int, default: 0
207+ Specify the version of the stream to query; if zero, queries the latest
208+ stream state rather than pinning to a version.
205209
206210 Returns
207211 -------
@@ -210,18 +214,19 @@ def earliest(self, version=0):
210214 the value was retrieved at (tuple(RawPoint, int)).
211215
212216 """
213- start = 0
217+ start = MINIMUM_TIME
214218 return self .nearest (start , version = version , backward = False )
215219
216220 def latest (self , version = 0 ):
217221 """
218- Returns last point of data in the stream. Note that this method will
219- return None if no point can be found that is less than the current
220- date/time.
222+ Returns last point of data in the stream. Returns None if error
223+ encountered during lookup or no values in stream.
221224
222225 Parameters
223226 ----------
224- None
227+ version : int, default: 0
228+ Specify the version of the stream to query; if zero, queries the latest
229+ stream state rather than pinning to a version.
225230
226231 Returns
227232 -------
@@ -230,9 +235,30 @@ def latest(self, version=0):
230235 the value was retrieved at (tuple(RawPoint, int)).
231236
232237 """
233- start = currently_as_ns ()
238+ start = MAXIMUM_TIME
234239 return self .nearest (start , version = version , backward = True )
235240
241+ def current (self , version = 0 ):
242+ """
243+ Returns the point that is closest to the current timestamp, e.g. the latest
244+ point in the stream up until now. Note that no future values will be returned.
245+ Returns None if errors during lookup or there are no values before now.
246+
247+ Parameters
248+ ----------
249+ version : int, default: 0
250+ Specify the version of the stream to query; if zero, queries the latest
251+ stream state rather than pinning to a version.
252+
253+ Returns
254+ -------
255+ tuple
256+ The last data point in the stream up until now and the version of the stream
257+ the value was retrieved at (tuple(RawPoint, int)).
258+ """
259+ start = currently_as_ns ()
260+ return self .nearest (start , version , backward = True )
261+
236262 def tags (self , refresh = False ):
237263 """
238264 Returns the stream's tags.
@@ -728,8 +754,7 @@ def versions(self):
728754
729755 def earliest (self ):
730756 """
731- Returns earliest points of data in streams using available
732- filters.
757+ Returns earliest points of data in streams using available filters.
733758
734759 Parameters
735760 ----------
@@ -743,7 +768,7 @@ def earliest(self):
743768 """
744769 earliest = []
745770 params = self ._params_from_filters ()
746- start = params .get ("start" , 0 )
771+ start = params .get ("start" , MINIMUM_TIME )
747772
748773 for s in self ._streams :
749774 version = self .versions ()[s .uuid ]
@@ -754,10 +779,7 @@ def earliest(self):
754779
755780 def latest (self ):
756781 """
757- Returns latest points of data in the streams using available
758- filters. Note that this method will return None if no
759- end filter is provided and point cannot be found that is less than the
760- current date/time.
782+ Returns latest points of data in the streams using available filters.
761783
762784 Parameters
763785 ----------
@@ -771,7 +793,7 @@ def latest(self):
771793 """
772794 latest = []
773795 params = self ._params_from_filters ()
774- start = params .get ("end" , currently_as_ns () )
796+ start = params .get ("end" , MAXIMUM_TIME )
775797
776798 for s in self ._streams :
777799 version = self .versions ()[s .uuid ]
@@ -780,6 +802,37 @@ def latest(self):
780802
781803 return tuple (latest )
782804
805+ def current (self ):
806+ """
807+ Returns the points of data in the streams closest to the current timestamp. If
808+ the current timestamp is outside of the filtered range of data, a ValueError is
809+ raised.
810+
811+ Parameters
812+ ----------
813+ None
814+
815+ Returns
816+ -------
817+ tuple
818+ The latest points of data found among all streams
819+ """
820+ latest = []
821+ params = self ._params_from_filters ()
822+ now = currently_as_ns ()
823+ end = params .get ("end" , None )
824+ start = params .get ("start" , None )
825+
826+ if (end is not None and end <= now ) or (start is not None and start > now ):
827+ raise ValueError ("current time is not included in filtered stream range" )
828+
829+ for s in self ._streams :
830+ version = self .versions ()[s .uuid ]
831+ point , _ = s .nearest (now , version = version , backward = True )
832+ latest .append (point )
833+
834+ return tuple (latest )
835+
783836 def filter (self , start = None , end = None , collection = None , name = None , unit = None ,
784837 tags = None , annotations = None ):
785838 """
0 commit comments