
Commit 61cf70d

Implement lazy dataframes.

1 parent 633027d commit 61cf70d

1 file changed: btrdb/experimental/dask.py (115 additions & 1 deletion)

@@ -2,7 +2,9 @@
 import btrdb.exceptions
 import dask
 import dask.distributed
-import pandas as pd
+import dask.dataframe
+import pyarrow
+import pandas
 
 # This process local connection variable is initialized in all
 # dask worker processes by the configure function.
@@ -84,3 +86,115 @@ def configure(client=None, conn_str=None, apikey=None, profile=None):
     # Configure the distributed scheduler.
     plugin = BtrdbConnectionPlugin(**creds)
     client.register_worker_plugin(plugin, name="btrdb_connection")
+
+
+@dask.delayed
+def _stream_as_dataframe_part(uuid, start, end, snap_period, data_column, version):
+    db = get_btrdb()
+    # For now we use multi values because it implements time snapping and does
+    # not return duplicate values for a single timestamp. Both of these are useful
+    # properties for how dask dataframe partitions are supposed to behave.
+    values = db.ep.arrowMultiValues([uuid], start, end, [version], snap_period)
+    values = list(values)
+    if len(values) != 0:
+        values = pyarrow.concat_tables(values)
+    else:
+        schema = pyarrow.schema(
+            [
+                pyarrow.field(
+                    "time", pyarrow.timestamp("ns", tz="UTC"), nullable=False
+                ),
+                pyarrow.field(str(uuid), pyarrow.float64(), nullable=False),
+            ]
+        )
+        values = schema.empty_table()
+    # XXX ensure this is zero copy.
+    values = values.rename_columns(["time", data_column])
+    values = values.to_pandas()
+    # XXX Can we do this from to_pandas?
+    values.set_index("time", inplace=True)
+    return values
+
+
+def stream_as_dataframe(
+    stream,
+    start=None,
+    end=None,
+    partitions=1,
+    snap_period=0,
+    data_column=None,
+    version=0,
+):
+    """
+    Converts a btrdb stream to a lazy Dask DataFrame.
+
+    Parameters
+    ----------
+    stream : btrdb.stream.Stream
+        The stream containing the data.
+
+    start : datetime-like, optional
+        The start time for the data from the stream. Defaults to the earliest time in the stream.
+
+    end : datetime-like, optional
+        The end time for the data from the stream. Defaults to the latest time in the stream.
+
+    partitions : int, optional
+        Number of partitions for the dask dataframe. Default is 1.
+
+    snap_period : int, optional
+        The period for data time snapping.
+        Defaults to 0, which means no snapping.
+
+    data_column : str or callable, optional
+        The name of the data column. If None, it defaults to the collection and name of the stream.
+        If callable, the function is applied to the stream object to determine the data column name.
+
+    version : int, optional
+        The stream version to be used. Defaults to 0.
+
+    Returns
+    -------
+    Dask DataFrame
+        The Dask DataFrame containing the stream data.
+    """
+    if data_column is None:
+        data_column = stream.collection + "/" + stream.name
+    else:
+        if not isinstance(data_column, str):
+            # assume callable.
+            data_column = data_column(stream)
+    if start is None:
+        start = stream.earliest()[0].time
+    if end is None:
+        end = stream.latest()[0].time
+    duration = end - start
+    if partitions >= duration:
+        partitions = 1
+    part_duration = duration // partitions
+    if snap_period != 0:
+        # N.B. Due to the way the server does time snapping, we need to ensure that our partitions are aligned to the
+        # time snapping period. The reason for this is we don't want values to be snapped into both partitions by accident.
+        remainder = part_duration % snap_period
+        if remainder != 0:
+            part_duration += snap_period - remainder
+    parts = []
+    divisions = []
+    part_start = start
+    while part_start < end:
+        part_end = min(part_start + part_duration, end)
+        part = _stream_as_dataframe_part(
+            stream.uuid, part_start, part_end, snap_period, data_column, version
+        )
+        parts.append(part)
+        divisions.append(part_start)
+        part_start += part_duration
+    divisions.append(end)
+    meta = pandas.DataFrame(
+        index=pandas.DatetimeIndex([], tz="UTC"),
+        columns=[data_column],
+        dtype="float64",
+    )
+    return dask.dataframe.from_delayed(
+        parts, meta=meta, divisions=divisions, verify_meta=False
+    )
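
The snap-period handling above rounds part_duration up to the next multiple of snap_period so that a value can never be snapped into two adjacent partitions. A minimal sketch of that arithmetic, using made-up nanosecond quantities (not values from this commit):

    # Round a partition length up to a multiple of the snap period.
    # All quantities here are illustrative nanosecond values.
    duration = 10_000_000_000      # 10 s of data
    partitions = 4
    snap_period = 3_000_000_000    # snap to 3 s boundaries

    part_duration = duration // partitions        # 2_500_000_000
    remainder = part_duration % snap_period       # 2_500_000_000
    if remainder != 0:
        part_duration += snap_period - remainder  # rounded up to 3_000_000_000

    assert part_duration % snap_period == 0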

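Putting it together, a hypothetical usage sketch; the connection string, API key, collection name, and dask client setup below are placeholders, not part of this commit:

    # Hypothetical usage of the new experimental module; the endpoint and
    # stream identifiers are placeholders.
    import dask.distributed

    import btrdb
    from btrdb.experimental.dask import configure, stream_as_dataframe

    client = dask.distributed.Client()  # local dask cluster
    configure(client=client, conn_str="btrdb.example.net:4411", apikey="...")

    conn = btrdb.connect("btrdb.example.net:4411", apikey="...")
    stream = conn.streams_in_collection("sensors/site1")[0]

    # Nothing is fetched here; each partition becomes a dask.delayed task.
    df = stream_as_dataframe(
        stream,
        partitions=8,
        snap_period=1_000_000_000,  # snap values to 1 s boundaries
    )
    result = df.compute()  # materializes a pandas DataFrame

Because divisions is passed to dask.dataframe.from_delayed, the resulting frame carries known divisions, which lets dask prune partitions for time-range selections such as df.loc[t0:t1] instead of computing every part.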