66
77from riak import RiakError
88from riak .codecs import Codec , Msg
9+ from riak .pb .messages import MSG_CODE_TS_TTB_MSG
910from riak .ts_object import TsColumns
1011from riak .util import bytes_to_str , unix_time_millis , \
1112 datetime_from_unix_time_millis
2324tsdelreq_a = Atom ('tsdelreq' )
2425timestamp_a = Atom ('timestamp' )
2526
26- # TODO RTS-842
27- MSG_CODE_TS_TTB = 104
28-
2927
3028class TtbCodec (Codec ):
3129 '''
@@ -36,7 +34,7 @@ def __init__(self, **unused_args):
3634 super (TtbCodec , self ).__init__ (** unused_args )
3735
3836 def parse_msg (self , msg_code , data ):
39- if msg_code != MSG_CODE_TS_TTB :
37+ if msg_code != MSG_CODE_TS_TTB_MSG :
4038 raise RiakError ("TTB can't parse code: {}" .format (msg_code ))
4139 if len (data ) > 0 :
4240 decoded = decode (data )
@@ -61,6 +59,7 @@ def encode_to_ts_cell(self, cell):
6159 else :
6260 if isinstance (cell , datetime .datetime ):
6361 ts = unix_time_millis (cell )
62+ # logging.debug('encoded datetime %s as %s', cell, ts)
6463 return ts
6564 elif isinstance (cell , bool ):
6665 return cell
@@ -84,19 +83,19 @@ def encode_timeseries_keyreq(self, table, key, is_delete=False):
8483 else :
8584 raise ValueError ("key must be a list" )
8685
87- mc = MSG_CODE_TS_TTB
88- rc = MSG_CODE_TS_TTB
86+ mc = MSG_CODE_TS_TTB_MSG
87+ rc = MSG_CODE_TS_TTB_MSG
8988 req_atom = tsgetreq_a
9089 if is_delete :
9190 req_atom = tsdelreq_a
9291
93- # TODO RTS-842 timeout is last
92+ # TODO FUTURE add timeout as last param
9493 req = req_atom , table .name , \
9594 [self .encode_to_ts_cell (k ) for k in key_vals ], udef_a
9695 return Msg (mc , encode (req ), rc )
9796
9897 def validate_timeseries_put_resp (self , resp_code , resp ):
99- if resp is None and resp_code == MSG_CODE_TS_TTB :
98+ if resp is None and resp_code == MSG_CODE_TS_TTB_MSG :
10099 return True
101100 if resp is not None :
102101 return True
@@ -123,8 +122,8 @@ def encode_timeseries_put(self, tsobj):
123122 req_r .append (self .encode_to_ts_cell (cell ))
124123 req_rows .append (tuple (req_r ))
125124 req = tsputreq_a , tsobj .table .name , [], req_rows
126- mc = MSG_CODE_TS_TTB
127- rc = MSG_CODE_TS_TTB
125+ mc = MSG_CODE_TS_TTB_MSG
126+ rc = MSG_CODE_TS_TTB_MSG
128127 return Msg (mc , encode (req ), rc )
129128 else :
130129 raise RiakError ("TsObject requires a list of rows" )
@@ -135,8 +134,8 @@ def encode_timeseries_query(self, table, query, interpolations=None):
135134 q = q .format (table = table .name )
136135 tsi = tsinterpolation_a , q , []
137136 req = tsqueryreq_a , tsi , False , []
138- mc = MSG_CODE_TS_TTB
139- rc = MSG_CODE_TS_TTB
137+ mc = MSG_CODE_TS_TTB_MSG
138+ rc = MSG_CODE_TS_TTB_MSG
140139 return Msg (mc , encode (req ), rc )
141140
142141 def decode_timeseries (self , resp_ttb , tsobj ):
0 commit comments