Skip to content

Commit 0e4d702

Browse files
committed
add exception wrapping to autods + fix pkl wrap
1 parent 95497f5 commit 0e4d702

2 files changed

Lines changed: 38 additions & 17 deletions

File tree

src/pasteur/kedro/dataset/auto.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import os
33
from copy import deepcopy
4+
from functools import wraps
45
from io import BytesIO
56
from pathlib import PurePosixPath
67
from typing import Any, Callable
@@ -19,11 +20,38 @@
1920
)
2021

2122
from ...utils import LazyDataset, LazyFrame, LazyPartition
22-
from ...utils.progress import get_node_name, process, process_in_parallel
23+
from ...utils.progress import get_node_name, process, process_in_parallel, DEBUG
2324

2425
logger = logging.getLogger(__name__)
2526

2627

28+
def _wrap_retry(f):
    """Decorate a filesystem-touching function to retry it up to 3 times.

    Unreliable network shares can fail transiently; rather than crash the
    pipeline, each failed call is logged and retried after a 1 second pause.
    The last failure is re-raised with its original traceback.

    Returns `f` unchanged when DEBUG is on, so exceptions break at the
    original raise site in a debugger.
    """
    if DEBUG:
        # Skip wrapping if DEBUG is on so exceptions break correctly
        return f

    @wraps(f)
    def _wrap(*args, **kwargs):
        import time

        attempts = 3
        for i in range(attempts):
            try:
                return f(*args, **kwargs)
            except Exception:
                if i + 1 == attempts:
                    # Out of retries: re-raise the current exception,
                    # preserving its traceback (no pointless final sleep).
                    raise
                # Prevents pipeline crashing on unreliable network shares
                logger.warning(
                    f"Failed fs function '{f.__name__}(path={kwargs.get('path', str(args[0]) if args else 'None')})' (attempt {i + 1}/{attempts}). Waiting 1 second and retrying..."
                )
                time.sleep(1)

    return _wrap
52+
53+
54+
@_wrap_retry
2755
def _save_worker(
2856
pid: str | None,
2957
path: str,
@@ -132,6 +160,7 @@ def _save_worker(
132160
fs_file.write(bytes_buffer.getvalue())
133161

134162

163+
@_wrap_retry
135164
def _load_worker(
136165
path: str,
137166
protocol: str,
@@ -155,6 +184,7 @@ def _load_worker(
155184
return pd.read_parquet(load_path, storage_options=storage_options, **load_args)
156185

157186

187+
@_wrap_retry
158188
def _load_merged_worker(
159189
load_path: str, filesystem, load_args, columns: list[str] | None = None
160190
):
@@ -191,6 +221,7 @@ def _load_merged_worker(
191221
return out
192222

193223

224+
@_wrap_retry
194225
def _load_shape_worker(load_path: str, filesystem, *_, **__):
195226
# TODO: verify this returns correct numbers (esp. columns)
196227
data = pq.ParquetDataset(load_path, filesystem=filesystem, use_legacy_dataset=False)

src/pasteur/kedro/dataset/modified.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from kedro.io.partitioned_dataset import PartitionedDataset
2020

2121
from ...utils import LazyDataset, LazyPartition
22+
from .auto import _wrap_retry
2223

2324
logger = logging.getLogger(__name__)
2425

@@ -418,26 +419,15 @@ def _describe(self) -> dict[str, Any]:
418419
"version": self._version,
419420
}
420421

422+
@_wrap_retry
def _load(self) -> Any:
    """Load the pickled object from the configured filesystem path.

    Retry-on-failure for flaky network shares is handled by the
    `_wrap_retry` decorator rather than inline.
    """
    load_path = get_filepath_str(self._get_load_path(), self._protocol)

    with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
        backend = importlib.import_module(self._backend)
        return backend.load(fs_file, **self._load_args)  # type: ignore
440429

430+
@_wrap_retry
441431
def _save(self, data: Any) -> None:
442432
save_path = get_filepath_str(self._get_save_path(), self._protocol)
443433

0 commit comments

Comments
 (0)