Skip to content

Commit 945690f

Browse files
Reimplement retry mechanism on 429 at the HTTP request level and for all databricks-cli operations (#343)
Revert retry logic on 429 responses for DBFS API requests (#319, #326, #327) and reimplement it using [urllib3.util.Retry](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html), for all the requests made by `databricks-cli`. * We perform a maximum number of 6 retries with exponential backoff, resulting in the following delays between them: 0.5, 1, 2, 4, 8, 16 (seconds). The `Retry-After` header is respected if present. * There is no message logged for each retry (I could not find a way to do it using `urllib3.util.Retry`). * If all retry attempts fail, the original 429 http response is forwarded by the retry utility.
1 parent 6677bb0 commit 945690f

5 files changed

Lines changed: 20 additions & 231 deletions

File tree

databricks_cli/dbfs/api.py

Lines changed: 7 additions & 95 deletions
Original file line number | Diff line number | Diff line change
@@ -28,16 +28,14 @@
2828
import tempfile
2929

3030
import re
31-
import functools
3231
import click
3332

34-
from tenacity import retry, wait_random_exponential, retry_if_exception_type, stop_after_attempt
3533
from requests.exceptions import HTTPError
3634

3735
from databricks_cli.sdk import DbfsService
3836
from databricks_cli.utils import error_and_quit
3937
from databricks_cli.dbfs.dbfs_path import DbfsPath
40-
from databricks_cli.dbfs.exceptions import LocalFileExistsException, RateLimitException
38+
from databricks_cli.dbfs.exceptions import LocalFileExistsException
4139

4240
BUFFER_SIZE_BYTES = 2**20
4341

@@ -77,78 +75,12 @@ class DbfsErrorCodes(object):
7775
RESOURCE_DOES_NOT_EXIST = 'RESOURCE_DOES_NOT_EXIST'
7876
RESOURCE_ALREADY_EXISTS = 'RESOURCE_ALREADY_EXISTS'
7977
PARTIAL_DELETE = 'PARTIAL_DELETE'
80-
TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS'
81-
82-
83-
class CustomRetryState(object):
84-
def __init__(self):
85-
self.time_for_last_retry = 0
86-
87-
def reset(self):
88-
self.time_for_last_retry = 0
89-
90-
91-
class Retry429(object):
92-
EXPONENTIAL_BACKOFF_MULTIPLIER = 1
93-
MAX_SECONDS_WAIT = 60
94-
MAX_RETRY_ATTEMPTS = 8
95-
96-
def __init__(self, func):
97-
"""
98-
If there are no decorator arguments, the function to be decorated is passed to
99-
the constructor. It is called only once for each function decorated with it.
100-
"""
101-
self.retry_state_429 = CustomRetryState()
102-
103-
@retry(wait=wait_random_exponential(multiplier=self.EXPONENTIAL_BACKOFF_MULTIPLIER,
104-
max=self.MAX_SECONDS_WAIT), retry=retry_if_exception_type(RateLimitException),
105-
stop=stop_after_attempt(self.MAX_RETRY_ATTEMPTS), reraise=True,
106-
before_sleep=lambda retry_state: self.before_sleep_on_429(retry_state,
107-
self.retry_state_429))
108-
def wrapped_function(*args, **kwargs):
109-
try:
110-
return func(*args, **kwargs)
111-
except HTTPError as e:
112-
if e.response.status_code == 429:
113-
raise RateLimitException("429 Too Many Requests")
114-
raise e
115-
116-
self.func = wrapped_function
117-
118-
def __call__(self, *args, **kwargs):
119-
"""
120-
The __call__ method is called every time a decorated function is called.
121-
"""
122-
self.retry_state_429.reset()
123-
124-
return self.func(*args, **kwargs)
125-
126-
def __get__(self, obj, objtype):
127-
"""
128-
Making this decorator a descriptor such that we can use it on class methods.
129-
See https://stackoverflow.com/a/3296318/12359607
130-
"""
131-
return functools.partial(self.__call__, obj)
132-
133-
@staticmethod
134-
def before_sleep_on_429(retry_state, retry_state_429):
135-
"""
136-
Note: Here idle_for represents the total time spent sleeping in all retries so far +
137-
the time that we will sleep until the next retry. We determined this empirically,
138-
as it is not clearly stated in the Tenacity docs.
139-
"""
140-
time_until_next_retry = retry_state.idle_for - retry_state_429.time_for_last_retry
141-
click.echo(("Received 429 REQUEST_LIMIT_EXCEEDED for attempt {}. "
142-
"Retrying in {:.2f} seconds.").format(retry_state.attempt_number,
143-
time_until_next_retry))
144-
retry_state_429.time_for_last_retry = retry_state.idle_for
14578

14679

14780
class DbfsApi(object):
14881
def __init__(self, api_client):
14982
self.client = DbfsService(api_client)
15083

151-
@Retry429
15284
def list_files(self, dbfs_path, headers=None):
15385
list_response = self.client.list(dbfs_path.absolute_path, headers=headers)
15486
if 'files' in list_response:
@@ -169,37 +101,20 @@ def file_exists(self, dbfs_path, headers=None):
169101
raise e
170102
return True
171103

172-
@Retry429
173104
def get_status(self, dbfs_path, headers=None):
174105
json = self.client.get_status(dbfs_path.absolute_path, headers=headers)
175106
return FileInfo.from_json(json)
176107

177-
@Retry429
178-
def create(self, dbfs_path, overwrite, headers):
179-
return self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)
180-
181-
@Retry429
182-
def add_block(self, handle, contents, headers):
183-
self.client.add_block(handle, contents, headers=headers)
184-
185-
@Retry429
186-
def close(self, handle, headers):
187-
self.client.close(handle, headers=headers)
188-
189108
def put_file(self, src_path, dbfs_path, overwrite, headers=None):
190-
handle = self.create(dbfs_path, overwrite, headers=headers)['handle']
109+
handle = self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)['handle']
191110
with open(src_path, 'rb') as local_file:
192111
while True:
193112
contents = local_file.read(BUFFER_SIZE_BYTES)
194113
if len(contents) == 0:
195114
break
196115
# add_block should not take a bytes object.
197-
self.add_block(handle, b64encode(contents).decode(), headers=headers)
198-
self.close(handle, headers=headers)
199-
200-
@Retry429
201-
def read(self, dbfs_path, offset, headers):
202-
return self.client.read(dbfs_path.absolute_path, offset, BUFFER_SIZE_BYTES, headers=headers)
116+
self.client.add_block(handle, b64encode(contents).decode(), headers=headers)
117+
self.client.close(handle, headers=headers)
203118

204119
def get_file(self, dbfs_path, dst_path, overwrite, headers=None):
205120
if os.path.exists(dst_path) and not overwrite:
@@ -211,7 +126,8 @@ def get_file(self, dbfs_path, dst_path, overwrite, headers=None):
211126
offset = 0
212127
with open(dst_path, 'wb') as local_file:
213128
while offset < length:
214-
response = self.read(dbfs_path, offset, headers=headers)
129+
response = self.client.read(dbfs_path.absolute_path, offset, BUFFER_SIZE_BYTES,
130+
headers=headers)
215131
bytes_read = response['bytes_read']
216132
data = response['data']
217133
offset += bytes_read
@@ -230,13 +146,11 @@ def get_num_files_deleted(partial_delete_error):
230146
message))
231147
return int(m.group(1))
232148

233-
@Retry429
234149
def delete(self, dbfs_path, recursive, headers=None):
235150
num_files_deleted = 0
236151
while True:
237152
try:
238-
self.client.delete(dbfs_path.absolute_path,
239-
recursive=recursive, headers=headers)
153+
self.client.delete(dbfs_path.absolute_path, recursive=recursive, headers=headers)
240154
except HTTPError as e:
241155
if e.response.status_code == 503:
242156
try:
@@ -258,11 +172,9 @@ def delete(self, dbfs_path, recursive, headers=None):
258172
break
259173
click.echo("\rDelete finished successfully.\033[K")
260174

261-
@Retry429
262175
def mkdirs(self, dbfs_path, headers=None):
263176
self.client.mkdirs(dbfs_path.absolute_path, headers=headers)
264177

265-
@Retry429
266178
def move(self, dbfs_src, dbfs_dst, headers=None):
267179
self.client.move(dbfs_src.absolute_path, dbfs_dst.absolute_path, headers=headers)
268180

databricks_cli/dbfs/exceptions.py

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,3 @@
2424

2525
class LocalFileExistsException(Exception):
2626
pass
27-
28-
29-
class RateLimitException(Exception):
30-
pass

databricks_cli/sdk/api_client.py

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,9 +44,11 @@
4444
try:
4545
from requests.packages.urllib3.poolmanager import PoolManager
4646
from requests.packages.urllib3 import exceptions
47+
from requests.packages.urllib3.util.retry import Retry
4748
except ImportError:
4849
from urllib3.poolmanager import PoolManager
4950
from urllib3 import exceptions
51+
from urllib3.util.retry import Retry
5052

5153
from databricks_cli.version import version as databricks_cli_version
5254

@@ -70,8 +72,16 @@ def __init__(self, user=None, password=None, host=None, token=None,
7072
if host[-1] == "/":
7173
host = host[:-1]
7274

75+
retries = Retry(
76+
total=6,
77+
backoff_factor=1,
78+
status_forcelist=[429],
79+
method_whitelist=set({'POST'}) | set(Retry.DEFAULT_METHOD_WHITELIST),
80+
respect_retry_after_header=True,
81+
raise_on_status=False # return original response when retries have been exhausted
82+
)
7383
self.session = requests.Session()
74-
self.session.mount('https://', TlsV1HttpAdapter())
84+
self.session.mount('https://', TlsV1HttpAdapter(max_retries=retries))
7585

7686
parsed_url = urlparse(host)
7787
scheme = parsed_url.scheme

setup.py

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,6 @@
3939
'tabulate>=0.7.7',
4040
'six>=1.10.0',
4141
'configparser>=0.3.5;python_version < "3.6"',
42-
'tenacity>=6.2.0'
4342
],
4443
entry_points='''
4544
[console_scripts]

0 commit comments

Comments (0)