Skip to content

Commit cabb373

Browse files
author
Luke Bakken
committed
Add ts_convert_timestamp transport option
1 parent 001865c commit cabb373

6 files changed

Lines changed: 42 additions & 18 deletions

File tree

riak/codecs/pbuf.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,8 @@ def encode_timeseries_query(self, table, query, interpolations=None):
773773
rc = riak.pb.messages.MSG_CODE_TS_QUERY_RESP
774774
return Msg(mc, req.SerializeToString(), rc)
775775

776-
def decode_timeseries(self, resp, tsobj):
776+
def decode_timeseries(self, resp, tsobj,
777+
convert_timestamp=False):
777778
"""
778779
Fills an TsObject with the appropriate data and
779780
metadata from a TsGetResp / TsQueryResp.
@@ -783,6 +784,8 @@ def decode_timeseries(self, resp, tsobj):
783784
riak.pb.riak_ts_pb2.TsGetResp
784785
:param tsobj: a TsObject
785786
:type tsobj: TsObject
787+
:param convert_timestamp: Convert timestamps to datetime objects
788+
:type tsobj: boolean
786789
"""
787790
if resp.columns is not None:
788791
col_names = []
@@ -798,7 +801,7 @@ def decode_timeseries(self, resp, tsobj):
798801
for row in resp.rows:
799802
tsobj.rows.append(
800803
self.decode_timeseries_row(
801-
row, resp.columns))
804+
row, resp.columns, convert_timestamp))
802805

803806
def decode_timeseries_col_type(self, col_type):
804807
# NB: these match the atom names for column types
@@ -816,7 +819,8 @@ def decode_timeseries_col_type(self, col_type):
816819
msg = 'could not decode column type: {}'.format(col_type)
817820
raise RiakError(msg)
818821

819-
def decode_timeseries_row(self, tsrow, tscols=None):
822+
def decode_timeseries_row(self, tsrow, tscols=None,
823+
convert_timestamp=False):
820824
"""
821825
Decodes a TsRow into a list
822826
@@ -850,8 +854,10 @@ def decode_timeseries_row(self, tsrow, tscols=None):
850854
if col and col.type != TsColumnType.Value('TIMESTAMP'):
851855
raise TypeError('expected TIMESTAMP column')
852856
else:
853-
dt = datetime_from_unix_time_millis(
854-
cell.timestamp_value)
857+
dt = cell.timestamp_value
858+
if convert_timestamp:
859+
dt = datetime_from_unix_time_millis(
860+
cell.timestamp_value)
855861
row.append(dt)
856862
elif cell.HasField('boolean_value'):
857863
if col and col.type != TsColumnType.Value('BOOLEAN'):

riak/codecs/ttb.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,8 @@ def encode_timeseries_query(self, table, query, interpolations=None):
138138
rc = MSG_CODE_TS_TTB_MSG
139139
return Msg(mc, encode(req), rc)
140140

141-
def decode_timeseries(self, resp_ttb, tsobj):
141+
def decode_timeseries(self, resp_ttb, tsobj,
142+
convert_timestamp=False):
142143
"""
143144
Fills an TsObject with the appropriate data and
144145
metadata from a TTB-encoded TsGetResp / TsQueryResp.
@@ -147,6 +148,8 @@ def decode_timeseries(self, resp_ttb, tsobj):
147148
:type resp_ttb: TTB-encoded tsqueryrsp or tsgetresp
148149
:param tsobj: a TsObject
149150
:type tsobj: TsObject
151+
:param convert_timestamp: Convert timestamps to datetime objects
152+
:type tsobj: boolean
150153
"""
151154
if resp_ttb is None:
152155
return tsobj
@@ -169,7 +172,8 @@ def decode_timeseries(self, resp_ttb, tsobj):
169172
tsobj.rows = []
170173
for resp_row in resp_rows:
171174
tsobj.rows.append(
172-
self.decode_timeseries_row(resp_row, resp_coltypes))
175+
self.decode_timeseries_row(resp_row, resp_coltypes,
176+
convert_timestamp))
173177
else:
174178
raise RiakError(
175179
"Expected 3-tuple in response, got: {}".format(resp_data))
@@ -181,14 +185,16 @@ def decode_timeseries_cols(self, cnames, ctypes):
181185
ctypes = [str(ctype) for ctype in ctypes]
182186
return TsColumns(cnames, ctypes)
183187

184-
def decode_timeseries_row(self, tsrow, tsct):
188+
def decode_timeseries_row(self, tsrow, tsct, convert_timestamp=False):
185189
"""
186190
Decodes a TTB-encoded TsRow into a list
187191
188192
:param tsrow: the TTB decoded TsRow to decode.
189193
:type tsrow: TTB dncoded row
190194
:param tsct: the TTB decoded column types (atoms).
191195
:type tsct: list
196+
:param convert_timestamp: Convert timestamps to datetime objects
197+
:type tsobj: boolean
192198
:rtype list
193199
"""
194200
row = []
@@ -198,7 +204,7 @@ def decode_timeseries_row(self, tsrow, tsct):
198204
elif isinstance(cell, list) and len(cell) == 0:
199205
row.append(None)
200206
else:
201-
if tsct[i] == timestamp_a:
207+
if convert_timestamp and tsct[i] == timestamp_a:
202208
row.append(datetime_from_unix_time_millis(cell))
203209
else:
204210
row.append(cell)

riak/tests/test_timeseries_pbuf.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ def test_decode_data_from_query(self):
161161

162162
tsobj = TsObject(None, self.table)
163163
c = PbufCodec()
164-
c.decode_timeseries(tqr, tsobj)
164+
c.decode_timeseries(tqr, tsobj, True)
165165

166166
self.assertEqual(len(tsobj.rows), len(self.rows))
167167
self.assertEqual(len(tsobj.columns.names), len(tqr.columns))
@@ -197,7 +197,8 @@ def test_decode_data_from_query(self):
197197
@unittest.skipUnless(is_timeseries_supported() and RUN_TIMESERIES,
198198
'Timeseries not supported or RUN_TIMESERIES is 0')
199199
class TimeseriesPbufTests(IntegrationTestBase, unittest.TestCase):
200-
client_options = {'transport_options': {'use_ttb': False}}
200+
client_options = {'transport_options':
201+
{'use_ttb': False, 'ts_convert_timestamp': True}}
201202

202203
@classmethod
203204
def setUpClass(cls):

riak/tests/test_timeseries_ttb.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,9 @@ def test_decode_data_from_get(self):
9292
self.assertEqual(r[0], dr[0].encode('utf-8'))
9393
self.assertEqual(r[1], dr[1])
9494
self.assertEqual(r[2], dr[2])
95-
dt = datetime_from_unix_time_millis(dr[3])
96-
self.assertEqual(r[3], dt)
95+
# NB *not* decoding timestamps
96+
# dt = datetime_from_unix_time_millis(dr[3])
97+
self.assertEqual(r[3], dr[3])
9798
if i == 0:
9899
self.assertEqual(r[4], True)
99100
else:
@@ -123,7 +124,8 @@ def test_encode_data_for_put(self):
123124
@unittest.skipUnless(is_timeseries_supported() and RUN_TIMESERIES,
124125
'Timeseries not supported or RUN_TIMESERIES is 0')
125126
class TimeseriesTtbTests(IntegrationTestBase, unittest.TestCase):
126-
client_options = {'transport_options': {'use_ttb': True}}
127+
client_options = {'transport_options':
128+
{'use_ttb': True, 'ts_convert_timestamp': True}}
127129

128130
@classmethod
129131
def setUpClass(cls):

riak/transports/tcp/stream.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,10 @@ class PbufTsKeyStream(PbufStream, TtbCodec):
174174

175175
_expect = riak.pb.messages.MSG_CODE_TS_LIST_KEYS_RESP
176176

177+
def __init__(self, transport, codec, convert_timestamp=False):
178+
super(PbufTsKeyStream, self).__init__(transport, codec)
179+
self._convert_timestamp = convert_timestamp
180+
177181
def next(self):
178182
response = super(PbufTsKeyStream, self).next()
179183

@@ -182,7 +186,8 @@ def next(self):
182186

183187
keys = []
184188
for tsrow in response.keys:
185-
keys.append(self.codec.decode_timeseries_row(tsrow))
189+
keys.append(self.codec.decode_timeseries_row(tsrow,
190+
convert_timestamp=self._convert_timestamp))
186191

187192
return keys
188193

riak/transports/tcp/transport.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ def __init__(self,
3838
self._pbuf_c = None
3939
self._ttb_c = None
4040
self._use_ttb = kwargs.get('use_ttb', True)
41+
self._ts_convert_timestamp = \
42+
kwargs.get('ts_convert_timestamp', False)
4143

4244
def _get_pbuf_codec(self):
4345
if not self._pbuf_c:
@@ -147,7 +149,8 @@ def ts_get(self, table, key):
147149
msg = codec.encode_timeseries_keyreq(table, key)
148150
resp_code, resp = self._request(msg, codec)
149151
tsobj = TsObject(self._client, table)
150-
codec.decode_timeseries(resp, tsobj)
152+
codec.decode_timeseries(resp, tsobj,
153+
self._ts_convert_timestamp)
151154
return tsobj
152155

153156
def ts_put(self, tsobj):
@@ -173,7 +176,8 @@ def ts_query(self, table, query, interpolations=None):
173176
msg = codec.encode_timeseries_query(table, query, interpolations)
174177
resp_code, resp = self._request(msg, codec)
175178
tsobj = TsObject(self._client, table)
176-
codec.decode_timeseries(resp, tsobj)
179+
codec.decode_timeseries(resp, tsobj,
180+
self._ts_convert_timestamp)
177181
return tsobj
178182

179183
def ts_stream_keys(self, table, timeout=None):
@@ -185,7 +189,7 @@ def ts_stream_keys(self, table, timeout=None):
185189
codec = self._get_codec(msg_code)
186190
msg = codec.encode_timeseries_listkeysreq(table, timeout)
187191
self._send_msg(msg.msg_code, msg.data)
188-
return PbufTsKeyStream(self, codec)
192+
return PbufTsKeyStream(self, codec, self._ts_convert_timestamp)
189193

190194
def delete(self, robj, rw=None, r=None, w=None, dw=None,
191195
pr=None, pw=None, timeout=None):

0 commit comments

Comments
 (0)