Skip to content

Commit 532f84f

Browse files
committed
Merge pull request #453 from basho/features/lrb/socket-enhancements-gh-399
READY: Socket enhancements
2 parents 2de89c4 + 60496d7 commit 532f84f

8 files changed

Lines changed: 91 additions & 29 deletions

File tree

riak/client/transport.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from contextlib import contextmanager
22
from riak.transports.pool import BadResource
3-
from riak.transports.tcp import is_retryable as is_pbc_retryable
3+
from riak.transports.tcp import is_retryable as is_tcp_retryable
44
from riak.transports.http import is_retryable as is_http_retryable
55
import threading
66
from six import PY2
@@ -162,7 +162,7 @@ def _is_retryable(error):
162162
:type error: Exception
163163
:rtype: boolean
164164
"""
165-
return is_pbc_retryable(error) or is_http_retryable(error)
165+
return is_tcp_retryable(error) or is_http_retryable(error)
166166

167167

168168
def retryable(fn, protocol=None):

riak/riak_error.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,12 @@ class RiakError(Exception):
2121
"""
2222
Base class for exceptions generated in the Riak API.
2323
"""
24-
def __init__(self, value):
25-
self.value = value
24+
def __init__(self, *args, **kwargs):
25+
super(RiakError, self).__init__(*args, **kwargs)
26+
if len(args) > 0:
27+
self.value = args[0]
28+
else:
29+
self.value = 'unknown'
2630

2731
def __str__(self):
2832
return repr(self.value)
@@ -34,5 +38,5 @@ class ConflictError(RiakError):
3438
:class:`~riak.riak_object.RiakObject` that has more than one
3539
sibling.
3640
"""
37-
def __init__(self, message="Object in conflict"):
41+
def __init__(self, message='Object in conflict'):
3842
super(ConflictError, self).__init__(message)

riak/tests/test_btypes.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,18 @@ def test_multiget_bucket_types(self):
151151
self.assertEqual(btype, mobj.bucket.bucket_type)
152152

153153
def test_write_once_bucket_type(self):
154-
btype = self.client.bucket_type('write_once')
155-
bucket = btype.bucket(self.bucket_name)
156-
157-
for i in range(100):
158-
obj = bucket.new(self.key_name + str(i))
159-
obj.data = {'id': i}
160-
obj.store()
154+
bt = 'write_once'
155+
skey = 'write_once-init'
156+
btype = self.client.bucket_type(bt)
157+
bucket = btype.bucket(bt)
158+
sobj = bucket.get(skey)
159+
if not sobj.exists:
160+
for i in range(100):
161+
o = bucket.new(self.key_name + str(i))
162+
o.data = {'id': i}
163+
o.store()
164+
o = bucket.new(skey, data={'id': skey})
165+
o.store()
161166

162167
mget = bucket.multiget([self.key_name + str(i) for i in range(100)])
163168
for mobj in mget:

riak/tests/test_client.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from six import PY2
44
from threading import Thread
55
from riak.riak_object import RiakObject
6+
from riak.transports.tcp import TcpTransport
67
from riak.tests import DUMMY_HTTP_PORT, DUMMY_PB_PORT, RUN_POOL
78
from riak.tests.base import IntegrationTestBase
89

@@ -13,6 +14,17 @@
1314

1415

1516
class ClientTests(IntegrationTestBase, unittest.TestCase):
17+
def test_can_set_tcp_keepalive(self):
18+
if self.protocol == 'pbc':
19+
topts = {'socket_keepalive': True}
20+
c = self.create_client(transport_options=topts)
21+
for i, r in enumerate(c._tcp_pool.resources):
22+
self.assertIsInstance(r, TcpTransport)
23+
self.assertTrue(r._socket_keepalive)
24+
c.close()
25+
else:
26+
pass
27+
1628
def test_uses_client_id_if_given(self):
1729
if self.protocol == 'pbc':
1830
zero_client_id = "\0\0\0\0"

riak/tests/test_kv.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,17 +180,29 @@ def test_string_bucket_name(self):
180180
def test_generate_key(self):
181181
# Ensure that Riak generates a random key when
182182
# the key passed to bucket.new() is None.
183-
bucket = self.client.bucket('random_key_bucket')
184-
existing_keys = bucket.get_keys()
183+
bucket = self.client.bucket(self.bucket_name)
185184
o = bucket.new(None, data={})
186185
self.assertIsNone(o.key)
187186
o.store()
188187
self.assertIsNotNone(o.key)
189188
self.assertNotIn('/', o.key)
190-
self.assertNotIn(o.key, existing_keys)
191-
self.assertEqual(len(bucket.get_keys()), len(existing_keys) + 1)
189+
existing_keys = bucket.get_keys()
190+
self.assertEqual(len(existing_keys), 1)
191+
192+
def maybe_store_keys(self):
193+
skey = 'rkb-init'
194+
bucket = self.client.bucket('random_key_bucket')
195+
sobj = bucket.get(skey)
196+
if sobj.exists:
197+
return
198+
for key in range(1, 1000):
199+
o = bucket.new(None, data={})
200+
o.store()
201+
o = bucket.new(skey, data={})
202+
o.store()
192203

193204
def test_stream_keys(self):
205+
self.maybe_store_keys()
194206
bucket = self.client.bucket('random_key_bucket')
195207
regular_keys = bucket.get_keys()
196208
self.assertNotEqual(len(regular_keys), 0)
@@ -203,10 +215,8 @@ def test_stream_keys(self):
203215
self.assertEqual(sorted(regular_keys), sorted(streamed_keys))
204216

205217
def test_stream_keys_timeout(self):
218+
self.maybe_store_keys()
206219
bucket = self.client.bucket('random_key_bucket')
207-
for key in range(1, 1000):
208-
o = bucket.new(None, data={})
209-
o.store()
210220
streamed_keys = []
211221
with self.assertRaises(RiakError):
212222
for keylist in self.client.stream_keys(bucket, timeout=1):
@@ -216,6 +226,7 @@ def test_stream_keys_timeout(self):
216226
streamed_keys += keylist
217227

218228
def test_stream_keys_abort(self):
229+
self.maybe_store_keys()
219230
bucket = self.client.bucket('random_key_bucket')
220231
regular_keys = bucket.get_keys()
221232
self.assertNotEqual(len(regular_keys), 0)

riak/transports/tcp/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def destroy_resource(self, tcp):
4242
def is_retryable(err):
4343
"""
4444
Determines if the given exception is something that is
45-
network/socket-related and should thus cause the PBC connection to
45+
network/socket-related and should thus cause the TCP connection to
4646
close and the operation retried on another node.
4747
4848
:rtype: boolean

riak/transports/tcp/connection.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import errno
12
import socket
23
import struct
34

@@ -7,16 +8,23 @@
78
from riak import RiakError
89
from riak.codecs.pbuf import PbufCodec
910
from riak.security import SecurityError, USE_STDLIB_SSL
11+
from riak.transports.pool import BadResource
1012

11-
if not USE_STDLIB_SSL:
12-
from OpenSSL.SSL import Connection
13-
from riak.transports.security import configure_pyopenssl_context
14-
else:
13+
if USE_STDLIB_SSL:
1514
import ssl
1615
from riak.transports.security import configure_ssl_context
16+
else:
17+
from OpenSSL.SSL import Connection
18+
from riak.transports.security import configure_pyopenssl_context
1719

1820

1921
class TcpConnection(object):
22+
# These are set in the TcpTransport initializer
23+
_address = None
24+
_timeout = None
25+
_socket_keepalive = None
26+
_socket_tcp_options = None
27+
2028
"""
2129
Connection-related methods for TcpTransport.
2230
"""
@@ -174,6 +182,10 @@ def _recv(self, msglen):
174182
toread = msglen
175183
while toread:
176184
nbytes = self._socket.recv_into(view, toread)
185+
# https://docs.python.org/2/howto/sockets.html#using-a-socket
186+
# https://github.com/basho/riak-python-client/issues/399
187+
if nbytes == 0:
188+
raise BadResource('recv_into returned zero bytes unexpectedly')
177189
view = view[nbytes:] # slicing views is cheap
178190
toread -= nbytes
179191
nread += nbytes
@@ -189,6 +201,13 @@ def _connect(self):
189201
self._timeout)
190202
else:
191203
self._socket = socket.create_connection(self._address)
204+
if self._socket_tcp_options:
205+
ka_opts = self._socket_tcp_options
206+
for k, v in ka_opts.iteritems():
207+
self._socket.setsockopt(socket.SOL_TCP, k, v)
208+
if self._socket_keepalive:
209+
self._socket.setsockopt(
210+
socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
192211
if self._client._credentials:
193212
self._init_security()
194213

@@ -197,9 +216,15 @@ def close(self):
197216
Closes the underlying socket of the PB connection.
198217
"""
199218
if self._socket:
219+
if USE_STDLIB_SSL:
220+
# NB: Python 2.7.8 and earlier does not have a compatible
221+
# shutdown() method due to the SSL lib
222+
try:
223+
self._socket.shutdown(socket.SHUT_RDWR)
224+
except IOError as e:
225+
# NB: sometimes this is the exception if the initial
226+
# connection didn't succeed correctly
227+
if e.errno != errno.EBADF:
228+
raise
200229
self._socket.close()
201230
del self._socket
202-
203-
# These are set in the TcpTransport initializer
204-
_address = None
205-
_timeout = None

riak/transports/tcp/transport.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@ def __init__(self,
3535
self._socket = None
3636
self._pbuf_c = None
3737
self._ttb_c = None
38-
self._use_ttb = kwargs.get('use_ttb', True)
38+
self._socket_tcp_options = \
39+
kwargs.get('socket_tcp_options', {})
40+
self._socket_keepalive = \
41+
kwargs.get('socket_keepalive', False)
42+
self._use_ttb = \
43+
kwargs.get('use_ttb', True)
3944

4045
def _get_pbuf_codec(self):
4146
if not self._pbuf_c:

0 commit comments

Comments
 (0)