Skip to content

Commit ebf4879

Browse files
Add support for DBFS partial deletes (#280)
* Implemented retry logic for partial deletes * Added unit tests for partial delete handling * Changed expected partial delete request response. Added log to indicate progress on partial deletes. * Added progress message with total number of files deleted in partial deletes. * Fixed lint error: import order. * Add retries for other 503 errors in delete. * Added retry delay for delete. * Added more tests for dbfs delete.
1 parent 78c91f5 commit ebf4879

2 files changed

Lines changed: 120 additions & 2 deletions

File tree

databricks_cli/dbfs/api.py

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import shutil
2828
import tempfile
2929

30+
import re
31+
import time
3032
import click
3133

3234
from requests.exceptions import HTTPError
@@ -37,6 +39,12 @@
3739
from databricks_cli.dbfs.exceptions import LocalFileExistsException
3840

3941
BUFFER_SIZE_BYTES = 2**20
42+
DELETE_MAX_CONSECUTIVE_503_RETRIES = 3
43+
DELETE_503_RETRY_DELAY_MILLIS = 30 * 1000
44+
45+
46+
class ParseException(Exception):
47+
pass
4048

4149

4250
class FileInfo(object):
@@ -69,11 +77,14 @@ def __eq__(self, other):
6977
class DbfsErrorCodes(object):
7078
RESOURCE_DOES_NOT_EXIST = 'RESOURCE_DOES_NOT_EXIST'
7179
RESOURCE_ALREADY_EXISTS = 'RESOURCE_ALREADY_EXISTS'
80+
TEMPORARILY_UNAVAILABLE = 'TEMPORARILY_UNAVAILABLE'
81+
PARTIAL_DELETE = 'PARTIAL_DELETE'
7282

7383

7484
class DbfsApi(object):
75-
def __init__(self, api_client):
85+
def __init__(self, api_client, delete_retry_delay_millis=DELETE_503_RETRY_DELAY_MILLIS):
7686
self.client = DbfsService(api_client)
87+
self.delete_retry_delay_millis = delete_retry_delay_millis
7788

7889
def list_files(self, dbfs_path, headers=None):
7990
list_response = self.client.list(dbfs_path.absolute_path, headers=headers)
@@ -123,8 +134,52 @@ def get_file(self, dbfs_path, dst_path, overwrite, headers=None):
123134
offset += bytes_read
124135
local_file.write(b64decode(data))
125136

137+
@staticmethod
138+
def get_num_files_deleted(partial_delete_error):
139+
try:
140+
message = partial_delete_error.response.json()['message']
141+
except (AttributeError, KeyError):
142+
raise ParseException("Unable to retrieve the number of deleted files.")
143+
m = re.compile(r".*operation has deleted (\d+) files.*").match(message)
144+
if not m:
145+
raise ParseException(
146+
"Unable to retrieve the number of deleted files from the error message: {}".format(
147+
message))
148+
return int(m.group(1))
149+
126150
def delete(self, dbfs_path, recursive, headers=None):
127-
self.client.delete(dbfs_path.absolute_path, recursive=recursive, headers=headers)
151+
num_consecutive_503_retries = 0
152+
num_files_deleted = 0
153+
while True:
154+
try:
155+
self.client.delete(dbfs_path.absolute_path, recursive=recursive, headers=headers)
156+
except HTTPError as e:
157+
if e.response.status_code == 503:
158+
try:
159+
error_code = e.response.json()['error_code']
160+
except (AttributeError, KeyError):
161+
error_code = None
162+
# Handle partial delete exceptions: retry until all the files have been deleted
163+
if error_code == DbfsErrorCodes.PARTIAL_DELETE:
164+
try:
165+
num_files_deleted += DbfsApi.get_num_files_deleted(e)
166+
click.echo("\rDeleted {} files. Delete in progress...\033[K".format(
167+
num_files_deleted), nl=False)
168+
except ParseException:
169+
click.echo("\rDelete in progress...\033[K", nl=False)
170+
num_consecutive_503_retries = 0
171+
continue
172+
# Retry at most DELETE_MAX_CONSECUTIVE_503_ERRORS times for other 503 errors
173+
elif num_consecutive_503_retries < DELETE_MAX_CONSECUTIVE_503_RETRIES:
174+
num_consecutive_503_retries += 1
175+
time.sleep(float(self.delete_retry_delay_millis) / 1000)
176+
continue
177+
else:
178+
raise e
179+
else:
180+
raise e
181+
break
182+
click.echo("\rDelete finished successfully.\033[K")
128183

129184
def mkdirs(self, dbfs_path, headers=None):
130185
self.client.mkdirs(dbfs_path.absolute_path, headers=headers)

tests/dbfs/test_api.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,20 @@ def get_resource_does_not_exist_exception():
4848
return requests.exceptions.HTTPError(response=response)
4949

5050

51+
def get_temporarily_unavailable_exception():
52+
response = requests.Response()
53+
response.status_code = 503
54+
response._content = ('{{"error_code": "{}"}}'.format(api.DbfsErrorCodes.TEMPORARILY_UNAVAILABLE)).encode() # NOQA
55+
return requests.exceptions.HTTPError(response=response)
56+
57+
58+
def get_partial_delete_exception(message="[...] operation has deleted 10 files [...]"):
59+
response = requests.Response()
60+
response.status_code = 503
61+
response._content = ('{{"error_code": "{}","message": "{}"}}'.format(api.DbfsErrorCodes.PARTIAL_DELETE, message)).encode() # NOQA
62+
return requests.exceptions.HTTPError(response=response)
63+
64+
5165
class TestFileInfo(object):
5266
def test_to_row_not_long_form_not_absolute(self):
5367
file_info = api.FileInfo(TEST_DBFS_PATH, False, 1)
@@ -165,3 +179,52 @@ def test_cat(self, dbfs_api):
165179
with mock.patch('databricks_cli.dbfs.api.click') as click_mock:
166180
dbfs_api.cat('dbfs:/whatever-doesnt-matter')
167181
click_mock.echo.assert_called_with('a', nl=False)
182+
183+
def test_partial_delete(self, dbfs_api):
184+
e_partial_delete = get_partial_delete_exception()
185+
e_temporarily_unavailable = get_temporarily_unavailable_exception()
186+
# Simulate partial deletes and 503 exceptions followed by a full successful delete
187+
exception_sequence = \
188+
[e_temporarily_unavailable, e_partial_delete, e_partial_delete] + \
189+
[e_temporarily_unavailable] * api.DELETE_MAX_CONSECUTIVE_503_RETRIES + \
190+
[e_partial_delete, None]
191+
dbfs_api.client.delete = mock.Mock(side_effect=exception_sequence)
192+
dbfs_api.delete_retry_delay_millis = 1
193+
# Should succeed
194+
dbfs_api.delete(DbfsPath('dbfs:/whatever-doesnt-matter'), recursive=True)
195+
196+
def test_partial_delete_service_unavailable(self, dbfs_api):
197+
e_partial_delete = get_partial_delete_exception()
198+
e_temporarily_unavailable = get_temporarily_unavailable_exception()
199+
# Simulate more than api.DELETE_MAX_CONSECUTIVE_503_ERRORS 503 errors that are not partial
200+
# deletes (error_code != PARTIAL_DELETE)
201+
exception_sequence = \
202+
[e_partial_delete] + \
203+
[e_temporarily_unavailable] * (api.DELETE_MAX_CONSECUTIVE_503_RETRIES + 1) + \
204+
[e_partial_delete, None]
205+
dbfs_api.client.delete = mock.Mock(side_effect=exception_sequence)
206+
dbfs_api.delete_retry_delay_millis = 1
207+
with pytest.raises(e_temporarily_unavailable.__class__) as thrown:
208+
dbfs_api.delete(DbfsPath('dbfs:/whatever-doesnt-matter'), recursive=True)
209+
# Should raise the same e_temporarily_unavailable exception instance
210+
assert thrown.value == e_temporarily_unavailable
211+
212+
def test_partial_delete_exception_message_parse_error(self, dbfs_api):
213+
message = "unexpected partial delete exception message"
214+
e_partial_delete = get_partial_delete_exception(message)
215+
dbfs_api.client.delete = mock.Mock(side_effect=[e_partial_delete, None])
216+
dbfs_api.delete_retry_delay_millis = 1
217+
# Should succeed
218+
dbfs_api.delete(DbfsPath('dbfs:/whatever-doesnt-matter'), recursive=True)
219+
220+
def test_get_num_files_deleted(self):
221+
e_partial_delete = get_partial_delete_exception()
222+
# Should succeed
223+
api.DbfsApi.get_num_files_deleted(e_partial_delete)
224+
225+
def test_get_num_files_deleted_parse_error(self):
226+
message = "unexpected partial delete exception message"
227+
e_partial_delete = get_partial_delete_exception(message)
228+
# Should raise api.ParseException
229+
with pytest.raises(api.ParseException):
230+
api.DbfsApi.get_num_files_deleted(e_partial_delete)

0 commit comments

Comments
 (0)