class ZipStreamGeneratorPaginated:
    """Async iterator feeding a zip stream from a paginated folder listing.

    Walks ``root_path`` breadth-first using ``provider.iter_children_pages``
    and yields ``(relative_path, stream)`` tuples, where ``relative_path`` is
    the entry's path with the first occurrence of ``parent_path.path``
    removed. Files yield the result of ``provider.download(path)``;
    directories that report no children yield an ``EmptyStream()``.

    :param provider: object exposing ``metadata``, ``iter_children_pages``,
        ``path_from_metadata`` and ``download``
    :param root_path: the file or folder to archive
    :param kwargs: forwarded unchanged to ``provider.metadata`` and
        ``provider.iter_children_pages``
    """

    def __init__(self, provider, root_path, **kwargs):
        self.provider = provider
        # Entry names are computed relative to this path; for a single file
        # that is the folder containing it.
        self.parent_path = root_path.parent if root_path.is_file else root_path
        self.root_path = root_path
        self.kwargs = kwargs

        # FIFO queue of (parent path, metadata) pairs still to be emitted.
        self.remaining = []
        self.initialized = False

    async def _initialize(self):
        """Seed ``self.remaining`` with the root file or the root's children.

        Runs at most once; later calls return immediately.
        """
        if self.initialized:
            return

        # Flag first, exactly as before, so a re-entrant call cannot seed the
        # queue twice.
        self.initialized = True

        if self.root_path.is_file:
            metadata = await self.provider.metadata(
                self.root_path,
                **self.kwargs,
            )
            self.remaining.append((self.root_path.parent, metadata))
            return

        async for page in self.provider.iter_children_pages(
            self.root_path,
            **self.kwargs,
        ):
            self.remaining.extend(
                (self.root_path, item)
                for item in page
            )

    def __aiter__(self):
        # Must be a plain method: ``async for`` requires ``__aiter__`` to
        # return the asynchronous iterator itself, not an awaitable.
        return self

    async def __anext__(self):
        """Return the next ``(relative_path, stream)`` pair.

        :raises StopAsyncIteration: when every queued entry has been emitted
        """
        if not self.initialized:
            await self._initialize()

        # Iterate rather than recurse: a long run of non-empty folders would
        # otherwise add one stack frame per folder.
        while self.remaining:
            current = self.remaining.pop(0)
            path = self.provider.path_from_metadata(*current)
            relative_path = path.path.replace(self.parent_path.path, '', 1)

            if not path.is_dir:
                return relative_path, await self.provider.download(path)

            # Consume every page before deciding the folder is empty, so an
            # empty trailing page does not turn a populated folder into an
            # EmptyStream entry or cut pagination short.
            has_children = False
            async for page in self.provider.iter_children_pages(
                path,
                **self.kwargs,
            ):
                if page:
                    has_children = True
                    self.remaining.extend(
                        (path, item)
                        for item in page
                    )

            if not has_children:
                return relative_path, EmptyStream()

        raise StopAsyncIteration
# Methods added to the osfstorage provider class in
# waterbutler/providers/osfstorage/provider.py; the class header lies outside
# this chunk.

async def zip(self, path: wb_path.WaterButlerPath, **kwargs) -> asyncio.StreamReader:
    """Streams a Zip archive of the given folder

    :param path: ( :class:`.WaterButlerPath` ) The folder to compress
    """
    # 'minimal' avoids unnecessary metadata in the response.
    kwargs['minimal'] = True
    # With no limit, `_fetch_metadata_page` never builds a follow-up URL, so
    # each folder is listed with a single request.
    kwargs['limit'] = None

    return streams.ZipStreamReader(utils.ZipStreamGeneratorPaginated(self, path, **kwargs))  # type: ignore


async def iter_children_pages(self, path, **kwargs):
    """Yield the children of ``path`` one page of metadata objects at a time.

    At least one page is always yielded, possibly an empty list.

    :param path: folder whose children are listed
    :param kwargs: ``minimal`` and ``limit`` are sent as query parameters;
        all kwargs are forwarded to ``_fetch_metadata_page``
    """
    url = self._children_page_url(path, kwargs)

    while True:
        page, url = await self._fetch_metadata_page(url, path, **kwargs)
        yield page

        if not url:
            break


def _children_page_url(self, path, options, after=None):
    """Build the 'children' listing URL for ``path``.

    :param options: dict read for the ``minimal`` and ``limit`` query values
    :param after: id to send as the ``after`` query parameter; omitted from
        the query when ``None``
    """
    query = {
        'user_id': self.auth.get('id'),
        'minimal': options.get('minimal'),
        'limit': options.get('limit'),
    }
    if after is not None:
        query['after'] = after
    query['orm'] = True
    return self.build_url(path.identifier, 'children', **query)


async def _fetch_metadata_page(self, url, path, **kwargs):
    """Fetch one page of children and work out where the next page lives.

    :param url: URL of the page to request
    :param path: folder the children belong to
    :param kwargs: ``limit`` is the requested page size; ``minimal`` is
        carried over into the next page's URL
    :returns: ``(metadata_list, next_url)``; ``next_url`` is ``None`` when
        this page is the last one
    """
    resp = await self.make_signed_request(
        'GET',
        url,
        expects=(200,),
    )
    resp_json = await resp.json()

    limit = kwargs.get('limit')
    # This is the last page when no limit was requested, when the page came
    # back short, or when it is empty (there is no id to continue from).
    if not resp_json or not limit or len(resp_json) < limit:
        next_url = None
    else:
        next_url = self._children_page_url(path, kwargs, after=resp_json[-1]['id'])

    ret = []
    for item in resp_json:
        if item['kind'] == 'folder':
            ret.append(OsfStorageFolderMetadata(item, str(path.child(item['name'], folder=True))))
        else:
            ret.append(OsfStorageFileMetadata(item, str(path.child(item['name']))))
    return ret, next_url