
Add s3 upload/download progress reporting callable support#1

Open
MHendricks wants to merge 1 commit into master from mikeh/transfer_progress

@MHendricks

Example using rich to show download progress of multiple files at once.

import concurrent.futures
from functools import partial
from itertools import islice
from cloudpathlib import CloudPath, S3Client
from rich.progress import BarColumn, DownloadColumn, Progress, TransferSpeedColumn, TextColumn

callback_state: dict[CloudPath, dict] = {}


def progress_callback(progress, direction, state, cloud_path, bytes_sent):
    if state == "start":
        progress.start()
        size = cloud_path.stat().st_size
        task_id = progress.add_task("Downloading", total=size, filename=cloud_path.name)
        callback_state[cloud_path] = dict(task_id=task_id, size=size)
    elif state == "stop":
        if cloud_path in callback_state:
            del callback_state[cloud_path]
    else:
        info = callback_state[cloud_path]
        progress.update(info["task_id"], advance=bytes_sent)


def download_url(cloud_path):
    # Trigger download of non-cached files
    cloud_path.fspath


def download(urls) -> None:
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {}
        for url in urls:
            future_to_url[executor.submit(download_url, url)] = url

        for future in concurrent.futures.as_completed(future_to_url):
            try:
                future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (future_to_url[future], exc))
        progress.stop()


if __name__ == "__main__":
    # Set up the progress bar and attach the callback to the S3Client
    progress = Progress(
        TextColumn("[blue]{task.fields[filename]}"),
        BarColumn(),
        DownloadColumn(),
        TransferSpeedColumn(),
    )
    client = S3Client(transfer_callable=partial(progress_callback, progress))

    # Download the first 5 files from this resource.
    # ladi = CloudPath("s3://ladi/Images/FEMA_CAP/2020/70349", client=client)
    ladi = CloudPath("s3://blur-package-distribution-us/hab_distros", client=client)
    download(islice(ladi.iterdir(), 5))
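
For callers who do not want a rich dependency, the same callback protocol can be consumed by a plain callable object. The following is a minimal sketch, assuming the callback is invoked as `(direction, state, cloud_path, bytes_sent)` with `state` equal to `"start"`, `"stop"`, or any other value for a progress chunk, as in the example above. `TransferTotals` is a hypothetical helper name, not part of cloudpathlib, and the `"download"` and `"progress"` strings in the usage note are illustrative values only.

```python
import threading
from collections import defaultdict


class TransferTotals:
    """Hypothetical thread-safe byte counter usable as a transfer callback."""

    def __init__(self):
        # Callbacks may arrive from several worker threads at once.
        self._lock = threading.Lock()
        self.active = set()
        self.bytes_by_path = defaultdict(int)

    def __call__(self, direction, state, cloud_path, bytes_sent):
        with self._lock:
            if state == "start":
                # A new transfer begins: reset its counter.
                self.active.add(cloud_path)
                self.bytes_by_path[cloud_path] = 0
            elif state == "stop":
                # Transfer finished; keep the final byte count for reporting.
                self.active.discard(cloud_path)
            else:
                # Any other state is treated as a progress chunk (assumption).
                self.bytes_by_path[cloud_path] += bytes_sent
```

Under the API proposed in this PR, an instance could presumably be passed as `S3Client(transfer_callable=TransferTotals())`. It can also be exercised without any network access by calling it directly, for example `totals("download", "progress", path, 1024)`.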

Closes #ISSUE


Contributor checklist:

  • I have read and understood CONTRIBUTING.md
  • Confirmed an issue exists for the PR, and the text Closes #issue appears in the PR summary (e.g., Closes #123).
  • Confirmed PR is rebased onto the latest base
  • Confirmed failure before change and success after change
  • Any generic new functionality is replicated across cloud providers if necessary
  • Tested manually against live server backend for at least one provider
  • Added tests for any new functionality
  • Linting passes locally
  • Tests pass locally
  • Updated HISTORY.md with the issue that is addressed and the PR you are submitting. If the top section is not `## UNRELEASED`, then you need to add a new section to the top of the document for your change.
