66
77from riak import RiakError
88from riak .codecs import Codec , Msg
9+ from riak .pb .messages import MSG_CODE_TS_TTB_MSG
910from riak .ts_object import TsColumns
1011from riak .util import bytes_to_str , unix_time_millis , \
1112 datetime_from_unix_time_millis
2324tsdelreq_a = Atom ('tsdelreq' )
2425timestamp_a = Atom ('timestamp' )
2526
26- # TODO RTS-842
27- MSG_CODE_TS_TTB = 104
28-
2927
3028class TtbCodec (Codec ):
3129 '''
@@ -36,7 +34,7 @@ def __init__(self, **unused_args):
3634 super (TtbCodec , self ).__init__ (** unused_args )
3735
3836 def parse_msg (self , msg_code , data ):
39- if msg_code != MSG_CODE_TS_TTB :
37+ if msg_code != MSG_CODE_TS_TTB_MSG :
4038 raise RiakError ("TTB can't parse code: {}" .format (msg_code ))
4139 if len (data ) > 0 :
4240 decoded = decode (data )
@@ -61,6 +59,7 @@ def encode_to_ts_cell(self, cell):
6159 else :
6260 if isinstance (cell , datetime .datetime ):
6361 ts = unix_time_millis (cell )
62+ # logging.debug('encoded datetime %s as %s', cell, ts)
6463 return ts
6564 elif isinstance (cell , bool ):
6665 return cell
@@ -84,19 +83,19 @@ def encode_timeseries_keyreq(self, table, key, is_delete=False):
8483 else :
8584 raise ValueError ("key must be a list" )
8685
87- mc = MSG_CODE_TS_TTB
88- rc = MSG_CODE_TS_TTB
86+ mc = MSG_CODE_TS_TTB_MSG
87+ rc = MSG_CODE_TS_TTB_MSG
8988 req_atom = tsgetreq_a
9089 if is_delete :
9190 req_atom = tsdelreq_a
9291
93- # TODO RTS-842 timeout is last
92+ # TODO FUTURE add timeout as last param
9493 req = req_atom , table .name , \
9594 [self .encode_to_ts_cell (k ) for k in key_vals ], udef_a
9695 return Msg (mc , encode (req ), rc )
9796
9897 def validate_timeseries_put_resp (self , resp_code , resp ):
99- if resp is None and resp_code == MSG_CODE_TS_TTB :
98+ if resp is None and resp_code == MSG_CODE_TS_TTB_MSG :
10099 return True
101100 if resp is not None :
102101 return True
@@ -123,8 +122,8 @@ def encode_timeseries_put(self, tsobj):
123122 req_r .append (self .encode_to_ts_cell (cell ))
124123 req_rows .append (tuple (req_r ))
125124 req = tsputreq_a , tsobj .table .name , [], req_rows
126- mc = MSG_CODE_TS_TTB
127- rc = MSG_CODE_TS_TTB
125+ mc = MSG_CODE_TS_TTB_MSG
126+ rc = MSG_CODE_TS_TTB_MSG
128127 return Msg (mc , encode (req ), rc )
129128 else :
130129 raise RiakError ("TsObject requires a list of rows" )
@@ -135,11 +134,12 @@ def encode_timeseries_query(self, table, query, interpolations=None):
135134 q = q .format (table = table .name )
136135 tsi = tsinterpolation_a , q , []
137136 req = tsqueryreq_a , tsi , False , []
138- mc = MSG_CODE_TS_TTB
139- rc = MSG_CODE_TS_TTB
137+ mc = MSG_CODE_TS_TTB_MSG
138+ rc = MSG_CODE_TS_TTB_MSG
140139 return Msg (mc , encode (req ), rc )
141140
142- def decode_timeseries (self , resp_ttb , tsobj ):
141+ def decode_timeseries (self , resp_ttb , tsobj ,
142+ convert_timestamp = False ):
143143 """
144144 Fills an TsObject with the appropriate data and
145145 metadata from a TTB-encoded TsGetResp / TsQueryResp.
@@ -148,6 +148,8 @@ def decode_timeseries(self, resp_ttb, tsobj):
148148 :type resp_ttb: TTB-encoded tsqueryrsp or tsgetresp
149149 :param tsobj: a TsObject
150150 :type tsobj: TsObject
151+ :param convert_timestamp: Convert timestamps to datetime objects
152+ :type tsobj: boolean
151153 """
152154 if resp_ttb is None :
153155 return tsobj
@@ -170,7 +172,8 @@ def decode_timeseries(self, resp_ttb, tsobj):
170172 tsobj .rows = []
171173 for resp_row in resp_rows :
172174 tsobj .rows .append (
173- self .decode_timeseries_row (resp_row , resp_coltypes ))
175+ self .decode_timeseries_row (resp_row , resp_coltypes ,
176+ convert_timestamp ))
174177 else :
175178 raise RiakError (
176179 "Expected 3-tuple in response, got: {}" .format (resp_data ))
@@ -182,14 +185,16 @@ def decode_timeseries_cols(self, cnames, ctypes):
182185 ctypes = [str (ctype ) for ctype in ctypes ]
183186 return TsColumns (cnames , ctypes )
184187
185- def decode_timeseries_row (self , tsrow , tsct ):
188+ def decode_timeseries_row (self , tsrow , tsct , convert_timestamp = False ):
186189 """
187190 Decodes a TTB-encoded TsRow into a list
188191
189192 :param tsrow: the TTB decoded TsRow to decode.
190193 :type tsrow: TTB dncoded row
191194 :param tsct: the TTB decoded column types (atoms).
192195 :type tsct: list
196+ :param convert_timestamp: Convert timestamps to datetime objects
197+ :type tsobj: boolean
193198 :rtype list
194199 """
195200 row = []
@@ -199,7 +204,7 @@ def decode_timeseries_row(self, tsrow, tsct):
199204 elif isinstance (cell , list ) and len (cell ) == 0 :
200205 row .append (None )
201206 else :
202- if tsct [i ] == timestamp_a :
207+ if convert_timestamp and tsct [i ] == timestamp_a :
203208 row .append (datetime_from_unix_time_millis (cell ))
204209 else :
205210 row .append (cell )
0 commit comments