From cb27c62c54e358d85b119fa664581840cc7e8686 Mon Sep 17 00:00:00 2001 From: Ostap Zherebetskyi Date: Wed, 24 Jun 2026 13:36:49 +0300 Subject: [PATCH 1/2] Add pagination support to ZipStreamGenerator and update OSFStorageProvider for minimal metadata retrieval --- waterbutler/core/utils.py | 72 ++++++++++++++++++++ waterbutler/providers/osfstorage/provider.py | 63 ++++++++++++++++- 2 files changed, 133 insertions(+), 2 deletions(-) diff --git a/waterbutler/core/utils.py b/waterbutler/core/utils.py index 6c28abf75..80ebfd84a 100644 --- a/waterbutler/core/utils.py +++ b/waterbutler/core/utils.py @@ -223,6 +223,78 @@ async def __anext__(self): return path.path.replace(self.parent_path.path, '', 1), await self.provider.download(path) +class ZipStreamGeneratorPaginated: + + def __init__(self, provider, root_path, **kwargs): + self.provider = provider + self.parent_path = root_path.parent if root_path.is_file else root_path + self.root_path = root_path + self.kwargs = kwargs + + self.remaining = [] + self.initialized = False + + async def _initialize(self): + if self.initialized: + return + + self.initialized = True + + if self.root_path.is_file: + metadata = await self.provider.metadata( + self.root_path, + **self.kwargs, + ) + self.remaining.append((self.root_path.parent, metadata)) + return + + async for page in self.provider.iter_children_pages( + self.root_path, + **self.kwargs, + ): + + self.remaining.extend( + (self.root_path, item) + for item in page + ) + + async def __aiter__(self): + return self + + async def __anext__(self): + if not self.initialized: + await self._initialize() + + if not self.remaining: + raise StopAsyncIteration + + current = self.remaining.pop(0) + path = self.provider.path_from_metadata(*current) + + if path.is_dir: + async for page in self.provider.iter_children_pages( + path, + **self.kwargs, + ): + if not page: + return ( + path.path.replace(self.parent_path.path, '', 1), + EmptyStream(), + ) + + self.remaining.extend( + (path, item) + for item in page + ) + + return await self.__anext__() + + return ( + path.path.replace(self.parent_path.path, '', 1), + await self.provider.download(path), + ) + + class RequestHandlerContext: def __init__(self, request_coro): diff --git a/waterbutler/providers/osfstorage/provider.py b/waterbutler/providers/osfstorage/provider.py index 9244737ab..2820c490c 100644 --- a/waterbutler/providers/osfstorage/provider.py +++ b/waterbutler/providers/osfstorage/provider.py @@ -317,8 +317,13 @@ async def delete(self, path, confirm_delete=0, **kwargs): )).release() async def zip(self, path: wb_path.WaterButlerPath, **kwargs) -> asyncio.StreamReader: - # add query param 'minimal' to avoid unnecessary metadata in the response. - return await super().zip(path, **kwargs, minimal=True) + """Streams a Zip archive of the given folder + + :param path: ( :class:`.WaterButlerPath` ) The folder to compress + """ + kwargs['minimal'] = True + + return streams.ZipStreamReader(utils.ZipStreamGeneratorPaginated(self, path, **kwargs)) # type: ignore async def metadata(self, path, **kwargs): if path.identifier is None: @@ -519,6 +524,32 @@ async def copy(self, return meta_data, created + async def iter_children_pages(self, path, **kwargs): + url = self.build_url( + path.identifier, + 'children', + user_id=self.auth.get('id'), + minimal=kwargs.get('minimal'), + limit=kwargs.get('limit', 5), + orm=True + ) + + page, next_url = await self._fetch_metadata_page(url, path, **kwargs) + while True: + next_task = None + + if next_url: + next_task = asyncio.create_task( + self._fetch_metadata_page(next_url, path, **kwargs) + ) + + yield page + + if next_task is None: + break + + page, next_url = await next_task + # ========== private ========== async def _item_metadata(self, path, revision=None): @@ -545,6 +576,34 @@ async def _children_metadata(self, path, **kwargs): ret.append(OsfStorageFileMetadata(item, str(path.child(item['name'])))) return ret + async def _fetch_metadata_page(self, url, path, **kwargs): + resp = await self.make_signed_request( + 'GET', + url, + expects=(200,), + ) + resp_json = await resp.json() + + if resp_json: + next_url = self.build_url( + path.identifier, + 'children', + user_id=self.auth.get('id'), + minimal=kwargs.get('minimal'), + limit=kwargs.get('limit', 5), + after=resp_json[-1]['id'] if resp_json else None, + orm=True + ) + else: + return [], None + ret = [] + for item in resp_json: + if item['kind'] == 'folder': + ret.append(OsfStorageFolderMetadata(item, str(path.child(item['name'], folder=True)))) + else: + ret.append(OsfStorageFileMetadata(item, str(path.child(item['name'])))) + return ret, next_url + async def _delete_folder_contents(self, path, **kwargs): """Delete the contents of a folder. For use against provider root. From 2d5464e7fcd4b943587bd7e5a4b8c4240476c284 Mon Sep 17 00:00:00 2001 From: Ostap Zherebetskyi Date: Fri, 26 Jun 2026 13:07:13 +0300 Subject: [PATCH 2/2] Refactor OSFStorageProvider to enhance pagination handling and limit parameter for metadata retrieval --- waterbutler/providers/osfstorage/provider.py | 26 ++++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/waterbutler/providers/osfstorage/provider.py b/waterbutler/providers/osfstorage/provider.py index 2820c490c..360b18466 100644 --- a/waterbutler/providers/osfstorage/provider.py +++ b/waterbutler/providers/osfstorage/provider.py @@ -322,6 +322,7 @@ async def zip(self, path: wb_path.WaterButlerPath, **kwargs) -> asyncio.StreamRe :param path: ( :class:`.WaterButlerPath` ) The folder to compress """ kwargs['minimal'] = True + kwargs['limit'] = None return streams.ZipStreamReader(utils.ZipStreamGeneratorPaginated(self, path, **kwargs)) # type: ignore @@ -530,26 +531,17 @@ async def iter_children_pages(self, path, **kwargs): 'children', user_id=self.auth.get('id'), minimal=kwargs.get('minimal'), - limit=kwargs.get('limit', 5), + limit=kwargs.get('limit'), orm=True ) - page, next_url = await self._fetch_metadata_page(url, path, **kwargs) while True: - next_task = None - - if next_url: - next_task = asyncio.create_task( - self._fetch_metadata_page(next_url, path, **kwargs) - ) - + page, url = await self._fetch_metadata_page(url, path, **kwargs) yield page - if next_task is None: + if not url: break - page, next_url = await next_task - # ========== private ========== async def _item_metadata(self, path, revision=None): @@ -584,18 +576,20 @@ async def _fetch_metadata_page(self, url, path, **kwargs): ) resp_json = await resp.json() - if resp_json: + if kwargs.get('limit') and len(resp_json) < kwargs.get('limit'): + next_url = None + elif not kwargs.get('limit'): + next_url = None + else: next_url = self.build_url( path.identifier, 'children', user_id=self.auth.get('id'), minimal=kwargs.get('minimal'), - limit=kwargs.get('limit', 5), + limit=kwargs.get('limit'), after=resp_json[-1]['id'] if resp_json else None, orm=True ) - else: - return [], None ret = [] for item in resp_json: if item['kind'] == 'folder':