Skip to content

Commit 0f5e5fa

Browse files
authored
Adding retries for DBFS API on Error Code 429 (#319)
This PR attempts to use the well-received [tenacity library](https://github.com/jd/tenacity) to add retries for the DBFS CLI commands. This is needed since we are imposing request limiting for these APIs. The retries are done with an exponential backoff only for error code 429. The exponential multiplier is 1 sec and the max wait is 60 seconds. We will randomly wait up to 2^x * 1 seconds between each retry until the range reaches 60 seconds, then randomly up to 60 seconds afterwards, where x is the retry number. We will retry a maximum of 7 times. Manual testing using the following script produced the following output. ``` Rate limit exceeded. Retrying with exponential backoff. WARNING:databricks_cli.dbfs.api: Received 429 REQUEST_LIMIT_EXCEEDED for attempt 1. Retrying in 0.2678150505033847 seconds. Rate limit exceeded. Retrying with exponential backoff. WARNING:databricks_cli.dbfs.api: Received 429 REQUEST_LIMIT_EXCEEDED for attempt 2. Retrying in 1.4529390049015964 seconds. Rate limit exceeded. Retrying with exponential backoff. WARNING:databricks_cli.dbfs.api: Received 429 REQUEST_LIMIT_EXCEEDED for attempt 3. Retrying in 1.4825864280042993 seconds. Rate limit exceeded. Retrying with exponential backoff. WARNING:databricks_cli.dbfs.api: Received 429 REQUEST_LIMIT_EXCEEDED for attempt 4. Retrying in 3.79336630796632 seconds. Rate limit exceeded. Retrying with exponential backoff. WARNING:databricks_cli.dbfs.api: Received 429 REQUEST_LIMIT_EXCEEDED for attempt 5. Retrying in 19.54595718573239 seconds. Rate limit exceeded. Retrying with exponential backoff. WARNING:databricks_cli.dbfs.api: Received 429 REQUEST_LIMIT_EXCEEDED for attempt 6. Retrying in 32.52860286202574 seconds. 
DbfsTestingRateLimitSuite-c5458501-c08b-4add-b782-d3198fee31a2 bogdan-ghita databricks-results test-dir-1 tmp DbfsTestingRateLimitSuite-c5458501-c08b-4add-b782-d3198fee31a2 bogdan-ghita databricks-results test-dir-1 tmp DbfsTestingRateLimitSuite-c5458501-c08b-4add-b782-d3198fee31a2 bogdan-ghita databricks-results test-dir-1 tmp ``` In case of an error, we see the following output. ``` Rate limit exceeded. Retrying with exponential backoff. WARNING:databricks_cli.dbfs.api: Received 429 REQUEST_LIMIT_EXCEEDED for attempt 1. Retrying in 0.25618165976582064 seconds. Rate limit exceeded. Retrying with exponential backoff. WARNING:databricks_cli.dbfs.api: Received 429 REQUEST_LIMIT_EXCEEDED for attempt 2. Retrying in 1.3924883604006733 seconds. Rate limit exceeded. Retrying with exponential backoff. Error: RateLimitException: 429 Too Many Requests ``` Script: ``` #!/bin/bash CONCURRENT_REQ_LIMIT=30 NUM_REQUESTS_TO_EXCEED=1 NUM_REQUESTS=$((CONCURRENT_REQ_LIMIT + NUM_REQUESTS_TO_EXCEED)) for (( i=1; i<=$NUM_REQUESTS; i++ )); do echo "$(date) Sending request $i" dbfs ls dbfs:/ & done wait ``` Unit tests were added to test the changes.
1 parent bf2951f commit 0f5e5fa

4 files changed

Lines changed: 205 additions & 9 deletions

File tree

databricks_cli/dbfs/api.py

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,29 @@
2828
import tempfile
2929

3030
import re
31+
import logging
32+
import sys
3133
import click
3234

35+
36+
from tenacity import retry, wait_random_exponential, retry_if_exception_type, stop_after_attempt
37+
3338
from requests.exceptions import HTTPError
3439

3540
from databricks_cli.sdk import DbfsService
3641
from databricks_cli.utils import error_and_quit
3742
from databricks_cli.dbfs.dbfs_path import DbfsPath
38-
from databricks_cli.dbfs.exceptions import LocalFileExistsException
43+
from databricks_cli.dbfs.exceptions import LocalFileExistsException, RateLimitException
3944

4045
BUFFER_SIZE_BYTES = 2**20
46+
EXPONENTIAL_BACKOFF_MULTIPLIER = 1
47+
MAX_SECONDS_WAIT = 60
48+
MAX_RETRY_ATTEMPTS = 7
49+
time_for_last_retry = 0
50+
51+
logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
52+
53+
logger = logging.getLogger(__name__)
4154

4255

4356
class ParseException(Exception):
@@ -75,12 +88,42 @@ class DbfsErrorCodes(object):
7588
RESOURCE_DOES_NOT_EXIST = 'RESOURCE_DOES_NOT_EXIST'
7689
RESOURCE_ALREADY_EXISTS = 'RESOURCE_ALREADY_EXISTS'
7790
PARTIAL_DELETE = 'PARTIAL_DELETE'
91+
TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS'
92+
93+
94+
def before_sleep_on_429(retry_state):
95+
global time_for_last_retry
96+
if retry_state.attempt_number < 1:
97+
loglevel = logging.INFO
98+
else:
99+
loglevel = logging.WARNING
100+
logger.log(
101+
loglevel, ' Received 429 REQUEST_LIMIT_EXCEEDED for attempt %s. Retrying in %s seconds.',
102+
retry_state.attempt_number, retry_state.idle_for - time_for_last_retry)
103+
time_for_last_retry = retry_state.idle_for
104+
105+
106+
def retry_429(func):
107+
@retry(wait=wait_random_exponential(multiplier=EXPONENTIAL_BACKOFF_MULTIPLIER,
108+
max=MAX_SECONDS_WAIT), retry=retry_if_exception_type(RateLimitException),
109+
stop=stop_after_attempt(MAX_RETRY_ATTEMPTS), reraise=True,
110+
before_sleep=before_sleep_on_429)
111+
def wrapped_function(*args, **kwargs):
112+
try:
113+
return func(*args, **kwargs)
114+
except HTTPError as e:
115+
if e.response.status_code == 429:
116+
click.echo("Rate limit exceeded. Retrying with exponential backoff.")
117+
raise RateLimitException("429 Too Many Requests")
118+
raise e
119+
return wrapped_function
78120

79121

80122
class DbfsApi(object):
81123
def __init__(self, api_client):
82124
self.client = DbfsService(api_client)
83125

126+
@retry_429
84127
def list_files(self, dbfs_path, headers=None):
85128
list_response = self.client.list(dbfs_path.absolute_path, headers=headers)
86129
if 'files' in list_response:
@@ -101,20 +144,37 @@ def file_exists(self, dbfs_path, headers=None):
101144
raise e
102145
return True
103146

147+
@retry_429
104148
def get_status(self, dbfs_path, headers=None):
105149
json = self.client.get_status(dbfs_path.absolute_path, headers=headers)
106150
return FileInfo.from_json(json)
107151

152+
@retry_429
153+
def create(self, dbfs_path, overwrite, headers):
154+
return self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)
155+
156+
@retry_429
157+
def add_block(self, handle, contents, headers):
158+
self.client.add_block(handle, contents, headers=headers)
159+
160+
@retry_429
161+
def close(self, handle, headers):
162+
self.client.close(handle, headers=headers)
163+
108164
def put_file(self, src_path, dbfs_path, overwrite, headers=None):
109-
handle = self.client.create(dbfs_path.absolute_path, overwrite, headers=headers)['handle']
165+
handle = self.create(dbfs_path, overwrite, headers=headers)['handle']
110166
with open(src_path, 'rb') as local_file:
111167
while True:
112168
contents = local_file.read(BUFFER_SIZE_BYTES)
113169
if len(contents) == 0:
114170
break
115171
# add_block should not take a bytes object.
116-
self.client.add_block(handle, b64encode(contents).decode(), headers=headers)
117-
self.client.close(handle, headers=headers)
172+
self.add_block(handle, b64encode(contents).decode(), headers=headers)
173+
self.close(handle, headers=headers)
174+
175+
@retry_429
176+
def read(self, dbfs_path, offset, headers):
177+
return self.client.read(dbfs_path.absolute_path, offset, BUFFER_SIZE_BYTES, headers=headers)
118178

119179
def get_file(self, dbfs_path, dst_path, overwrite, headers=None):
120180
if os.path.exists(dst_path) and not overwrite:
@@ -126,8 +186,7 @@ def get_file(self, dbfs_path, dst_path, overwrite, headers=None):
126186
offset = 0
127187
with open(dst_path, 'wb') as local_file:
128188
while offset < length:
129-
response = self.client.read(dbfs_path.absolute_path, offset, BUFFER_SIZE_BYTES,
130-
headers=headers)
189+
response = self.read(dbfs_path, offset, headers=headers)
131190
bytes_read = response['bytes_read']
132191
data = response['data']
133192
offset += bytes_read
@@ -146,11 +205,13 @@ def get_num_files_deleted(partial_delete_error):
146205
message))
147206
return int(m.group(1))
148207

208+
@retry_429
149209
def delete(self, dbfs_path, recursive, headers=None):
150210
num_files_deleted = 0
151211
while True:
152212
try:
153-
self.client.delete(dbfs_path.absolute_path, recursive=recursive, headers=headers)
213+
self.client.delete(dbfs_path.absolute_path,
214+
recursive=recursive, headers=headers)
154215
except HTTPError as e:
155216
if e.response.status_code == 503:
156217
try:
@@ -172,9 +233,11 @@ def delete(self, dbfs_path, recursive, headers=None):
172233
break
173234
click.echo("\rDelete finished successfully.\033[K")
174235

236+
@retry_429
175237
def mkdirs(self, dbfs_path, headers=None):
176238
self.client.mkdirs(dbfs_path.absolute_path, headers=headers)
177239

240+
@retry_429
178241
def move(self, dbfs_src, dbfs_dst, headers=None):
179242
self.client.move(dbfs_src.absolute_path, dbfs_dst.absolute_path, headers=headers)
180243

databricks_cli/dbfs/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,7 @@
2424

2525
class LocalFileExistsException(Exception):
2626
pass
27+
28+
29+
class RateLimitException(Exception):
30+
pass

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
'tabulate>=0.7.7',
4040
'six>=1.10.0',
4141
'configparser>=0.3.5;python_version < "3.6"',
42+
'tenacity>=6.2.0'
4243
],
4344
entry_points='''
4445
[console_scripts]

tests/dbfs/test_api.py

Lines changed: 130 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
import databricks_cli.dbfs.api as api
3333
from databricks_cli.dbfs.dbfs_path import DbfsPath
34-
from databricks_cli.dbfs.exceptions import LocalFileExistsException
34+
from databricks_cli.dbfs.exceptions import LocalFileExistsException, RateLimitException
3535

3636
TEST_DBFS_PATH = DbfsPath('dbfs:/test')
3737
TEST_FILE_JSON = {
@@ -55,6 +55,13 @@ def get_partial_delete_exception(message="[...] operation has deleted 10 files [
5555
return requests.exceptions.HTTPError(response=response)
5656

5757

58+
def get_rate_limit_exception():
59+
response = requests.Response()
60+
response.status_code = 429
61+
response._content = ('{{"error_code": "{}"}}'.format(api.DbfsErrorCodes.TOO_MANY_REQUESTS)).encode() # NOQA
62+
return requests.exceptions.HTTPError(response=response)
63+
64+
5865
class TestFileInfo(object):
5966
def test_to_row_not_long_form_not_absolute(self):
6067
file_info = api.FileInfo(TEST_DBFS_PATH, False, 1)
@@ -103,6 +110,38 @@ def test_list_files_does_not_exist(self, dbfs_api):
103110

104111
assert len(files) == 0
105112

113+
def test_list_files_rate_limited(self, dbfs_api):
114+
rate_limit_exception = get_rate_limit_exception()
115+
# Simulate 2 rate limit exceptions followed by a full successful operation
116+
exception_sequence = [rate_limit_exception, rate_limit_exception, None]
117+
dbfs_api.client.list_files = mock.Mock(side_effect=exception_sequence)
118+
# Should succeed
119+
files = dbfs_api.list_files(TEST_DBFS_PATH)
120+
121+
assert len(files) == 0
122+
123+
def test_mkdirs_rate_limited(self, dbfs_api):
124+
rate_limit_exception = get_rate_limit_exception()
125+
# Simulate 2 rate limit exceptions followed by a full successful operation
126+
exception_sequence = [rate_limit_exception, rate_limit_exception, None]
127+
dbfs_api.client.mkdirs = mock.Mock(side_effect=exception_sequence)
128+
# Should succeed
129+
dbfs_api.mkdirs(DbfsPath('dbfs:/test/mkdir'))
130+
files = dbfs_api.client.list(DbfsPath('dbfs:/test/mkdir'))
131+
132+
assert len(files) == 0
133+
134+
def test_mkdirs_stop_retrying(self, dbfs_api):
135+
rate_limit_exception = get_rate_limit_exception()
136+
# Simulate 9 rate limit exceptions which will fail eventually
137+
exception_sequence = [rate_limit_exception, rate_limit_exception, rate_limit_exception,
138+
rate_limit_exception, rate_limit_exception, rate_limit_exception,
139+
rate_limit_exception, rate_limit_exception, rate_limit_exception]
140+
dbfs_api.client.mkdirs = mock.Mock(side_effect=exception_sequence)
141+
with pytest.raises(RateLimitException):
142+
dbfs_api.mkdirs(DbfsPath('dbfs:/test/mkdir'))
143+
assert dbfs_api.client.mkdirs.call_count == 7
144+
106145
def test_file_exists_true(self, dbfs_api):
107146
dbfs_api.client.get_status.return_value = TEST_FILE_JSON
108147
assert dbfs_api.file_exists(TEST_DBFS_PATH)
@@ -122,6 +161,16 @@ def test_get_status_fail(self, dbfs_api):
122161
with pytest.raises(exception.__class__):
123162
dbfs_api.get_status(TEST_DBFS_PATH)
124163

164+
def test_get_status_rate_limited(self, dbfs_api):
165+
exception = get_rate_limit_exception()
166+
dbfs_api.client.get_status = mock.Mock(side_effect=[exception, {
167+
'path': '/test',
168+
'is_dir': False,
169+
'file_size': 1
170+
}])
171+
# Should succeed
172+
assert dbfs_api.get_status(TEST_DBFS_PATH) is not None
173+
125174
def test_put_file(self, dbfs_api, tmpdir):
126175
test_file_path = os.path.join(tmpdir.strpath, 'test')
127176
with open(test_file_path, 'wt') as f:
@@ -138,6 +187,26 @@ def test_put_file(self, dbfs_api, tmpdir):
138187
assert api_mock.close.call_count == 1
139188
assert test_handle == api_mock.close.call_args[0][0]
140189

190+
def test_put_file_rate_limited(self, dbfs_api, tmpdir):
191+
test_file_path = os.path.join(tmpdir.strpath, 'test')
192+
with open(test_file_path, 'wt') as f:
193+
f.write('test')
194+
195+
exception = get_rate_limit_exception()
196+
api_mock = dbfs_api.client
197+
test_handle = 0
198+
api_mock.create = mock.Mock(side_effect=[exception, {'handle': test_handle}])
199+
api_mock.add_block = mock.Mock(side_effect=[exception, None])
200+
api_mock.close = mock.Mock(side_effect=[exception, None])
201+
dbfs_api.put_file(test_file_path, TEST_DBFS_PATH, True)
202+
203+
assert api_mock.create.call_count == 2
204+
assert api_mock.add_block.call_count == 2
205+
assert test_handle == api_mock.add_block.call_args[0][0]
206+
assert b64encode(b'test').decode() == api_mock.add_block.call_args[0][1]
207+
assert api_mock.close.call_count == 2
208+
assert test_handle == api_mock.close.call_args[0][0]
209+
141210
def test_get_file_check_overwrite(self, dbfs_api, tmpdir):
142211
test_file_path = os.path.join(tmpdir.strpath, 'test')
143212
with open(test_file_path, 'w') as f:
@@ -159,6 +228,22 @@ def test_get_file(self, dbfs_api, tmpdir):
159228
with open(test_file_path, 'r') as f:
160229
assert f.read() == 'x'
161230

231+
def test_get_file_rate_limited(self, dbfs_api, tmpdir):
232+
api_mock = dbfs_api.client
233+
api_mock.get_status.return_value = TEST_FILE_JSON
234+
rate_limit_exception = get_rate_limit_exception()
235+
236+
api_mock.read = mock.Mock(side_effect=[rate_limit_exception, {
237+
'bytes_read': 1,
238+
'data': b64encode(b'x'),
239+
}])
240+
241+
test_file_path = os.path.join(tmpdir.strpath, 'test')
242+
dbfs_api.get_file(TEST_DBFS_PATH, test_file_path, True)
243+
244+
with open(test_file_path, 'r') as f:
245+
assert f.read() == 'x'
246+
162247
def test_cat(self, dbfs_api):
163248
dbfs_api.client.get_status.return_value = {
164249
'path': '/test',
@@ -173,15 +258,58 @@ def test_cat(self, dbfs_api):
173258
dbfs_api.cat('dbfs:/whatever-doesnt-matter')
174259
click_mock.echo.assert_called_with('a', nl=False)
175260

261+
def test_cat_rate_limited(self, dbfs_api):
262+
rate_limit_exception = get_rate_limit_exception()
263+
# Simulate 2 rate limit exceptions followed by a full successful operation
264+
get_status_exception_sequence = [rate_limit_exception, rate_limit_exception, {
265+
'path': '/test',
266+
'is_dir': False,
267+
'file_size': 1
268+
}]
269+
read_exception_sequence = [rate_limit_exception, rate_limit_exception, {
270+
'bytes_read': 1,
271+
'data': b64encode(b'a'),
272+
}]
273+
274+
dbfs_api.client.get_status = mock.Mock(side_effect=get_status_exception_sequence)
275+
dbfs_api.client.read = mock.Mock(side_effect=read_exception_sequence)
276+
277+
with mock.patch('databricks_cli.dbfs.api.click') as click_mock:
278+
dbfs_api.cat('dbfs:/whatever-doesnt-matter')
279+
click_mock.echo.assert_called_with('a', nl=False)
280+
176281
def test_partial_delete(self, dbfs_api):
177282
e_partial_delete = get_partial_delete_exception()
178-
# Simulate 3 partial deletes followed by a full successful delete
283+
# Simulate 3 partial deletes followed by a full successful operation
179284
exception_sequence = [e_partial_delete, e_partial_delete, e_partial_delete, None]
180285
dbfs_api.client.delete = mock.Mock(side_effect=exception_sequence)
181286
dbfs_api.delete_retry_delay_millis = 1
182287
# Should succeed
183288
dbfs_api.delete(DbfsPath('dbfs:/whatever-doesnt-matter'), recursive=True)
184289

290+
def test_partial_delete_with_rate_limit(self, dbfs_api):
291+
rate_limit_exception = get_rate_limit_exception()
292+
e_partial_delete = get_partial_delete_exception()
293+
# Simulate 3 partial deletes interspersed with rate limiting exceptions
294+
# followed by a full successful delete
295+
exception_sequence = [e_partial_delete, rate_limit_exception,
296+
e_partial_delete, rate_limit_exception,
297+
e_partial_delete, None]
298+
299+
dbfs_api.client.delete = mock.Mock(side_effect=exception_sequence)
300+
dbfs_api.delete_retry_delay_millis = 1
301+
# Should succeed
302+
dbfs_api.delete(DbfsPath('dbfs:/whatever-doesnt-matter'), recursive=True)
303+
304+
def test_delete_with_rate_limit(self, dbfs_api):
305+
rate_limit_exception = get_rate_limit_exception()
306+
# Simulate a rate limit exception followed by a full successful delete
307+
exception_sequence = [rate_limit_exception, None]
308+
dbfs_api.client.delete = mock.Mock(side_effect=exception_sequence)
309+
dbfs_api.delete_retry_delay_millis = 1
310+
# Should succeed
311+
dbfs_api.delete(DbfsPath('dbfs:/whatever-doesnt-matter'), recursive=True)
312+
185313
def test_partial_delete_exception_message_parse_error(self, dbfs_api):
186314
message = "unexpected partial delete exception message"
187315
e_partial_delete = get_partial_delete_exception(message)

0 commit comments

Comments
 (0)