Skip to content

Commit a877e96

Browse files
minor fixes and enhanced support for StatPoint to the transformers [ch1705, ch3620, ch3621] (#63)
1 parent 5497532 commit a877e96

2 files changed

Lines changed: 224 additions & 52 deletions

File tree

btrdb/transformers.py

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,18 @@ def _get_time_from_row(row):
2929
raise Exception("Row contains no data")
3030

3131

32-
def _stream_names(stream_set):
32+
def _stream_names(streamset):
3333
return tuple(
3434
s.collection + "/" + s.name \
35-
for s in stream_set._streams
35+
for s in streamset._streams
3636
)
3737

3838

3939
##########################################################################
4040
## Transform Functions
4141
##########################################################################
4242

43-
def to_series(stream_set, datetime64_index=True):
43+
def to_series(streamset, datetime64_index=True, agg="mean"):
4444
"""
4545
Returns a list of Pandas Series objects indexed by time
4646
@@ -50,20 +50,28 @@ def to_series(stream_set, datetime64_index=True):
5050
Directs function to convert Series index to np.datetime64[ns] or
5151
leave as np.int64.
5252
53+
agg : str, default: "mean"
54+
Specify the StatPoint field (e.g. aggregating function) to create the Series
55+
from. Must be one of "min", "mean", "max", "count", or "stddev". This
56+
argument is ignored if RawPoint values are passed into the function.
57+
5358
"""
5459
try:
5560
import pandas as pd
5661
except ImportError:
5762
raise ImportError("Please install Pandas to use this transformation function.")
5863

5964
result = []
60-
stream_names = _stream_names(stream_set)
65+
stream_names = _stream_names(streamset)
6166

62-
for idx, output in enumerate(stream_set.values()):
67+
for idx, output in enumerate(streamset.values()):
6368
times, values = [], []
64-
for item in output:
65-
times.append(item.time)
66-
values.append(item.value)
69+
for point in output:
70+
times.append(point.time)
71+
if point.__class__.__name__ == "RawPoint":
72+
values.append(point.value)
73+
else:
74+
values.append(getattr(point, agg))
6775

6876
if datetime64_index:
6977
times = pd.Index(times, dtype='datetime64[ns]')
@@ -73,7 +81,8 @@ def to_series(stream_set, datetime64_index=True):
7381
))
7482
return result
7583

76-
def to_dataframe(stream_set, columns=None):
84+
85+
def to_dataframe(streamset, columns=None, agg="mean"):
7786
"""
7887
Returns a Pandas DataFrame object indexed by time and using the values of a
7988
stream for each column.
@@ -82,47 +91,82 @@ def to_dataframe(stream_set, columns=None):
8291
----------
8392
columns: sequence
8493
column names to use for DataFrame
94+
95+
agg : str, default: "mean"
96+
Specify the StatPoint field (e.g. aggregating function) to create the DataFrame
97+
from. Must be one of "min", "mean", "max", "count", or "stddev". This
98+
argument is ignored if RawPoint values are passed into the function.
99+
85100
"""
86101
try:
87102
import pandas as pd
88103
except ImportError:
89104
raise ImportError("Please install Pandas to use this transformation function.")
90105

91-
stream_names = _stream_names(stream_set)
106+
stream_names = _stream_names(streamset)
92107
columns = columns if columns else ["time"] + list(stream_names)
93-
return pd.DataFrame(to_dict(stream_set), columns=columns).set_index("time")
108+
return pd.DataFrame(to_dict(streamset,agg=agg), columns=columns).set_index("time")
94109

95110

96-
def to_array(stream_set):
111+
def to_array(streamset, agg="mean"):
97112
"""
98-
Returns a list of Numpy arrays (one per stream) containing point classes.
113+
Returns a multidimensional numpy array (similar to a list of lists) containing point
114+
values (not the point objects themselves).
115+
116+
Parameters
117+
----------
118+
agg : str, default: "mean"
119+
Specify the StatPoint field (e.g. aggregating function) to return for the
120+
arrays. Must be one of "min", "mean", "max", "count", or "stddev". This
121+
argument is ignored if RawPoint values are passed into the function.
122+
99123
"""
100124
try:
101125
import numpy as np
102126
except ImportError:
103127
raise ImportError("Please install Numpy to use this transformation function.")
104128

105-
return [np.array(output) for output in stream_set.values()]
129+
results = []
130+
for points in streamset.values():
131+
segment = []
132+
for point in points:
133+
if point.__class__.__name__ == "RawPoint":
134+
segment.append(point.value)
135+
else:
136+
segment.append(getattr(point, agg))
137+
results.append(segment)
138+
return np.array(results)
106139

107140

108-
def to_dict(stream_set):
141+
def to_dict(streamset, agg="mean"):
109142
"""
110143
Returns a list of OrderedDict for each time code with the appropriate
111144
stream data attached.
145+
146+
Parameters
147+
----------
148+
agg : str, default: "mean"
149+
Specify the StatPoint field (e.g. aggregating function) to use for the dict
150+
values. Must be one of "min", "mean", "max", "count", or "stddev". This
151+
argument is ignored if RawPoint values are passed into the function.
152+
112153
"""
113154
data = []
114-
stream_names = _stream_names(stream_set)
115-
for row in stream_set.rows():
155+
stream_names = _stream_names(streamset)
156+
for row in streamset.rows():
116157
item = OrderedDict({
117158
"time": _get_time_from_row(row),
118159
})
119160
for idx, col in enumerate(stream_names):
120-
item[col] = row[idx].value if row[idx] else None
161+
if row[idx].__class__.__name__ == "RawPoint":
162+
item[col] = row[idx].value if row[idx] else None
163+
else:
164+
item[col] = getattr(row[idx], agg) if row[idx] else None
121165
data.append(item)
122166
return data
123167

124168

125-
def to_csv(stream_set, fobj, dialect=None, fieldnames=None):
169+
def to_csv(streamset, fobj, dialect=None, fieldnames=None, agg="mean"):
126170
"""
127171
Saves stream data as a CSV file.
128172
@@ -139,6 +183,10 @@ def to_csv(stream_set, fobj, dialect=None, fieldnames=None):
139183
A sequence of strings to use as fieldnames in the CSV header. See
140184
Python's csv module for more information.
141185
186+
agg : str, default: "mean"
187+
Specify the StatPoint field (e.g. aggregating function) to write for each
188+
stream column. Must be one of "min", "mean", "max", "count", or "stddev".
189+
This argument is ignored if RawPoint values are passed into the function.
142190
"""
143191

144192
@contextlib.contextmanager
@@ -155,27 +203,35 @@ def open_path_or_file(path_or_file):
155203
file_to_close.close()
156204

157205
with open_path_or_file(fobj) as csvfile:
158-
stream_names = _stream_names(stream_set)
206+
stream_names = _stream_names(streamset)
159207
fieldnames = fieldnames if fieldnames else ["time"] + list(stream_names)
160208

161209
writer = csv.DictWriter(csvfile, fieldnames=fieldnames, dialect=dialect)
162210
writer.writeheader()
163211

164-
for item in to_dict(stream_set):
212+
for item in to_dict(streamset, agg=agg):
165213
writer.writerow(item)
166214

167215

168-
def to_table(stream_set):
216+
def to_table(streamset, agg="mean"):
169217
"""
170218
Returns string representation of the data in tabular form using the tabulate
171219
library.
220+
221+
Parameters
222+
----------
223+
agg : str, default: "mean"
224+
Specify the StatPoint field (e.g. aggregating function) to create the table
225+
from. Must be one of "min", "mean", "max", "count", or "stddev". This
226+
argument is ignored if RawPoint values are passed into the function.
227+
172228
"""
173229
try:
174230
from tabulate import tabulate
175231
except ImportError:
176232
raise ImportError("Please install tabulate to use this transformation function.")
177233

178-
return tabulate(stream_set.to_dict(), headers="keys")
234+
return tabulate(streamset.to_dict(agg=agg), headers="keys")
179235

180236

181237
##########################################################################

0 commit comments

Comments
 (0)