Skip to content

Commit 37d51b1

Browse files
DBFS API 429 retries: Fix bug on computing the time until the next retry (#326)
In #319 we added 429 retries for the DBFS API. Computing the time until the next retry as `retry_state.idle_for - time_for_last_retry` is not going to work as expected in the case where more than one call to functions wrapped with `retry_429` happens (e.g. in the case of `put_file`). The problem is that `time_for_last_retry` is only initialised once per databricks-cli command, while it should be initialised every time a function wrapped with `retry_429` is called, on the first attempt. We fix this by initialising `time_for_last_retry` when `retry_state.attempt_number == 1`. This only works if the code is single-threaded, which is the case here.

Additional improvements:
* Remove an unnecessary retry log, since we already have one.
* Increase the maximum number of retries to 8, so that we have one additional retry with a long delay.
* Use `click.echo` instead of `logger`, to maintain consistency with the rest of the code.
* Add a clarifying note on `retry_state.idle_for`.
* Display seconds with 2 decimal places only.
1 parent 0f5e5fa commit 37d51b1

2 files changed

Lines changed: 17 additions & 17 deletions

File tree

databricks_cli/dbfs/api.py

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,9 @@
2828
import tempfile
2929

3030
import re
31-
import logging
32-
import sys
3331
import click
3432

35-
3633
from tenacity import retry, wait_random_exponential, retry_if_exception_type, stop_after_attempt
37-
3834
from requests.exceptions import HTTPError
3935

4036
from databricks_cli.sdk import DbfsService
@@ -45,13 +41,9 @@
4541
BUFFER_SIZE_BYTES = 2**20
4642
EXPONENTIAL_BACKOFF_MULTIPLIER = 1
4743
MAX_SECONDS_WAIT = 60
48-
MAX_RETRY_ATTEMPTS = 7
44+
MAX_RETRY_ATTEMPTS = 8
4945
time_for_last_retry = 0
5046

51-
logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
52-
53-
logger = logging.getLogger(__name__)
54-
5547

5648
class ParseException(Exception):
5749
pass
@@ -94,13 +86,21 @@ class DbfsErrorCodes(object):
9486
def before_sleep_on_429(retry_state):
9587
global time_for_last_retry
9688
if retry_state.attempt_number < 1:
97-
loglevel = logging.INFO
89+
click.echo("Warning: Unexpected retry_state.attempt_number={}.".format(
90+
retry_state.attempt_number))
91+
click.echo("Received 429 REQUEST_LIMIT_EXCEEDED. Retrying with exponential backoff.")
9892
else:
99-
loglevel = logging.WARNING
100-
logger.log(
101-
loglevel, ' Received 429 REQUEST_LIMIT_EXCEEDED for attempt %s. Retrying in %s seconds.',
102-
retry_state.attempt_number, retry_state.idle_for - time_for_last_retry)
103-
time_for_last_retry = retry_state.idle_for
93+
# Initialize time_for_last_retry on the first attempt.
94+
if retry_state.attempt_number == 1:
95+
time_for_last_retry = 0
96+
# Note: Here idle_for represents the total time spent sleeping in all retries so far +
97+
# the time that we will sleep until the next retry. We determined this empirically,
98+
# as it is not clearly stated in the Tenacity docs.
99+
time_until_next_retry = retry_state.idle_for - time_for_last_retry
100+
click.echo(("Received 429 REQUEST_LIMIT_EXCEEDED for attempt {}. "
101+
"Retrying in {:.2f} seconds.").format(retry_state.attempt_number,
102+
time_until_next_retry))
103+
time_for_last_retry = retry_state.idle_for
104104

105105

106106
def retry_429(func):
@@ -113,7 +113,6 @@ def wrapped_function(*args, **kwargs):
113113
return func(*args, **kwargs)
114114
except HTTPError as e:
115115
if e.response.status_code == 429:
116-
click.echo("Rate limit exceeded. Retrying with exponential backoff.")
117116
raise RateLimitException("429 Too Many Requests")
118117
raise e
119118
return wrapped_function

tests/dbfs/test_api.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
'file_size': 1
4141
}
4242
TEST_FILE_INFO = api.FileInfo(TEST_DBFS_PATH, False, 1)
43+
MAX_RETRY_ATTEMPTS = 8
4344

4445

4546
def get_resource_does_not_exist_exception():
@@ -140,7 +141,7 @@ def test_mkdirs_stop_retrying(self, dbfs_api):
140141
dbfs_api.client.mkdirs = mock.Mock(side_effect=exception_sequence)
141142
with pytest.raises(RateLimitException):
142143
dbfs_api.mkdirs(DbfsPath('dbfs:/test/mkdir'))
143-
assert dbfs_api.client.mkdirs.call_count == 7
144+
assert dbfs_api.client.mkdirs.call_count == MAX_RETRY_ATTEMPTS
144145

145146
def test_file_exists_true(self, dbfs_api):
146147
dbfs_api.client.get_status.return_value = TEST_FILE_JSON

0 commit comments

Comments (0)