Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions waterbutler/core/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from waterbutler.core.metrics import MetricsRecord
from waterbutler.core import metadata as wb_metadata
from waterbutler.core.cache import CacheableMetadataProviderProxy
from waterbutler.core.utils import ZipStreamGenerator
from waterbutler.core.utils import ZipStreamGenerator, ZipStreamGeneratorPaginated
from waterbutler.core.utils import RequestHandlerContext


Expand Down Expand Up @@ -679,17 +679,17 @@ async def revalidate_path(self,
"""
return base.child(path, folder=folder)

async def zip(self, path: wb_path.WaterButlerPath, use_cache: bool = True, **kwargs) -> asyncio.StreamReader:
async def zip(self, path: wb_path.WaterButlerPath, use_cache: bool = True, paginated: bool = False, **kwargs) -> asyncio.StreamReader:
"""Streams a Zip archive of the given folder

:param path: ( :class:`.WaterButlerPath` ) The folder to compress
:param use_cache: ( `bool` ) Whether to prefetch metadata requests for nested folders during zip
:param use_cache: ( :class:`bool` ) Whether to prefetch metadata requests for nested folders during zip,
does not work for paginated metadata requests
:param paginated: ( :class:`bool` ) Whether to use the paginated zip stream generator
"""
provider = CacheableMetadataProviderProxy(self) if use_cache else self

return streams.ZipStreamReader(
await ZipStreamGenerator.create(provider, path, **kwargs)
) # type: ignore
generator = ZipStreamGeneratorPaginated if paginated else ZipStreamGenerator
return streams.ZipStreamReader(generator(provider, path, **kwargs)) # type: ignore

def shares_storage_root(self, other: 'BaseProvider') -> bool:
"""Returns True if ``self`` and ``other`` both point to the same storage root. Used to
Expand Down Expand Up @@ -763,6 +763,20 @@ async def metadata(self, path: wb_path.WaterButlerPath, **kwargs) \
"""
raise NotImplementedError

async def iter_children_pages(self, path: wb_path.WaterButlerPath, **kwargs):
"""Yield a folder's children metadata one page at a time.

The default implementation yields a single page containing the entire folder
listing, since most providers return all children in one request. Providers
with server-side pagination should override this to lazily yield successive
pages, so callers (e.g. the zip stream generators) don't hold an entire large
folder listing in memory at once.

:param path: ( :class:`.WaterButlerPath` ) The folder to list
:param kwargs: ( :class:`dict` ) Arguments to be parsed by child classes
"""
yield await self.metadata(path, **kwargs)

@abc.abstractmethod
async def validate_v1_path(self, path: str, **kwargs) -> wb_path.WaterButlerPath:
"""API v1 requires that requests against folder endpoints always end with a slash, and
Expand Down
66 changes: 37 additions & 29 deletions waterbutler/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
import abc
import json
import pytz
import asyncio
Expand Down Expand Up @@ -192,29 +193,49 @@ def make_disposition(filename):
encoded_filename)


class ZipStreamGenerator:
class BaseZipStreamGenerator(abc.ABC):

def __init__(self, provider, parent_path, *metadata_objs, **kwargs):
def __init__(self, provider, root_path, **kwargs):
self.provider = provider
self.parent_path = parent_path
self.remaining = [
(parent_path, metadata)
for metadata in metadata_objs
]
self.root_path = root_path
self.parent_path = root_path.parent if root_path.is_file else root_path
self.kwargs = kwargs

@classmethod
async def create(cls, provider, path, **kwargs):
meta_data = await provider.metadata(path, **kwargs)
if path.is_file:
meta_data = [meta_data]
path = path.parent
return cls(provider, path, *meta_data, **kwargs)
self.remaining = []
self.initialized = False

async def __aiter__(self):
return self

@abc.abstractmethod
async def _initialize(self):
raise NotImplementedError

@abc.abstractmethod
async def __anext__(self):
raise NotImplementedError


class ZipStreamGenerator(BaseZipStreamGenerator):

async def _initialize(self):
if self.initialized:
return

if self.root_path.is_file:
metadata = await self.provider.metadata(self.root_path, **self.kwargs)
self.remaining.append((self.root_path.parent, metadata))
return

items = await self.provider.metadata(self.root_path, **self.kwargs)
self.remaining.extend((self.root_path, item) for item in items)

self.initialized = True

async def __anext__(self):
if not self.initialized:
await self._initialize()

if not self.remaining:
raise StopAsyncIteration
current = self.remaining.pop(0)
Expand All @@ -232,23 +253,12 @@ async def __anext__(self):
return path.path.replace(self.parent_path.path, '', 1), await self.provider.download(path)


class ZipStreamGeneratorPaginated:

def __init__(self, provider, root_path, **kwargs):
self.provider = provider
self.parent_path = root_path.parent if root_path.is_file else root_path
self.root_path = root_path
self.kwargs = kwargs

self.remaining = []
self.initialized = False
class ZipStreamGeneratorPaginated(BaseZipStreamGenerator):

async def _initialize(self):
if self.initialized:
return

self.initialized = True

if self.root_path.is_file:
metadata = await self.provider.metadata(
self.root_path,
Expand All @@ -261,14 +271,12 @@ async def _initialize(self):
self.root_path,
**self.kwargs,
):

self.remaining.extend(
(self.root_path, item)
for item in page
)

async def __aiter__(self):
return self
self.initialized = True

async def __anext__(self):
if not self.initialized:
Expand Down
65 changes: 19 additions & 46 deletions waterbutler/providers/osfstorage/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,7 @@ async def zip(self, path: wb_path.WaterButlerPath, **kwargs) -> asyncio.StreamRe

:param path: ( :class:`.WaterButlerPath` ) The folder to compress
"""
kwargs['minimal'] = True
kwargs['limit'] = None

return streams.ZipStreamReader(utils.ZipStreamGeneratorPaginated(self, path, **kwargs)) # type: ignore
return await super().zip(path, **kwargs, minimal=True)

async def metadata(self, path, **kwargs):
if path.identifier is None:
Expand Down Expand Up @@ -526,22 +523,18 @@ async def copy(self,
return meta_data, created

async def iter_children_pages(self, path, **kwargs):
url = self.build_url(
path.identifier,
'children',
user_id=self.auth.get('id'),
minimal=kwargs.get('minimal'),
limit=kwargs.get('limit'),
orm=True
)
limit = kwargs.pop('limit', None)
after = None

while True:
page, url = await self._fetch_metadata_page(url, path, **kwargs)
page = await self.metadata(path, limit=limit, after=after, **kwargs)
yield page

if not url:
if not limit or len(page) < int(limit):
break

after = page[-1].raw['id']

# ========== private ==========

async def _item_metadata(self, path, revision=None):
Expand All @@ -552,10 +545,20 @@ async def _item_metadata(self, path, revision=None):
)
return OsfStorageFileMetadata((await resp.json()), str(path))

async def _children_metadata(self, path, **kwargs):
async def _children_metadata(self, path, limit=None, after=None, **kwargs):
query = {
'user_id': self.auth.get('id'),
'minimal': kwargs.get('minimal', False),
}
if limit is not None:
query['orm'] = True
query['limit'] = limit
if after is not None:
query['after'] = after

resp = await self.make_signed_request(
'GET',
self.build_url(path.identifier, 'children', user_id=self.auth.get('id'), minimal=kwargs.get('minimal', False)),
self.build_url(path.identifier, 'children', **query),
expects=(200, )
)
resp_json = await resp.json()
Expand All @@ -568,36 +571,6 @@ async def _children_metadata(self, path, **kwargs):
ret.append(OsfStorageFileMetadata(item, str(path.child(item['name']))))
return ret

async def _fetch_metadata_page(self, url, path, **kwargs):
resp = await self.make_signed_request(
'GET',
url,
expects=(200,),
)
resp_json = await resp.json()

if kwargs.get('limit') and len(resp_json) < kwargs.get('limit'):
next_url = None
elif not kwargs.get('limit'):
next_url = None
else:
next_url = self.build_url(
path.identifier,
'children',
user_id=self.auth.get('id'),
minimal=kwargs.get('minimal'),
limit=kwargs.get('limit'),
after=resp_json[-1]['id'] if resp_json else None,
orm=True
)
ret = []
for item in resp_json:
if item['kind'] == 'folder':
ret.append(OsfStorageFolderMetadata(item, str(path.child(item['name'], folder=True))))
else:
ret.append(OsfStorageFileMetadata(item, str(path.child(item['name']))))
return ret, next_url

async def _delete_folder_contents(self, path, **kwargs):
"""Delete the contents of a folder. For use against provider root.

Expand Down
2 changes: 1 addition & 1 deletion waterbutler/server/api/v1/provider/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,6 @@ async def download_folder_as_zip(self):
self.set_header('Content-Type', 'application/zip')
self.set_header('Content-Disposition', make_disposition(zipfile_name + '.zip'))

result = await self.provider.zip(self.path)
result = await self.provider.zip(self.path, **self.arguments)

await self.write_stream(result)
Loading