Skip to content

Commit 52ffba7

Browse files
author
FelixAbrahamsson
committed
feature: fetcher and uploader
0 parents  commit 52ffba7

12 files changed

Lines changed: 2609 additions & 0 deletions

File tree

.gitignore

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
.venv
2+
.cache
3+
__pycache__
4+
.pytest_cache
5+
*.egg-info
6+
.vscode
7+
dist
8+
.eggs/
9+
build/
10+
_build/
11+
*.pyc
12+
rclone.conf
13+
*.json
14+
scratch.py
15+
16+
AUTHORS
17+
ChangeLog
18+
19+
*.env*

.python-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.8.8

README.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Download/upload images from/to S3 very quickly
2+
3+
## Setup
4+
5+
```
6+
poetry install
7+
```
8+
9+
## Usage
10+
11+
Download image files
12+
13+
```python
14+
from PIL import Image
15+
from fast_s3 import Fetcher
16+
17+
18+
large_list_of_image_paths = [...]
19+
20+
fetcher = Fetcher(
21+
paths=large_list_of_image_paths,
22+
endpoint_url="https://s3.my-path-to-s3",
23+
aws_access_key_id="my-key-id",
24+
aws_secret_access_key="my-secret-key",
25+
region_name="my-region",
26+
bucket_name="my-bucket",
27+
ordered=True, # returns files in the same order as paths
28+
buffer_size=1024,
29+
n_workers=32,
30+
)
31+
32+
for file in fetcher:
33+
Image.open(file.buffer).save(file.path)
34+
```
35+
36+
Upload files
37+
38+
```python
39+
from fast_s3 import Uploader
40+
41+
42+
large_list_of_files = [...]
43+
large_list_of_paths = [...]
44+
45+
uploader = Uploader(
46+
endpoint_url="https://s3.my-path-to-s3",
47+
aws_access_key_id="my-key-id",
48+
aws_secret_access_key="my-secret-key",
49+
region_name="my-region",
50+
bucket_name="my-bucket",
51+
)
52+
53+
uploader.upload_files(
54+
large_list_of_files,
55+
large_list_of_paths,
56+
)
57+
```

fast_s3/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .fetcher import Fetcher
2+
from .uploader import Uploader

fast_s3/fetcher.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import io
2+
from pathlib import Path
3+
from typing import List, Union
4+
5+
from .file import File
6+
from .transfer_manager import transfer_manager
7+
8+
9+
class Fetcher:
    """Concurrently download S3 objects into in-memory buffers.

    Keeps at most ``buffer_size`` downloads in flight at once and yields
    :class:`File` objects as iteration proceeds.  Use as a context manager
    (``with Fetcher(...) as fetcher: ...``) or call :meth:`close` when done
    so the transfer manager's worker threads are shut down — previously the
    only way to release them was an explicit ``close()`` call, which was
    easy to forget.

    Parameters:
        paths: S3 object keys to download.
        endpoint_url: S3 endpoint URL.
        aws_access_key_id: AWS access key id.
        aws_secret_access_key: AWS secret access key.
        region_name: AWS region name.
        bucket_name: bucket to download from.
        ordered: if True, yield files in the same order as ``paths``;
            otherwise prefer whichever download finishes first.
        buffer_size: maximum number of concurrent in-flight downloads.
        n_workers: number of transfer worker threads.
    """

    def __init__(
        self,
        paths: List[Union[str, Path]],
        endpoint_url: str,
        aws_access_key_id: str,
        aws_secret_access_key: str,
        region_name: str,
        bucket_name: str,
        ordered: bool = True,
        buffer_size: int = 1024,
        n_workers: int = 32,
    ):
        self.paths = paths
        self.ordered = ordered
        self.buffer_size = buffer_size
        self.transfer_manager = transfer_manager(
            endpoint_url=endpoint_url,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name,
            n_workers=n_workers,
        )
        self.bucket_name = bucket_name
        # Files whose downloads have been queued but not yet yielded.
        self.files: List[File] = []
        # Index into ``paths`` of the next download to queue.
        self.current_path_index = 0

    def __len__(self):
        """Total number of paths this fetcher will download."""
        return len(self.paths)

    def __iter__(self):
        """Yield one :class:`File` per path, downloading concurrently."""
        # Prime the pipeline with up to buffer_size concurrent downloads.
        for _ in range(self.buffer_size):
            self.queue_download_()

        if self.ordered:
            for _ in range(len(self)):
                file = self.files.pop(0)
                file.future.result()  # block until this download finishes
                yield file
                self.queue_download_()  # keep the pipeline full
        else:
            for _ in range(len(self)):
                # Prefer a download that has already completed; if none
                # has, fall back to the oldest queued one and block on it.
                for index, file in enumerate(self.files):
                    if file.future.done():
                        break
                else:
                    index = 0
                file = self.files.pop(index)
                file.future.result()
                yield file
                self.queue_download_()  # keep the pipeline full

    def queue_download_(self):
        """Queue the next path (if any remain) for download into a fresh buffer."""
        if self.current_path_index < len(self):
            buffer = io.BytesIO()
            path = self.paths[self.current_path_index]
            self.files.append(
                File(
                    buffer=buffer,
                    future=self.transfer_manager.download(
                        fileobj=buffer,
                        bucket=self.bucket_name,
                        key=str(path),
                    ),
                    path=path,
                )
            )
            self.current_path_index += 1

    def __enter__(self):
        """Context-manager support so worker threads are always released."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Shut down the transfer manager and its worker threads."""
        self.transfer_manager.shutdown()

fast_s3/file.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import io
2+
from pathlib import Path
3+
from typing import Union
4+
5+
from pydantic import BaseModel
6+
from s3transfer.futures import TransferFuture
7+
8+
9+
class File(BaseModel):
    """A single S3 download in flight.

    Bundles the in-memory destination buffer, the s3transfer future that
    completes once the object has been written into it, and the original
    key/path it was requested under.
    """

    # Destination buffer the object body is downloaded into.
    buffer: io.BytesIO
    # Resolves when the download has finished; call .result() to block.
    future: TransferFuture
    # The S3 key this file was requested as (str or Path).
    path: Union[str, Path]

    class Config:
        # BytesIO and TransferFuture are not pydantic-validatable types,
        # so pydantic must be told to accept arbitrary classes as fields.
        arbitrary_types_allowed = True

fast_s3/transfer_manager.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import boto3
2+
import boto3.s3.transfer as s3transfer
3+
import botocore
4+
from s3transfer.manager import TransferManager
5+
6+
7+
def transfer_manager(
    endpoint_url: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
    region_name: str,
    n_workers=32,
) -> TransferManager:
    """Build a boto3 TransferManager backed by ``n_workers`` threads.

    The S3 client's connection pool is sized to match ``n_workers`` so
    each concurrent transfer thread can hold its own connection.
    """
    credentials = dict(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=region_name,
    )
    # One pooled connection per worker thread.
    client_config = botocore.config.Config(max_pool_connections=n_workers)
    s3_client = boto3.Session(**credentials).client(
        "s3",
        endpoint_url=endpoint_url,
        config=client_config,
    )
    transfer_config = s3transfer.TransferConfig(
        use_threads=True,
        max_concurrency=n_workers,
    )
    return s3transfer.create_transfer_manager(s3_client, transfer_config)

fast_s3/uploader.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from pathlib import Path
2+
from typing import List, Tuple, Union
3+
4+
from s3transfer.futures import TransferFuture
5+
6+
from .transfer_manager import transfer_manager
7+
8+
9+
class Uploader:
    """Concurrently upload files to an S3 bucket.

    Uploads are queued by :meth:`upload_files` and tracked as futures;
    call :meth:`await_futures` to block until queued uploads finish.
    Use as a context manager (``with Uploader(...) as uploader: ...``)
    or call :meth:`close` when done so the transfer manager's worker
    threads are shut down — mirroring ``Fetcher``, this was previously
    only possible via an explicit ``close()`` call.

    Parameters:
        endpoint_url: S3 endpoint URL.
        aws_access_key_id: AWS access key id.
        aws_secret_access_key: AWS secret access key.
        region_name: AWS region name.
        bucket_name: bucket to upload into.
        n_workers: number of transfer worker threads.
    """

    def __init__(
        self,
        endpoint_url: str,
        aws_access_key_id: str,
        aws_secret_access_key: str,
        region_name: str,
        bucket_name: str,
        n_workers: int = 32,
    ):
        self.transfer_manager = transfer_manager(
            endpoint_url=endpoint_url,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name,
            n_workers=n_workers,
        )
        self.bucket_name = bucket_name
        # Futures of uploads queued but not yet awaited.
        self.futures: List[TransferFuture] = []

    def upload_files(
        self,
        source: List[Union[str, bytes]],
        destination: List[Union[str, Path]],
    ):
        """Queue one upload per (source, destination) pair.

        Raises:
            ValueError: if ``source`` and ``destination`` differ in length.
        """
        if len(source) != len(destination):
            raise ValueError(
                "The number of source files and destination paths must be equal."
            )
        for file, path in zip(source, destination):
            self.futures.append(
                self.transfer_manager.upload(
                    fileobj=file,
                    bucket=self.bucket_name,
                    key=str(path),
                )
            )

    def await_futures(self):
        """Block until every queued upload completes, then clear the queue."""
        for future in self.futures:
            future.result()
        self.futures = []

    def __enter__(self):
        """Context-manager support so worker threads are always released."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Shut down the transfer manager and its worker threads."""
        self.transfer_manager.shutdown()

notebooks/.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Ignore everything in this directory
2+
*
3+
# Except this file
4+
!.gitignore

0 commit comments

Comments
 (0)