Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions waterbutler/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,78 @@ async def __anext__(self):
return path.path.replace(self.parent_path.path, '', 1), await self.provider.download(path)


class ZipStreamGeneratorPaginated:

def __init__(self, provider, root_path, **kwargs):
self.provider = provider
self.parent_path = root_path.parent if root_path.is_file else root_path
self.root_path = root_path
self.kwargs = kwargs

self.remaining = []
self.initialized = False

async def _initialize(self):
if self.initialized:
return

self.initialized = True

if self.root_path.is_file:
metadata = await self.provider.metadata(
self.root_path,
**self.kwargs,
)
self.remaining.append((self.root_path.parent, metadata))
return

async for page in self.provider.iter_children_pages(
self.root_path,
**self.kwargs,
):

self.remaining.extend(
(self.root_path, item)
for item in page
)

async def __aiter__(self):
return self

async def __anext__(self):
if not self.initialized:
await self._initialize()

if not self.remaining:
raise StopAsyncIteration

current = self.remaining.pop(0)
path = self.provider.path_from_metadata(*current)

if path.is_dir:
async for page in self.provider.iter_children_pages(
path,
**self.kwargs,
):
if not page:
return (
path.path.replace(self.parent_path.path, '', 1),
EmptyStream(),
)

self.remaining.extend(
(path, item)
for item in page
)

return await self.__anext__()

return (
path.path.replace(self.parent_path.path, '', 1),
await self.provider.download(path),
)


class RequestHandlerContext:

def __init__(self, request_coro):
Expand Down
57 changes: 55 additions & 2 deletions waterbutler/providers/osfstorage/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,14 @@ async def delete(self, path, confirm_delete=0, **kwargs):
)).release()

async def zip(self, path: wb_path.WaterButlerPath, **kwargs) -> asyncio.StreamReader:
# add query param 'minimal' to avoid unnecessary metadata in the response.
return await super().zip(path, **kwargs, minimal=True)
"""Streams a Zip archive of the given folder

:param path: ( :class:`.WaterButlerPath` ) The folder to compress
"""
kwargs['minimal'] = True
kwargs['limit'] = None

return streams.ZipStreamReader(utils.ZipStreamGeneratorPaginated(self, path, **kwargs)) # type: ignore

async def metadata(self, path, **kwargs):
if path.identifier is None:
Expand Down Expand Up @@ -519,6 +525,23 @@ async def copy(self,

return meta_data, created

async def iter_children_pages(self, path, **kwargs):
url = self.build_url(
path.identifier,
'children',
user_id=self.auth.get('id'),
minimal=kwargs.get('minimal'),
limit=kwargs.get('limit'),
orm=True
)

while True:
page, url = await self._fetch_metadata_page(url, path, **kwargs)
yield page

if not url:
break

# ========== private ==========

async def _item_metadata(self, path, revision=None):
Expand All @@ -545,6 +568,36 @@ async def _children_metadata(self, path, **kwargs):
ret.append(OsfStorageFileMetadata(item, str(path.child(item['name']))))
return ret

async def _fetch_metadata_page(self, url, path, **kwargs):
resp = await self.make_signed_request(
'GET',
url,
expects=(200,),
)
resp_json = await resp.json()

if kwargs.get('limit') and len(resp_json) < kwargs.get('limit'):
next_url = None
elif not kwargs.get('limit'):
next_url = None
else:
next_url = self.build_url(
path.identifier,
'children',
user_id=self.auth.get('id'),
minimal=kwargs.get('minimal'),
limit=kwargs.get('limit'),
after=resp_json[-1]['id'] if resp_json else None,
orm=True
)
ret = []
for item in resp_json:
if item['kind'] == 'folder':
ret.append(OsfStorageFolderMetadata(item, str(path.child(item['name'], folder=True))))
else:
ret.append(OsfStorageFileMetadata(item, str(path.child(item['name']))))
return ret, next_url

async def _delete_folder_contents(self, path, **kwargs):
"""Delete the contents of a folder. For use against provider root.

Expand Down
Loading