Skip to content

Commit 6e2da08

Browse files
DBFS API 429 retries: Wrap retry logic in a class to ensure isolation between decorated function calls. (#327)
Refactor the `retry_429` decorator, changing it from a function to a class, so that the `time_for_last_retry` state is persisted separately for each decorated function instead of in a global variable.
1 parent aadfdd4 commit 6e2da08

2 files changed

Lines changed: 70 additions & 45 deletions

File tree

databricks_cli/dbfs/api.py

Lines changed: 69 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import tempfile
2929

3030
import re
31+
import functools
3132
import click
3233

3334
from tenacity import retry, wait_random_exponential, retry_if_exception_type, stop_after_attempt
@@ -39,10 +40,6 @@
3940
from databricks_cli.dbfs.exceptions import LocalFileExistsException, RateLimitException
4041

4142
BUFFER_SIZE_BYTES = 2**20
42-
EXPONENTIAL_BACKOFF_MULTIPLIER = 1
43-
MAX_SECONDS_WAIT = 60
44-
MAX_RETRY_ATTEMPTS = 8
45-
time_for_last_retry = 0
4643

4744

4845
class ParseException(Exception):
@@ -83,46 +80,75 @@ class DbfsErrorCodes(object):
8380
TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS'
8481

8582

86-
def before_sleep_on_429(retry_state):
87-
global time_for_last_retry
88-
if retry_state.attempt_number < 1:
89-
click.echo("Warning: Unexpected retry_state.attempt_number={}.".format(
90-
retry_state.attempt_number))
91-
click.echo("Received 429 REQUEST_LIMIT_EXCEEDED. Retrying with exponential backoff.")
92-
else:
93-
# Initialize time_for_last_retry on the first attempt.
94-
if retry_state.attempt_number == 1:
95-
time_for_last_retry = 0
96-
# Note: Here idle_for represents the total time spent sleeping in all retries so far +
97-
# the time that we will sleep until the next retry. We determined this empirically,
98-
# as it is not clearly stated in the Tenacity docs.
99-
time_until_next_retry = retry_state.idle_for - time_for_last_retry
83+
class CustomRetryState(object):
84+
def __init__(self):
85+
self.time_for_last_retry = 0
86+
87+
def reset(self):
88+
self.time_for_last_retry = 0
89+
90+
91+
class Retry429(object):
92+
EXPONENTIAL_BACKOFF_MULTIPLIER = 1
93+
MAX_SECONDS_WAIT = 60
94+
MAX_RETRY_ATTEMPTS = 8
95+
96+
def __init__(self, func):
97+
"""
98+
If there are no decorator arguments, the function to be decorated is passed to
99+
the constructor. It is called only once for each function decorated with it.
100+
"""
101+
self.retry_state_429 = CustomRetryState()
102+
103+
@retry(wait=wait_random_exponential(multiplier=self.EXPONENTIAL_BACKOFF_MULTIPLIER,
104+
max=self.MAX_SECONDS_WAIT), retry=retry_if_exception_type(RateLimitException),
105+
stop=stop_after_attempt(self.MAX_RETRY_ATTEMPTS), reraise=True,
106+
before_sleep=lambda retry_state: self.before_sleep_on_429(retry_state,
107+
self.retry_state_429))
108+
def wrapped_function(*args, **kwargs):
109+
try:
110+
return func(*args, **kwargs)
111+
except HTTPError as e:
112+
if e.response.status_code == 429:
113+
raise RateLimitException("429 Too Many Requests")
114+
raise e
115+
116+
self.func = wrapped_function
117+
118+
def __call__(self, *args, **kwargs):
119+
"""
120+
The __call__ method is called every time a decorated function is called.
121+
"""
122+
self.retry_state_429.reset()
123+
124+
return self.func(*args, **kwargs)
125+
126+
def __get__(self, obj, objtype):
127+
"""
128+
Making this decorator a descriptor such that we can use it on class methods.
129+
See https://stackoverflow.com/a/3296318/12359607
130+
"""
131+
return functools.partial(self.__call__, obj)
132+
133+
@staticmethod
134+
def before_sleep_on_429(retry_state, retry_state_429):
135+
"""
136+
Note: Here idle_for represents the total time spent sleeping in all retries so far +
137+
the time that we will sleep until the next retry. We determined this empirically,
138+
as it is not clearly stated in the Tenacity docs.
139+
"""
140+
time_until_next_retry = retry_state.idle_for - retry_state_429.time_for_last_retry
100141
click.echo(("Received 429 REQUEST_LIMIT_EXCEEDED for attempt {}. "
101142
"Retrying in {:.2f} seconds.").format(retry_state.attempt_number,
102143
time_until_next_retry))
103-
time_for_last_retry = retry_state.idle_for
104-
105-
106-
def retry_429(func):
107-
@retry(wait=wait_random_exponential(multiplier=EXPONENTIAL_BACKOFF_MULTIPLIER,
108-
max=MAX_SECONDS_WAIT), retry=retry_if_exception_type(RateLimitException),
109-
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS), reraise=True,
110-
before_sleep=before_sleep_on_429)
111-
def wrapped_function(*args, **kwargs):
112-
try:
113-
return func(*args, **kwargs)
114-
except HTTPError as e:
115-
if e.response.status_code == 429:
116-
raise RateLimitException("429 Too Many Requests")
117-
raise e
118-
return wrapped_function
144+
retry_state_429.time_for_last_retry = retry_state.idle_for
119145

120146

121147
class DbfsApi(object):
122148
def __init__(self, api_client):
123149
self.client = DbfsService(api_client)
124150

125-
@retry_429
151+
@Retry429
126152
def list_files(self, dbfs_path, headers=None):
127153
list_response = self.client.list(dbfs_path.absolute_path, headers=headers)
128154
if 'files' in list_response:
@@ -143,20 +169,20 @@ def file_exists(self, dbfs_path, headers=None):
143169
raise e
144170
return True
145171

146-
@retry_429
172+
@Retry429
147173
def get_status(self, dbfs_path, headers=None):
148174
json = self.client.get_status(dbfs_path.absolute_path, headers=headers)
149175
return FileInfo.from_json(json)
150176

151-
@retry_429
177+
@Retry429
152178
def create(self, dbfs_path, overwrite, headers):
153179
return self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)
154180

155-
@retry_429
181+
@Retry429
156182
def add_block(self, handle, contents, headers):
157183
self.client.add_block(handle, contents, headers=headers)
158184

159-
@retry_429
185+
@Retry429
160186
def close(self, handle, headers):
161187
self.client.close(handle, headers=headers)
162188

@@ -171,7 +197,7 @@ def put_file(self, src_path, dbfs_path, overwrite, headers=None):
171197
self.add_block(handle, b64encode(contents).decode(), headers=headers)
172198
self.close(handle, headers=headers)
173199

174-
@retry_429
200+
@Retry429
175201
def read(self, dbfs_path, offset, headers):
176202
return self.client.read(dbfs_path.absolute_path, offset, BUFFER_SIZE_BYTES, headers=headers)
177203

@@ -204,7 +230,7 @@ def get_num_files_deleted(partial_delete_error):
204230
message))
205231
return int(m.group(1))
206232

207-
@retry_429
233+
@Retry429
208234
def delete(self, dbfs_path, recursive, headers=None):
209235
num_files_deleted = 0
210236
while True:
@@ -232,11 +258,11 @@ def delete(self, dbfs_path, recursive, headers=None):
232258
break
233259
click.echo("\rDelete finished successfully.\033[K")
234260

235-
@retry_429
261+
@Retry429
236262
def mkdirs(self, dbfs_path, headers=None):
237263
self.client.mkdirs(dbfs_path.absolute_path, headers=headers)
238264

239-
@retry_429
265+
@Retry429
240266
def move(self, dbfs_src, dbfs_dst, headers=None):
241267
self.client.move(dbfs_src.absolute_path, dbfs_dst.absolute_path, headers=headers)
242268

tests/dbfs/test_api.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
'file_size': 1
4141
}
4242
TEST_FILE_INFO = api.FileInfo(TEST_DBFS_PATH, False, 1)
43-
MAX_RETRY_ATTEMPTS = 8
4443

4544

4645
def get_resource_does_not_exist_exception():
@@ -141,7 +140,7 @@ def test_mkdirs_stop_retrying(self, dbfs_api):
141140
dbfs_api.client.mkdirs = mock.Mock(side_effect=exception_sequence)
142141
with pytest.raises(RateLimitException):
143142
dbfs_api.mkdirs(DbfsPath('dbfs:/test/mkdir'))
144-
assert dbfs_api.client.mkdirs.call_count == MAX_RETRY_ATTEMPTS
143+
assert dbfs_api.client.mkdirs.call_count == api.Retry429.MAX_RETRY_ATTEMPTS
145144

146145
def test_file_exists_true(self, dbfs_api):
147146
dbfs_api.client.get_status.return_value = TEST_FILE_JSON

0 commit comments

Comments
 (0)