Skip to content

Commit 54c092c

Browse files
improve: configurable transfermanager and handle max retries error
1 parent f2b9138 commit 54c092c

4 files changed

Lines changed: 14 additions & 7 deletions

File tree

fast_s3/fetcher.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
 from typing import List, Union
 
 from botocore.exceptions import ClientError
+from s3transfer.exceptions import RetriesExceededError
 
 from .file import File, Status
 from .transfer_manager import transfer_manager
@@ -20,6 +21,7 @@ def __init__(
2021
ordered=True,
2122
buffer_size=1024,
2223
n_workers=32,
24+
**transfer_manager_kwargs,
2325
):
2426
self.paths = paths
2527
self.ordered = ordered
@@ -30,6 +32,7 @@ def __init__(
3032
aws_secret_access_key=aws_secret_access_key,
3133
region_name=region_name,
3234
n_workers=n_workers,
35+
**transfer_manager_kwargs,
3336
)
3437
self.bucket_name = bucket_name
3538
self.files: List[File] = []
@@ -60,8 +63,8 @@ def process_index(self, index):
6063
try:
6164
file.future.result()
6265
return file.with_status(Status.done)
63-
except ClientError:
64-
return file.with_status(Status.error)
66+
except (ClientError, RetriesExceededError) as e:
67+
return file.with_status(Status.error, exception=e)
6568

6669
def queue_download_(self):
6770
if self.current_path_index < len(self):

fast_s3/file.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
 import io
 from enum import Enum
 from pathlib import Path
-from typing import Union
+from typing import Optional, Union
 
 from pydantic import BaseModel
 from s3transfer.futures import TransferFuture
@@ -18,11 +18,12 @@ class File(BaseModel):
1818
future: TransferFuture
1919
path: Union[str, Path]
2020
status: Status = Status.pending
21+
exception: Optional[Exception] = None
2122

2223
class Config:
2324
arbitrary_types_allowed = True
2425

25-
def with_status(self, status: Status):
26+
def with_status(self, status: Status, exception: Optional[Exception] = None):
2627
attributes = self.dict()
27-
attributes.update(status=status)
28+
attributes.update(status=status, exception=exception)
2829
return File(**attributes)

fast_s3/transfer_manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ def transfer_manager(
1010
aws_secret_access_key: str,
1111
region_name: str,
1212
n_workers=32,
13+
**kwargs,
1314
) -> TransferManager:
1415
session = boto3.Session(
1516
aws_access_key_id=aws_access_key_id,
@@ -22,6 +23,6 @@ def transfer_manager(
2223
config=botocore.config.Config(max_pool_connections=n_workers),
2324
)
2425
transfer_config = s3transfer.TransferConfig(
25-
use_threads=True, max_concurrency=n_workers
26+
use_threads=True, max_concurrency=n_workers, **kwargs
2627
)
2728
return s3transfer.create_transfer_manager(client, transfer_config)

fast_s3/uploader.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import List, Tuple, Union
+from typing import List, Union
 
 from s3transfer.futures import TransferFuture
 
@@ -15,13 +15,15 @@ def __init__(
1515
region_name: str,
1616
bucket_name: str,
1717
n_workers=32,
18+
**transfer_manager_kwargs,
1819
):
1920
self.transfer_manager = transfer_manager(
2021
endpoint_url=endpoint_url,
2122
aws_access_key_id=aws_access_key_id,
2223
aws_secret_access_key=aws_secret_access_key,
2324
region_name=region_name,
2425
n_workers=n_workers,
26+
**transfer_manager_kwargs,
2527
)
2628
self.bucket_name = bucket_name
2729
self.futures: List[TransferFuture] = []

0 commit comments

Comments
 (0)