Skip to content

Commit 93253ef

Browse files
authored
Merge pull request #504 from basho/features/lrb/retry-on-closed-gh-503
Smart re-try for operations
2 parents 7885fb9 + 6cdafc6 commit 93253ef

9 files changed

Lines changed: 166 additions & 70 deletions

File tree

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,19 @@ endif
8888

8989
.PHONY: unit-test
9090
unit-test:
91-
@$(PROJDIR)/runner unit-test
91+
@$(PROJDIR)/.runner unit-test
9292

9393
.PHONY: integration-test
9494
integration-test:
95-
@$(PROJDIR)/runner integration-test
95+
@$(PROJDIR)/.runner integration-test
9696

9797
.PHONY: security-test
9898
security-test:
99-
@$(PROJDIR)/runner security-test
99+
@$(PROJDIR)/.runner security-test
100100

101101
.PHONY: timeseries-test
102102
timeseries-test:
103-
@$(PROJDIR)/runner timeseries-test
103+
@$(PROJDIR)/.runner timeseries-test
104104

105105
.PHONY: test
106106
test: integration-test

docs/advanced.rst

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ Connection pool
1212

1313
.. currentmodule:: riak.transports.pool
1414

15-
.. autoexception:: BadResource
1615
.. autoclass:: Resource
1716
:members:
17+
1818
.. autoclass:: Pool
1919
:members:
2020

2121
.. autoclass:: PoolIterator
2222

23+
.. autoexception:: BadResource
24+
.. autoexception:: ConnectionClosed
25+
2326
-----------
2427
Retry logic
2528
-----------

riak/client/operations.py

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -87,24 +87,21 @@ def stream_buckets(self, bucket_type=None, timeout=None):
8787
8888
"""
8989
_validate_timeout(timeout)
90+
9091
if bucket_type:
9192
bucketfn = self._bucket_type_bucket_builder
9293
else:
9394
bucketfn = self._default_type_bucket_builder
9495

95-
resource = self._acquire()
96-
transport = resource.object
97-
stream = transport.stream_buckets(bucket_type=bucket_type,
98-
timeout=timeout)
99-
stream.attach(resource)
100-
try:
101-
for bucket_list in stream:
102-
bucket_list = [bucketfn(bytes_to_str(name), bucket_type)
103-
for name in bucket_list]
104-
if len(bucket_list) > 0:
105-
yield bucket_list
106-
finally:
107-
stream.close()
96+
def make_op(transport):
97+
return transport.stream_buckets(
98+
bucket_type=bucket_type, timeout=timeout)
99+
100+
for bucket_list in self._stream_with_retry(make_op):
101+
bucket_list = [bucketfn(bytes_to_str(name), bucket_type)
102+
for name in bucket_list]
103+
if len(bucket_list) > 0:
104+
yield bucket_list
108105

109106
@retryable
110107
def ping(self, transport):
@@ -266,6 +263,8 @@ def stream_index(self, bucket, index, startkey, endkey=None,
266263
:rtype: :class:`~riak.client.index_page.IndexPage`
267264
268265
"""
266+
# TODO FUTURE: implement "retry on connection closed"
267+
# as in stream_mapred
269268
_validate_timeout(timeout, infinity_ok=True)
270269

271270
page = IndexPage(self, bucket, index, startkey, endkey,
@@ -338,6 +337,8 @@ def paginate_stream_index(self, bucket, index, startkey, endkey=None,
338337
:class:`~riak.client.index_page.IndexPage`
339338
340339
"""
340+
# TODO FUTURE: implement "retry on connection closed"
341+
# as in stream_mapred
341342
page = self.stream_index(bucket, index, startkey,
342343
endkey=endkey,
343344
max_results=max_results,
@@ -489,19 +490,16 @@ def stream_keys(self, bucket, timeout=None):
489490
:rtype: iterator
490491
"""
491492
_validate_timeout(timeout)
492-
resource = self._acquire()
493-
transport = resource.object
494-
stream = transport.stream_keys(bucket, timeout=timeout)
495-
stream.attach(resource)
496-
try:
497-
for keylist in stream:
498-
if len(keylist) > 0:
499-
if six.PY2:
500-
yield keylist
501-
else:
502-
yield [bytes_to_str(item) for item in keylist]
503-
finally:
504-
stream.close()
493+
494+
def make_op(transport):
495+
return transport.stream_keys(bucket, timeout=timeout)
496+
497+
for keylist in self._stream_with_retry(make_op):
498+
if len(keylist) > 0:
499+
if six.PY2:
500+
yield keylist
501+
else:
502+
yield [bytes_to_str(item) for item in keylist]
505503

506504
@retryable
507505
def put(self, transport, robj, w=None, dw=None, pw=None, return_body=None,
@@ -799,15 +797,12 @@ def stream_mapred(self, inputs, query, timeout):
799797
:rtype: iterator
800798
"""
801799
_validate_timeout(timeout)
802-
resource = self._acquire()
803-
transport = resource.object
804-
stream = transport.stream_mapred(inputs, query, timeout)
805-
stream.attach(resource)
806-
try:
807-
for phase, data in stream:
808-
yield phase, data
809-
finally:
810-
stream.close()
800+
801+
def make_op(transport):
802+
return transport.stream_mapred(inputs, query, timeout)
803+
804+
for phase, data in self._stream_with_retry(make_op):
805+
yield phase, data
811806

812807
@retryable
813808
def create_search_index(self, transport, index, schema=None, n_val=None,

riak/client/transport.py

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from contextlib import contextmanager
2-
from riak.transports.pool import BadResource
2+
from riak.transports.pool import BadResource, ConnectionClosed
33
from riak.transports.tcp import is_retryable as is_tcp_retryable
44
from riak.transports.http import is_retryable as is_http_retryable
5-
import threading
65
from six import PY2
6+
7+
import threading
8+
79
if PY2:
810
from httplib import HTTPException
911
else:
@@ -84,7 +86,8 @@ def _transport(self):
8486
_transport()
8587
8688
Yields a single transport to the caller from the default pool,
87-
without retries.
89+
without retries. NB: no need to re-try as this method is only
90+
used by CRDT operations that should never be re-tried.
8891
"""
8992
pool = self._choose_pool()
9093
with pool.transaction() as transport:
@@ -98,6 +101,29 @@ def _acquire(self):
98101
"""
99102
return self._choose_pool().acquire()
100103

104+
def _stream_with_retry(self, make_op):
105+
first_try = True
106+
while True:
107+
resource = self._acquire()
108+
transport = resource.object
109+
streaming_op = make_op(transport)
110+
streaming_op.attach(resource)
111+
try:
112+
for item in streaming_op:
113+
yield item
114+
break
115+
except BadResource as e:
116+
resource.errored = True
117+
# NB: *only* re-try if connection closed happened
118+
# at the start of the streaming op
119+
if first_try and not e.mid_stream:
120+
continue
121+
else:
122+
raise
123+
finally:
124+
first_try = False
125+
streaming_op.close()
126+
101127
def _with_retries(self, pool, fn):
102128
"""
103129
Performs the passed function with retries against the given pool.
@@ -112,26 +138,38 @@ def _with_retries(self, pool, fn):
112138
def _skip_bad_nodes(transport):
113139
return transport._node not in skip_nodes
114140

115-
retry_count = self.retries
116-
117-
for retry in range(retry_count):
141+
retry_count = self.retries - 1
142+
first_try = True
143+
current_try = 0
144+
while True:
118145
try:
119-
with pool.transaction(_filter=_skip_bad_nodes) as transport:
146+
with pool.transaction(
147+
_filter=_skip_bad_nodes,
148+
yield_resource=True) as resource:
149+
transport = resource.object
120150
try:
121151
return fn(transport)
122-
except (IOError, HTTPException) as e:
152+
except (IOError, HTTPException, ConnectionClosed) as e:
153+
resource.errored = True
123154
if _is_retryable(e):
124155
transport._node.error_rate.incr(1)
125156
skip_nodes.append(transport._node)
126-
raise BadResource(e)
157+
if first_try:
158+
continue
159+
else:
160+
raise BadResource(e)
127161
else:
128162
raise
129163
except BadResource as e:
130-
if retry < (retry_count - 1):
164+
if current_try < retry_count:
165+
resource.errored = True
166+
current_try += 1
131167
continue
132168
else:
133169
# Re-raise the inner exception
134170
raise e.args[0]
171+
finally:
172+
first_try = False
135173

136174
def _choose_pool(self, protocol=None):
137175
"""
@@ -168,6 +206,7 @@ def _is_retryable(error):
168206
return is_tcp_retryable(error) or is_http_retryable(error)
169207

170208

209+
# http://thecodeship.com/patterns/guide-to-python-function-decorators/
171210
def retryable(fn, protocol=None):
172211
"""
173212
Wraps a client operation that can be retried according to the set

riak/transports/pool.py

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,31 @@
55
from contextlib import contextmanager
66

77

8-
# This file is a rough port of the Innertube Ruby library
98
class BadResource(Exception):
109
"""
1110
Users of a :class:`Pool` should raise this error when the pool
1211
resource currently in-use is bad and should be removed from the
1312
pool.
13+
14+
:param mid_stream: did this exception happen mid-streaming op?
15+
:type mid_stream: boolean
16+
"""
17+
def __init__(self, ex, mid_stream=False):
18+
super(BadResource, self).__init__(ex)
19+
self.mid_stream = mid_stream
20+
21+
22+
class ConnectionClosed(BadResource):
1423
"""
15-
pass
24+
Users of a :class:`Pool` should raise this error when the pool
25+
resource currently in-use has been closed and should be removed
26+
from the pool.
27+
28+
:param mid_stream: did this exception happen mid-streaming op?
29+
:type mid_stream: boolean
30+
"""
31+
def __init__(self, ex, mid_stream=False):
32+
super(ConnectionClosed, self).__init__(ex, mid_stream)
1633

1734

1835
class Resource(object):
@@ -30,20 +47,26 @@ def __init__(self, obj, pool):
3047
:type obj: object
3148
"""
3249

33-
self.object = obj
3450
"""The wrapped pool resource."""
51+
self.object = obj
3552

36-
self.claimed = False
3753
"""Whether the resource is currently in use."""
54+
self.claimed = False
3855

39-
self.pool = pool
4056
"""The pool that this resource belongs to."""
57+
self.pool = pool
58+
59+
"""True if this Resource errored."""
60+
self.errored = False
4161

4262
def release(self):
4363
"""
4464
Releases this resource back to the pool it came from.
4565
"""
46-
self.pool.release(self)
66+
if self.errored:
67+
self.pool.delete_resource(self)
68+
else:
69+
self.pool.release(self)
4770

4871

4972
class Pool(object):
@@ -60,7 +83,7 @@ class Pool(object):
6083
6184
Example::
6285
63-
from riak.Pool import Pool, BadResource
86+
from riak.transports.pool import Pool
6487
class ListPool(Pool):
6588
def create_resource(self):
6689
return []
@@ -74,7 +97,6 @@ def destroy_resource(self):
7497
resource.append(1)
7598
with pool.transaction() as resource2:
7699
print(repr(resource2)) # should be [1]
77-
78100
"""
79101

80102
def __init__(self):
@@ -138,7 +160,7 @@ def release(self, resource):
138160
self.releaser.notify_all()
139161

140162
@contextmanager
141-
def transaction(self, _filter=None, default=None):
163+
def transaction(self, _filter=None, default=None, yield_resource=False):
142164
"""
143165
transaction(_filter=None, default=None)
144166
@@ -152,10 +174,18 @@ def transaction(self, _filter=None, default=None):
152174
:type _filter: callable
153175
:param default: a value that will be used instead of calling
154176
:meth:`create_resource` if a new resource needs to be created
177+
:param yield_resource: set to True to yield the Resource object
178+
itself
179+
:type yield_resource: boolean
155180
"""
156181
resource = self.acquire(_filter=_filter, default=default)
157182
try:
158-
yield resource.object
183+
if yield_resource:
184+
yield resource
185+
else:
186+
yield resource.object
187+
if resource.errored:
188+
self.delete_resource(resource)
159189
except BadResource:
160190
self.delete_resource(resource)
161191
raise

riak/transports/tcp/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import errno
22
import socket
33

4-
from riak.transports.pool import Pool
4+
from riak.transports.pool import Pool, ConnectionClosed
55
from riak.transports.tcp.transport import TcpTransport
66

77

@@ -48,7 +48,13 @@ def is_retryable(err):
4848
4949
:rtype: boolean
5050
"""
51-
if isinstance(err, socket.error):
51+
if isinstance(err, ConnectionClosed):
52+
# NB: only retryable if we're not mid-streaming
53+
if err.mid_stream:
54+
return False
55+
else:
56+
return True
57+
elif isinstance(err, socket.error):
5258
code = err.args[0]
5359
return code in CONN_CLOSED_ERRORS
5460
else:

0 commit comments

Comments
 (0)