diff --git a/waterbutler/core/provider.py b/waterbutler/core/provider.py index 70cdf9185..2c6afea9e 100644 --- a/waterbutler/core/provider.py +++ b/waterbutler/core/provider.py @@ -19,7 +19,7 @@ from waterbutler.core.metrics import MetricsRecord from waterbutler.core import metadata as wb_metadata from waterbutler.core.cache import CacheableMetadataProviderProxy -from waterbutler.core.utils import ZipStreamGenerator +from waterbutler.core.utils import ZipStreamGenerator, ZipStreamGeneratorPaginated from waterbutler.core.utils import RequestHandlerContext @@ -679,17 +679,17 @@ async def revalidate_path(self, """ return base.child(path, folder=folder) - async def zip(self, path: wb_path.WaterButlerPath, use_cache: bool = True, **kwargs) -> asyncio.StreamReader: + async def zip(self, path: wb_path.WaterButlerPath, use_cache: bool = True, paginated: bool = False, **kwargs) -> asyncio.StreamReader: """Streams a Zip archive of the given folder :param path: ( :class:`.WaterButlerPath` ) The folder to compress - :param use_cache: ( `bool` ) Whether to prefetch metadata requests for nested folders during zip + :param use_cache: ( :class:`bool` ) Whether to prefetch metadata requests for nested folders during zip, + does not work for paginated metadata requests + :param paginated: ( :class:`bool` ) Whether to use the paginated zip stream generator """ provider = CacheableMetadataProviderProxy(self) if use_cache else self - - return streams.ZipStreamReader( - await ZipStreamGenerator.create(provider, path, **kwargs) - ) # type: ignore + generator = ZipStreamGeneratorPaginated if paginated else ZipStreamGenerator + return streams.ZipStreamReader(generator(provider, path, **kwargs)) # type: ignore def shares_storage_root(self, other: 'BaseProvider') -> bool: """Returns True if ``self`` and ``other`` both point to the same storage root. Used to @@ -763,6 +763,20 @@ async def metadata(self, path: wb_path.WaterButlerPath, **kwargs) \ """ raise NotImplementedError + async def iter_children_pages(self, path: wb_path.WaterButlerPath, **kwargs): + """Yield a folder's children metadata one page at a time. + + The default implementation yields a single page containing the entire folder + listing, since most providers return all children in one request. Providers + with server-side pagination should override this to lazily yield successive + pages, so callers (e.g. the zip stream generators) don't hold an entire large + folder listing in memory at once. + + :param path: ( :class:`.WaterButlerPath` ) The folder to list + :param kwargs: ( :class:`dict` ) Arguments to be parsed by child classes + """ + yield await self.metadata(path, **kwargs) + @abc.abstractmethod async def validate_v1_path(self, path: str, **kwargs) -> wb_path.WaterButlerPath: """API v1 requires that requests against folder endpoints always end with a slash, and diff --git a/waterbutler/core/utils.py b/waterbutler/core/utils.py index 6a42af3ae..d319d5b84 100644 --- a/waterbutler/core/utils.py +++ b/waterbutler/core/utils.py @@ -1,4 +1,5 @@ import re +import abc import json import pytz import asyncio @@ -192,29 +193,49 @@ def make_disposition(filename): encoded_filename) -class ZipStreamGenerator: +class BaseZipStreamGenerator(abc.ABC): - def __init__(self, provider, parent_path, *metadata_objs, **kwargs): + def __init__(self, provider, root_path, **kwargs): self.provider = provider - self.parent_path = parent_path - self.remaining = [ - (parent_path, metadata) - for metadata in metadata_objs - ] + self.root_path = root_path + self.parent_path = root_path.parent if root_path.is_file else root_path self.kwargs = kwargs - @classmethod - async def create(cls, provider, path, **kwargs): - meta_data = await provider.metadata(path, **kwargs) - if path.is_file: - meta_data = [meta_data] - path = path.parent - return cls(provider, path, *meta_data, **kwargs) + self.remaining = [] + self.initialized = False async def __aiter__(self): return self + @abc.abstractmethod + async def _initialize(self): + raise NotImplementedError + + @abc.abstractmethod + async def __anext__(self): + raise NotImplementedError + + +class ZipStreamGenerator(BaseZipStreamGenerator): + + async def _initialize(self): + if self.initialized: + return + + if self.root_path.is_file: + metadata = await self.provider.metadata(self.root_path, **self.kwargs) + self.remaining.append((self.root_path.parent, metadata)) + return + + items = await self.provider.metadata(self.root_path, **self.kwargs) + self.remaining.extend((self.root_path, item) for item in items) + + self.initialized = True + async def __anext__(self): + if not self.initialized: + await self._initialize() + if not self.remaining: raise StopAsyncIteration current = self.remaining.pop(0) @@ -232,23 +253,12 @@ async def __anext__(self): return path.path.replace(self.parent_path.path, '', 1), await self.provider.download(path) -class ZipStreamGeneratorPaginated: - - def __init__(self, provider, root_path, **kwargs): - self.provider = provider - self.parent_path = root_path.parent if root_path.is_file else root_path - self.root_path = root_path - self.kwargs = kwargs - - self.remaining = [] - self.initialized = False +class ZipStreamGeneratorPaginated(BaseZipStreamGenerator): async def _initialize(self): if self.initialized: return - self.initialized = True - if self.root_path.is_file: metadata = await self.provider.metadata( self.root_path, @@ -261,14 +271,12 @@ async def _initialize(self): self.root_path, **self.kwargs, ): - self.remaining.extend( (self.root_path, item) for item in page ) - async def __aiter__(self): - return self + self.initialized = True async def __anext__(self): if not self.initialized: diff --git a/waterbutler/providers/osfstorage/provider.py b/waterbutler/providers/osfstorage/provider.py index 360b18466..77b6e1215 100644 --- a/waterbutler/providers/osfstorage/provider.py +++ b/waterbutler/providers/osfstorage/provider.py @@ -321,10 +321,7 @@ async def zip(self, path: wb_path.WaterButlerPath, **kwargs) -> asyncio.StreamRe :param path: ( :class:`.WaterButlerPath` ) The folder to compress """ - kwargs['minimal'] = True - kwargs['limit'] = None - - return streams.ZipStreamReader(utils.ZipStreamGeneratorPaginated(self, path, **kwargs)) # type: ignore + return await super().zip(path, **kwargs, minimal=True) async def metadata(self, path, **kwargs): if path.identifier is None: @@ -526,22 +523,18 @@ async def copy(self, return meta_data, created async def iter_children_pages(self, path, **kwargs): - url = self.build_url( - path.identifier, - 'children', - user_id=self.auth.get('id'), - minimal=kwargs.get('minimal'), - limit=kwargs.get('limit'), - orm=True - ) + limit = kwargs.pop('limit', None) + after = None while True: - page, url = await self._fetch_metadata_page(url, path, **kwargs) + page = await self.metadata(path, limit=limit, after=after, **kwargs) yield page - if not url: + if not limit or len(page) < int(limit): break + after = page[-1].raw['id'] + # ========== private ========== async def _item_metadata(self, path, revision=None): @@ -552,10 +545,20 @@ async def _item_metadata(self, path, revision=None): ) return OsfStorageFileMetadata((await resp.json()), str(path)) - async def _children_metadata(self, path, **kwargs): + async def _children_metadata(self, path, limit=None, after=None, **kwargs): + query = { + 'user_id': self.auth.get('id'), + 'minimal': kwargs.get('minimal', False), + } + if limit is not None: + query['orm'] = True + query['limit'] = limit + if after is not None: + query['after'] = after + resp = await self.make_signed_request( 'GET', - self.build_url(path.identifier, 'children', user_id=self.auth.get('id'), minimal=kwargs.get('minimal', False)), + self.build_url(path.identifier, 'children', **query), expects=(200, ) ) resp_json = await resp.json() @@ -568,36 +571,6 @@ async def _children_metadata(self, path, **kwargs): ret.append(OsfStorageFileMetadata(item, str(path.child(item['name'])))) return ret - async def _fetch_metadata_page(self, url, path, **kwargs): - resp = await self.make_signed_request( - 'GET', - url, - expects=(200,), - ) - resp_json = await resp.json() - - if kwargs.get('limit') and len(resp_json) < kwargs.get('limit'): - next_url = None - elif not kwargs.get('limit'): - next_url = None - else: - next_url = self.build_url( - path.identifier, - 'children', - user_id=self.auth.get('id'), - minimal=kwargs.get('minimal'), - limit=kwargs.get('limit'), - after=resp_json[-1]['id'] if resp_json else None, - orm=True - ) - ret = [] - for item in resp_json: - if item['kind'] == 'folder': - ret.append(OsfStorageFolderMetadata(item, str(path.child(item['name'], folder=True)))) - else: - ret.append(OsfStorageFileMetadata(item, str(path.child(item['name'])))) - return ret, next_url - async def _delete_folder_contents(self, path, **kwargs): """Delete the contents of a folder. For use against provider root. diff --git a/waterbutler/server/api/v1/provider/metadata.py b/waterbutler/server/api/v1/provider/metadata.py index befe3352d..4c376922e 100644 --- a/waterbutler/server/api/v1/provider/metadata.py +++ b/waterbutler/server/api/v1/provider/metadata.py @@ -131,6 +131,6 @@ async def download_folder_as_zip(self): self.set_header('Content-Type', 'application/zip') self.set_header('Content-Disposition', make_disposition(zipfile_name + '.zip')) - result = await self.provider.zip(self.path) + result = await self.provider.zip(self.path, **self.arguments) await self.write_stream(result)