From 4bc456b9a9ac461ba02dfe34c0cae60ed934cf19 Mon Sep 17 00:00:00 2001 From: Anton Krytskyi Date: Tue, 23 Jun 2026 20:08:36 +0300 Subject: [PATCH 1/4] add cache to metadata calls for ZipStreamGenerator --- waterbutler/core/provider.py | 10 ++----- waterbutler/core/utils.py | 57 ++++++++++++++++++++++++++++-------- 2 files changed, 48 insertions(+), 19 deletions(-) diff --git a/waterbutler/core/provider.py b/waterbutler/core/provider.py index 9e5101063..5f42f6cda 100644 --- a/waterbutler/core/provider.py +++ b/waterbutler/core/provider.py @@ -683,13 +683,9 @@ async def zip(self, path: wb_path.WaterButlerPath, **kwargs) -> asyncio.StreamRe :param path: ( :class:`.WaterButlerPath` ) The folder to compress """ - - meta_data = await self.metadata(path, **kwargs) # type: ignore - if path.is_file: - meta_data = [meta_data] # type: ignore - path = path.parent - - return streams.ZipStreamReader(ZipStreamGenerator(self, path, *meta_data, **kwargs)) # type: ignore + return streams.ZipStreamReader( + await ZipStreamGenerator.create(self, path, **kwargs) + ) # type: ignore def shares_storage_root(self, other: 'BaseProvider') -> bool: """Returns True if ``self`` and ``other`` both point to the same storage root. Used to diff --git a/waterbutler/core/utils.py b/waterbutler/core/utils.py index 6c28abf75..7b04167fa 100644 --- a/waterbutler/core/utils.py +++ b/waterbutler/core/utils.py @@ -193,14 +193,50 @@ def make_disposition(filename): class ZipStreamGenerator: - def __init__(self, provider, parent_path, *metadata_objs, **kwargs): + # TODO: add docs + + def __init__(self, provider, parent_path, **kwargs): self.provider = provider self.parent_path = parent_path - self.remaining = [ - (parent_path, metadata) - for metadata in metadata_objs - ] + self.remaining = [] self.kwargs = kwargs + self.__cache = dict[str, asyncio.Task]() + + @classmethod + async def create(cls, provider, path, **kwargs): + path = path.parent if (path.is_file and path.parent is not None) else path + self = cls( + provider, + path, + **kwargs + ) + self.remaining = await self.list_metadata(path, **kwargs) + + return self + + def metadata_task(self, path, **kwargs): + key = path.identifier or str(path) + if key not in self.__cache: + self.__cache[key] = asyncio.create_task( + self.provider.metadata(path, **kwargs) + ) + return self.__cache[key] + + async def list_metadata(self, path, **kwargs): + if path.is_file: + return [(path.parent, await self.provider.metadata(path, **kwargs))] + + items = await self.metadata_task(path, **kwargs) + + items = await self.metadata_task(path, **kwargs) + remaining = [] + for item in items: + remaining.append((path, item)) + if item.is_folder: + child = self.provider.path_from_metadata(path, item) + self.metadata_task(child, **kwargs) + + return remaining async def __aiter__(self): return self @@ -211,14 +247,11 @@ async def __anext__(self): current = self.remaining.pop(0) path = self.provider.path_from_metadata(*current) if path.is_dir: - items = await self.provider.metadata(path, **self.kwargs) - if items: - self.remaining.extend([ - (path, item) for item in items - ]) + entries = await self.list_metadata(path, **self.kwargs) + if entries: + self.remaining.extend(entries) return await self.__anext__() - else: - return path.path.replace(self.parent_path.path, '', 1), EmptyStream() + return path.path.replace(self.parent_path.path, '', 1), EmptyStream() return path.path.replace(self.parent_path.path, '', 1), await self.provider.download(path) From 745473fbdc455f4f9f289ba8e75e18f5bcd3c3bc Mon Sep 17 00:00:00 2001 From: Anton Krytskyi Date: Thu, 25 Jun 2026 14:56:43 +0300 Subject: [PATCH 2/4] implement cache as provider proxy; revert zip generator --- waterbutler/core/cache.py | 45 +++++++++++++++++++++++++++++ waterbutler/core/provider.py | 7 +++-- waterbutler/core/utils.py | 56 +++++++++++------------------------- 3 files changed, 66 insertions(+), 42 deletions(-) create mode 100644 waterbutler/core/cache.py diff --git a/waterbutler/core/cache.py b/waterbutler/core/cache.py new file mode 100644 index 000000000..397e74d0f --- /dev/null +++ b/waterbutler/core/cache.py @@ -0,0 +1,45 @@ +import asyncio + +from waterbutler.core.provider import BaseProvider + + +class CacheableMetadataProviderProxy: + """Wraps a provider so that folder ``metadata`` requests are cached and the + metadata of any child folders is prefetched concurrently. + + When a folder is listed, a metadata task is scheduled for each of its child + folders without awaiting them. Those tasks run on the event loop while files + are being downloaded, so the generator can issue all of its metadata requests + up front instead of fetching folder listings strictly one-at-a-time. + + Any attribute that is not overridden here (``download``, ``path_from_metadata``, + etc.) is delegated to the wrapped provider. + """ + + def __init__(self, provider: BaseProvider): + self.provider = provider + self.__cache = dict[str, asyncio.Task]() + + def metadata_task(self, path, **kwargs): + key = path.identifier or str(path) + if key not in self.__cache: + self.__cache[key] = asyncio.create_task( + self.provider.metadata(path, **kwargs) + ) + return self.__cache[key] + + async def metadata(self, path, **kwargs): + if path.is_file: + return await self.provider.metadata(path, **kwargs) + + items = await self.metadata_task(path, **kwargs) + + for item in items: + if item.is_folder: + child = self.provider.path_from_metadata(path, item) + self.metadata_task(child, **kwargs) + + return items + + def __getattr__(self, name): + return getattr(self.provider, name) diff --git a/waterbutler/core/provider.py b/waterbutler/core/provider.py index 5f42f6cda..57eece3a7 100644 --- a/waterbutler/core/provider.py +++ b/waterbutler/core/provider.py @@ -18,6 +18,7 @@ from waterbutler import settings as wb_settings from waterbutler.core.metrics import MetricsRecord from waterbutler.core import metadata as wb_metadata +from waterbutler.core.cache import CacheableMetadataProviderProxy from waterbutler.core.utils import ZipStreamGenerator from waterbutler.core.utils import RequestHandlerContext @@ -678,13 +679,15 @@ async def revalidate_path(self, """ return base.child(path, folder=folder) - async def zip(self, path: wb_path.WaterButlerPath, **kwargs) -> asyncio.StreamReader: + async def zip(self, path: wb_path.WaterButlerPath, cache: bool = True, **kwargs) -> asyncio.StreamReader: """Streams a Zip archive of the given folder :param path: ( :class:`.WaterButlerPath` ) The folder to compress """ + provider = CacheableMetadataProviderProxy(self) if cache else self + return streams.ZipStreamReader( - await ZipStreamGenerator.create(self, path, **kwargs) + await ZipStreamGenerator.create(provider, path, **kwargs) ) # type: ignore def shares_storage_root(self, other: 'BaseProvider') -> bool: diff --git a/waterbutler/core/utils.py b/waterbutler/core/utils.py index 7b04167fa..512e95e2c 100644 --- a/waterbutler/core/utils.py +++ b/waterbutler/core/utils.py @@ -193,50 +193,23 @@ def make_disposition(filename): class ZipStreamGenerator: - # TODO: add docs - def __init__(self, provider, parent_path, **kwargs): + def __init__(self, provider, parent_path, *metadata_objs, **kwargs): self.provider = provider self.parent_path = parent_path - self.remaining = [] + self.remaining = [ + (parent_path, metadata) + for metadata in metadata_objs + ] self.kwargs = kwargs - self.__cache = dict[str, asyncio.Task]() @classmethod async def create(cls, provider, path, **kwargs): - path = path.parent if (path.is_file and path.parent is not None) else path - self = cls( - provider, - path, - **kwargs - ) - self.remaining = await self.list_metadata(path, **kwargs) - - return self - - def metadata_task(self, path, **kwargs): - key = path.identifier or str(path) - if key not in self.__cache: - self.__cache[key] = asyncio.create_task( - self.provider.metadata(path, **kwargs) - ) - return self.__cache[key] - - async def list_metadata(self, path, **kwargs): + meta_data = await provider.metadata(path, **kwargs) if path.is_file: - return [(path.parent, await self.provider.metadata(path, **kwargs))] - - items = await self.metadata_task(path, **kwargs) - - items = await self.metadata_task(path, **kwargs) - remaining = [] - for item in items: - remaining.append((path, item)) - if item.is_folder: - child = self.provider.path_from_metadata(path, item) - self.metadata_task(child, **kwargs) - - return remaining + meta_data = [meta_data] + path = path.parent + return cls(provider, path, *meta_data, **kwargs) async def __aiter__(self): return self @@ -247,11 +220,14 @@ async def __anext__(self): current = self.remaining.pop(0) path = self.provider.path_from_metadata(*current) if path.is_dir: - entries = await self.list_metadata(path, **self.kwargs) - if entries: - self.remaining.extend(entries) + items = await self.provider.metadata(path, **self.kwargs) + if items: + self.remaining.extend([ + (path, item) for item in items + ]) return await self.__anext__() - return path.path.replace(self.parent_path.path, '', 1), EmptyStream() + else: + return path.path.replace(self.parent_path.path, '', 1), EmptyStream() return path.path.replace(self.parent_path.path, '', 1), await self.provider.download(path) From 8babf2329ee8b7f32a83e8dfebeb9f666704235e Mon Sep 17 00:00:00 2001 From: Anton Krytskyi Date: Thu, 25 Jun 2026 15:33:02 +0300 Subject: [PATCH 3/4] fix circular import error --- waterbutler/core/cache.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/waterbutler/core/cache.py b/waterbutler/core/cache.py index 397e74d0f..59e71cf87 100644 --- a/waterbutler/core/cache.py +++ b/waterbutler/core/cache.py @@ -1,14 +1,12 @@ import asyncio -from waterbutler.core.provider import BaseProvider - class CacheableMetadataProviderProxy: """Wraps a provider so that folder ``metadata`` requests are cached and the metadata of any child folders is prefetched concurrently. When a folder is listed, a metadata task is scheduled for each of its child - folders without awaiting them. Those tasks run on the event loop while files + folders without awaiting them. Those tasks run on the event loop while files are being downloaded, so the generator can issue all of its metadata requests up front instead of fetching folder listings strictly one-at-a-time. @@ -16,7 +14,7 @@ class CacheableMetadataProviderProxy: etc.) is delegated to the wrapped provider. """ - def __init__(self, provider: BaseProvider): + def __init__(self, provider): self.provider = provider self.__cache = dict[str, asyncio.Task]() From a06d0bab43441979882c15ff400a2aa94dfa3ee5 Mon Sep 17 00:00:00 2001 From: Longze Chen Date: Mon, 29 Jun 2026 09:50:34 -0400 Subject: [PATCH 4/4] Apply suggestions from code review Minor code style update + docstring Co-authored-by: Longze Chen --- waterbutler/core/provider.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/waterbutler/core/provider.py b/waterbutler/core/provider.py index 57eece3a7..70cdf9185 100644 --- a/waterbutler/core/provider.py +++ b/waterbutler/core/provider.py @@ -679,12 +679,13 @@ async def revalidate_path(self, """ return base.child(path, folder=folder) - async def zip(self, path: wb_path.WaterButlerPath, cache: bool = True, **kwargs) -> asyncio.StreamReader: + async def zip(self, path: wb_path.WaterButlerPath, use_cache: bool = True, **kwargs) -> asyncio.StreamReader: """Streams a Zip archive of the given folder :param path: ( :class:`.WaterButlerPath` ) The folder to compress + :param use_cache: ( `bool` ) Whether to prefetch metadata requests for nested folders during zip """ - provider = CacheableMetadataProviderProxy(self) if cache else self + provider = CacheableMetadataProviderProxy(self) if use_cache else self return streams.ZipStreamReader( await ZipStreamGenerator.create(provider, path, **kwargs)