Skip to content

Commit 0d2855c

Browse files
committed
Merge pull request #448 from basho/features/lrb/rts-842-perf-test-t2b-encoding
READY: Add support for Erlang term-to-binary encoding
2 parents 50b9eb7 + c6ecbfb commit 0d2855c

55 files changed

Lines changed: 3049 additions & 2278 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
1+
README.rst
12
*.pyc
23
.python-version
34
__pycache__/
4-
55
.tox/
6-
7-
.tox/
8-
96
docs/_build
10-
117
.*.swp
128
.coverage
13-
9+
riak-*/
1410
py-build/
1511
dist/
16-
1712
riak.egg-info/
1813
*.egg
1914
.eggs/
20-
2115
#*#
2216
*~
17+
*.ps1

MANIFEST.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
include docs/*
22
include riak/erl_src/*
33
include README.md
4+
include README.rst
45
include LICENSE
56
include RELNOTES.md
67
include version.py

buildbot/Makefile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ compile:
2626

2727
lint:
2828
@pip install --upgrade pep8 flake8
29-
@cd ..; pep8 --exclude=riak/pb riak *.py
3029
@cd ..; flake8 --exclude=riak/pb riak *.py
3130

3231
test: setup test_normal test_security
@@ -46,7 +45,7 @@ test_security:
4645
test_timeseries:
4746
@echo "Testing Riak Python Client (timeseries)"
4847
@$(RIAK_ADMIN) security disable
49-
@RIAK_TEST_PROTOCOL='pbc' RUN_YZ=0 RUN_DATATYPES=0 RUN_INDEXES=1 RUN_TIMESERIES=1 ./tox_runner.sh ..
48+
@RIAK_TEST_PROTOCOL='pbc' RIAK_TEST_PB_PORT=8087 RUN_YZ=0 RUN_DATATYPES=0 RUN_INDEXES=1 RUN_TIMESERIES=1 ./tox_runner.sh ..
5049

5150
setup:
5251
./tox_setup.sh

docs/advanced.rst

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ Transports
9393

9494
.. currentmodule:: riak.transports.transport
9595

96-
.. autoclass:: RiakTransport
96+
.. autoclass:: Transport
9797
:members:
9898
:private-members:
9999

@@ -124,20 +124,24 @@ HTTP Transport
124124

125125
.. currentmodule:: riak.transports.http
126126

127-
.. autoclass:: RiakHttpPool
127+
.. autoclass:: HttpPool
128128

129129
.. autofunction:: is_retryable
130130

131-
.. autoclass:: RiakHttpTransport
131+
.. autoclass:: HttpTransport
132132
:members:
133133

134-
^^^^^^^^^^^^^^^^^^^^^^^^^^
135-
Protocol Buffers Transport
136-
^^^^^^^^^^^^^^^^^^^^^^^^^^
134+
^^^^^^^^^^^^^
135+
TCP Transport
136+
^^^^^^^^^^^^^
137+
138+
.. currentmodule:: riak.transports.tcp
137139

138-
.. currentmodule:: riak.transports.pbc
140+
.. autoclass:: TcpPool
141+
142+
.. autofunction:: is_retryable
139143

140-
.. autoclass:: RiakPbcTransport
144+
.. autoclass:: TcpTransport
141145
:members:
142146

143147
---------

riak/benchmark.py

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,9 @@
1-
"""
2-
Copyright 2013 Basho Technologies, Inc.
3-
4-
This file is provided to you under the Apache License,
5-
Version 2.0 (the "License"); you may not use this file
6-
except in compliance with the License. You may obtain
7-
a copy of the License at
8-
9-
http://www.apache.org/licenses/LICENSE-2.0
10-
11-
Unless required by applicable law or agreed to in writing,
12-
software distributed under the License is distributed on an
13-
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14-
KIND, either express or implied. See the License for the
15-
specific language governing permissions and limitations
16-
under the License.
17-
"""
18-
191
from __future__ import print_function
2+
203
import os
214
import gc
5+
import sys
6+
import traceback
227

238
__all__ = ['measure', 'measure_with_rehearsal']
249

@@ -172,5 +157,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
172157
elif exc_type is KeyboardInterrupt:
173158
return False
174159
else:
175-
print("EXCEPTION! %r" % ((exc_type, exc_val, exc_tb),))
176-
return True
160+
msg = "EXCEPTION! type: %r val: %r" % (exc_type, exc_val)
161+
print(msg, file=sys.stderr)
162+
traceback.print_tb(exc_tb)
163+
return True if exc_type is None else False

riak/benchmarks/timeseries.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import datetime
2+
import random
3+
import sys
4+
5+
import riak.benchmark as benchmark
6+
7+
from multiprocessing import cpu_count
8+
from riak import RiakClient
9+
10+
# logger = logging.getLogger()
11+
# logger.level = logging.DEBUG
12+
# logger.addHandler(logging.StreamHandler(sys.stdout))
13+
14+
# batch sizes 8, 16, 32, 64, 128, 256
15+
if len(sys.argv) != 3:
16+
raise AssertionError(
17+
'first arg is batch size, second arg is true / false'
18+
'for use_ttb')
19+
20+
rowcount = 32768
21+
batchsz = int(sys.argv[1])
22+
if rowcount % batchsz != 0:
23+
raise AssertionError('rowcount must be divisible by batchsz')
24+
use_ttb = sys.argv[2].lower() == 'true'
25+
26+
epoch = datetime.datetime.utcfromtimestamp(0)
27+
onesec = datetime.timedelta(0, 1)
28+
29+
weather = ['typhoon', 'hurricane', 'rain', 'wind', 'snow']
30+
rows = []
31+
for i in range(rowcount):
32+
ts = datetime.datetime(2016, 1, 1, 12, 0, 0) + \
33+
datetime.timedelta(seconds=i)
34+
family_idx = i % batchsz
35+
series_idx = i % batchsz
36+
family = 'hash{:d}'.format(family_idx)
37+
series = 'user{:d}'.format(series_idx)
38+
w = weather[i % len(weather)]
39+
temp = (i % 100) + random.random()
40+
row = [family, series, ts, w, temp]
41+
key = [family, series, ts]
42+
rows.append(row)
43+
44+
print("Benchmarking timeseries:")
45+
print(" Use TTB: {}".format(use_ttb))
46+
print("Batch Size: {}".format(batchsz))
47+
print(" CPUs: {}".format(cpu_count()))
48+
print(" Rows: {}".format(len(rows)))
49+
print()
50+
51+
tbl = 'GeoCheckin'
52+
h = 'riak-test'
53+
n = [
54+
{'host': h, 'pb_port': 10017},
55+
{'host': h, 'pb_port': 10027},
56+
{'host': h, 'pb_port': 10037},
57+
{'host': h, 'pb_port': 10047},
58+
{'host': h, 'pb_port': 10057}
59+
]
60+
client = RiakClient(nodes=n, protocol='pbc',
61+
transport_options={'use_ttb': use_ttb})
62+
table = client.table(tbl)
63+
64+
with benchmark.measure() as b:
65+
for i in (1, 2, 3):
66+
with b.report('populate-%d' % i):
67+
for i in range(0, rowcount, batchsz):
68+
x = i
69+
y = i + batchsz
70+
r = rows[x:y]
71+
ts_obj = table.new(r)
72+
result = ts_obj.store()
73+
if result is not True:
74+
raise AssertionError("expected success")

riak/client/__init__.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
from riak.mapreduce import RiakMapReduceChain
1212
from riak.resolver import default_resolver
1313
from riak.table import Table
14-
from riak.transports.http import RiakHttpPool
15-
from riak.transports.pbc import RiakPbcPool
14+
from riak.transports.http import HttpPool
15+
from riak.transports.tcp import TcpPool
1616
from riak.security import SecurityCreds
1717
from riak.util import lazy_property, bytes_to_str, str_to_bytes
1818
from six import string_types, PY2
@@ -68,7 +68,7 @@ class RiakClient(RiakMapReduceChain, RiakClientOperations):
6868
PROTOCOLS = ['http', 'pbc']
6969

7070
def __init__(self, protocol='pbc', transport_options={}, nodes=None,
71-
credentials=None, multiget_pool_size=None, **unused_args):
71+
credentials=None, multiget_pool_size=None, **kwargs):
7272
"""
7373
Construct a new ``RiakClient`` object.
7474
@@ -88,19 +88,19 @@ def __init__(self, protocol='pbc', transport_options={}, nodes=None,
8888
CPUs in the system
8989
:type multiget_pool_size: int
9090
"""
91-
unused_args = unused_args.copy()
91+
kwargs = kwargs.copy()
9292

9393
if nodes is None:
94-
self.nodes = [self._create_node(unused_args), ]
94+
self.nodes = [self._create_node(kwargs), ]
9595
else:
9696
self.nodes = [self._create_node(n) for n in nodes]
9797

9898
self._multiget_pool_size = multiget_pool_size
9999
self.protocol = protocol or 'pbc'
100100
self._resolver = None
101101
self._credentials = self._create_credentials(credentials)
102-
self._http_pool = RiakHttpPool(self, **transport_options)
103-
self._pb_pool = RiakPbcPool(self, **transport_options)
102+
self._http_pool = HttpPool(self, **transport_options)
103+
self._tcp_pool = TcpPool(self, **transport_options)
104104

105105
if PY2:
106106
self._encoders = {'application/json': default_encoder,
@@ -167,7 +167,7 @@ def _get_client_id(self):
167167
def _set_client_id(self, client_id):
168168
for http in self._http_pool:
169169
http.client_id = client_id
170-
for pb in self._pb_pool:
170+
for pb in self._tcp_pool:
171171
pb.client_id = client_id
172172

173173
client_id = property(_get_client_id, _set_client_id,
@@ -298,8 +298,8 @@ def close(self):
298298
"""
299299
if self._http_pool is not None:
300300
self._http_pool.clear()
301-
if self._pb_pool is not None:
302-
self._pb_pool.clear()
301+
if self._tcp_pool is not None:
302+
self._tcp_pool.clear()
303303

304304
def _create_node(self, n):
305305
if isinstance(n, RiakNode):

riak/client/multiget.py

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,9 @@
1-
"""
2-
Copyright 2013 Basho Technologies, Inc.
3-
4-
This file is provided to you under the Apache License,
5-
Version 2.0 (the "License"); you may not use this file
6-
except in compliance with the License. You may obtain
7-
a copy of the License at
8-
9-
http://www.apache.org/licenses/LICENSE-2.0
10-
11-
Unless required by applicable law or agreed to in writing,
12-
software distributed under the License is distributed on an
13-
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14-
KIND, either express or implied. See the License for the
15-
specific language governing permissions and limitations
16-
under the License.
17-
"""
18-
191
from __future__ import print_function
202
from collections import namedtuple
213
from threading import Thread, Lock, Event
224
from multiprocessing import cpu_count
235
from six import PY2
6+
247
if PY2:
258
from Queue import Queue
269
else:
@@ -177,8 +160,8 @@ def multiget(client, keys, **options):
177160
:meth:`RiakBucket.get <riak.bucket.RiakBucket.get>`
178161
:type options: dict
179162
:rtype: list
180-
181163
"""
164+
182165
outq = Queue()
183166

184167
if 'pool' in options:

riak/client/transport.py

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,6 @@
1-
"""
2-
Copyright 2012 Basho Technologies, Inc.
3-
4-
This file is provided to you under the Apache License,
5-
Version 2.0 (the "License"); you may not use this file
6-
except in compliance with the License. You may obtain
7-
a copy of the License at
8-
9-
http://www.apache.org/licenses/LICENSE-2.0
10-
11-
Unless required by applicable law or agreed to in writing,
12-
software distributed under the License is distributed on an
13-
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14-
KIND, either express or implied. See the License for the
15-
specific language governing permissions and limitations
16-
under the License.
17-
"""
181
from contextlib import contextmanager
192
from riak.transports.pool import BadResource
20-
from riak.transports.pbc import is_retryable as is_pbc_retryable
3+
from riak.transports.tcp import is_retryable as is_pbc_retryable
214
from riak.transports.http import is_retryable as is_http_retryable
225
import threading
236
from six import PY2
@@ -49,7 +32,7 @@ class RiakClientTransport(object):
4932
# These will be set or redefined by the RiakClient initializer
5033
protocol = 'pbc'
5134
_http_pool = None
52-
_pb_pool = None
35+
_tcp_pool = None
5336
_locals = _client_locals()
5437

5538
def _get_retry_count(self):
@@ -163,8 +146,8 @@ def _choose_pool(self, protocol=None):
163146
protocol = self.protocol
164147
if protocol == 'http':
165148
pool = self._http_pool
166-
elif protocol == 'pbc':
167-
pool = self._pb_pool
149+
elif protocol == 'tcp' or protocol == 'pbc':
150+
pool = self._tcp_pool
168151
else:
169152
raise ValueError("invalid protocol %s" % protocol)
170153
return pool

riak/codecs/__init__.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import collections
2+
3+
from riak import RiakError
4+
5+
Msg = collections.namedtuple('Msg',
6+
['msg_code', 'data', 'resp_code'],
7+
verbose=False)
8+
9+
10+
class Codec(object):
11+
def parse_msg(self):
12+
raise NotImplementedError('parse_msg not implemented')
13+
14+
def maybe_incorrect_code(self, resp_code, expect=None):
15+
if expect and resp_code != expect:
16+
raise RiakError("unexpected message code: %d, expected %d"
17+
% (resp_code, expect))
18+
19+
def maybe_riak_error(self, err_code, msg_code, data=None):
20+
if msg_code == err_code:
21+
if data is None:
22+
raise RiakError('no error provided!')
23+
return data
24+
else:
25+
return None

0 commit comments

Comments
 (0)