-
Notifications
You must be signed in to change notification settings - Fork 2
Feature/cursor paging #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
8a8bc0f
de1e246
34084ba
19929f1
8c70514
411628f
8f4b98e
a2dfb6c
01d36fd
c9f6397
ef7282d
cf0b236
a495ec0
36157dc
2f0050d
68401f2
b3b708c
d622074
749cfc0
8672da0
db06877
493f998
776c3ba
b1a83ec
aca665f
1376c31
f2efff8
75c42b9
76052ab
8762b2c
e6b684e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,9 @@ | |
| if TYPE_CHECKING: | ||
| from edfi_api_client.edfi_client import EdFiClient | ||
|
|
||
| from joblib import Parallel, delayed | ||
| from functools import partial | ||
|
|
||
|
|
||
| class EdFiEndpoint: | ||
| """ | ||
|
|
@@ -123,30 +126,30 @@ def ping(self, *, params: Optional[dict] = None, **kwargs) -> requests.Response: | |
| return res | ||
|
|
||
|
|
||
| def get(self, limit: Optional[int] = None, *, params: Optional[dict] = None, **kwargs) -> List[dict]: | ||
| def get(self, url: Optional[str] = None, limit: Optional[int] = None, *, params: Optional[dict] = None, **kwargs) -> List[dict]: | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
| """ | ||
| This method returns the rows from a single GET request using the exact params passed by the user. | ||
|
|
||
| :return: | ||
| """ | ||
| logging.info(f"[Get {self.component}] Endpoint: {self.url}") | ||
| end_point = url or self.url | ||
| logging.info(f"[Get {self.component}] Endpoint: {end_point}") | ||
|
|
||
| # Override init params if passed | ||
| params = EdFiParams(params or self.params).copy() | ||
| if limit: # Override limit if passed | ||
| params['limit'] = limit | ||
|
|
||
| logging.info(f"[Get {self.component}] Parameters: {params}") | ||
| return self.client.session.get_response(self.url, params=params, **kwargs).json() | ||
|
|
||
| resp = self.client.session.get_response(end_point, params=params, **kwargs).json() | ||
| return resp | ||
|
|
||
|
|
||
| def get_rows(self, | ||
| *, | ||
| params: Optional[dict] = None, # Optional alternative params | ||
| page_size: int = 100, | ||
| reverse_paging: bool = True, | ||
|
gnguyen87 marked this conversation as resolved.
|
||
| step_change_version: bool = False, | ||
| change_version_step_size: int = 50000, | ||
| **kwargs | ||
| ) -> Iterator[dict]: | ||
| """ | ||
|
|
@@ -163,20 +166,22 @@ def get_rows(self, | |
| :param max_wait: | ||
| :return: | ||
| """ | ||
| paged_result_iter = self.get_pages( | ||
|
|
||
| paged_result_iter = self.get_pages_cursor( | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
| params=params, | ||
| page_size=page_size, reverse_paging=reverse_paging, | ||
| step_change_version=step_change_version, change_version_step_size=change_version_step_size, | ||
| page_size=page_size, | ||
| **kwargs | ||
| ) | ||
|
|
||
| for paged_result in paged_result_iter: | ||
| yield from paged_result | ||
|
|
||
|
|
||
| def get_pages(self, | ||
| def get_pages_offset(self, | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
| *, | ||
| url: Optional[str] = None, | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
| params: Optional[dict] = None, # Optional alternative params | ||
| limit: Optional[int] = None, | ||
| page_size: int = 100, | ||
| reverse_paging: bool = True, | ||
| step_change_version: bool = False, | ||
|
|
@@ -199,6 +204,11 @@ def get_pages(self, | |
| """ | ||
| # Override init params if passed | ||
| paged_params = EdFiParams(params or self.params).copy() | ||
| end_point = url or self.url | ||
| logging.info(f"[Get {self.component}] Endpoint: {end_point}") | ||
|
|
||
| if limit: # Override limit if passed | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
| paged_params['limit'] = limit | ||
|
|
||
| ### Prepare pagination variables, depending on type of pagination being used | ||
| if step_change_version and reverse_paging: | ||
|
|
@@ -210,19 +220,20 @@ def get_pages(self, | |
| elif step_change_version: | ||
| logging.info(f"[Paged Get {self.component}] Pagination Method: Change Version Stepping") | ||
| paged_params.init_page_by_offset(page_size) | ||
| paged_params.init_page_by_change_version_step(change_version_step_size) | ||
|
|
||
| paged_params.init_page_by_change_version_step(change_version_step_size) | ||
| else: | ||
| logging.info(f"[Paged Get {self.component}] Pagination Method: Offset Pagination") | ||
| paged_params.init_page_by_offset(page_size) | ||
|
|
||
| total_count = 0 | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
| # Begin pagination-loop | ||
| while True: | ||
| logging.info(f"[Get {self.component}] Parameters: {paged_params}") | ||
|
|
||
| ### GET from the API and yield the resulting JSON payload | ||
| paged_rows = self.get(params=paged_params, **kwargs) | ||
| logging.info(f"[Get {self.component}] Retrieved {len(paged_rows)} rows.") | ||
| paged_rows = self.client.session.get_response(end_point, params=paged_params, **kwargs).json() | ||
| yield paged_rows | ||
| logging.info(f"[Get {self.component}] Retrieved {len(paged_rows)} rows.") | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
|
|
||
| ### Paginate, depending on the method specified in arguments | ||
| # Reverse offset pagination is only applicable during change-version stepping. | ||
|
|
@@ -239,7 +250,7 @@ def get_pages(self, | |
| except StopIteration: | ||
| logging.info(f"[Paged Get {self.component}] @ Change version exceeded max. Ending pagination.") | ||
| break | ||
|
|
||
| else: | ||
| # If no rows are returned, end pagination. | ||
| if len(paged_rows) == 0: | ||
|
|
@@ -259,6 +270,67 @@ def get_pages(self, | |
| else: | ||
| logging.info(f"@ Paginating offset...") | ||
| paged_params.page_by_offset() | ||
|
|
||
| def get_pages_cursor(self, | ||
| *, | ||
| url: Optional[str] = None, | ||
| params: Optional[dict] = None, # Optional alternative params | ||
| limit: Optional[int] = None, | ||
| page_size: int = 100, | ||
| **kwargs | ||
| ) -> Iterator[List[dict]]: | ||
|
|
||
| # Override init params if passed | ||
| paged_params = EdFiParams(params or self.params).copy() | ||
| end_point = url or self.url | ||
| logging.info(f"[Get {self.component}] Endpoint: {end_point}") | ||
|
|
||
| if limit: # Override limit if passed | ||
| paged_params['limit'] = limit | ||
|
|
||
| # Fall back to reverse-offset paging if incompatible with cursor paging | ||
| def _fall_back_to_pages_by_offset(): | ||
|
jayckaiser marked this conversation as resolved.
Outdated
|
||
| return self.get_pages_offset( | ||
| url = url, | ||
| params = params, | ||
| limit = limit, | ||
| page_size=page_size, | ||
| **kwargs | ||
| ) | ||
|
|
||
| # Check ODS version compatibility for cursor paging | ||
| ods_version = tuple(map(int, self.client.get_ods_version().split(".")[:2])) | ||
| if ods_version < (7,3): | ||
| logging.warning(f"ODS {self.client.get_ods_version()} is incompatible. Cursor Paging requires v.7.3 or higher. Falling back to another paging method") | ||
| yield from _fall_back_to_pages_by_offset() | ||
| return | ||
| # deletes/key_changes cannot be retrieved with cursor paging | ||
| if self.get_deletes or self.get_key_changes: | ||
| logging.warning(f"Cursor Paging does not support deletes/key_changes. Falling back to another paging method") | ||
| yield from _fall_back_to_pages_by_offset() | ||
| return | ||
|
|
||
| logging.info(f"[Paged Get {self.component}] Pagination Method: Cursor Paging") | ||
|
|
||
| ### Prepare pagination variables | ||
| ### First request should not have any `page_token` and `page_size` defined | ||
| paged_params.init_page_by_token(page_token = None, page_size = None) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `page_size` should be defined regardless, since we always set a default value.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's consolidate lines 317 and 332 and put them at the start of the while-loop. We can define |
||
|
|
||
| # Begin pagination loop | ||
| while True: | ||
| logging.info(f"[Get {self.component}] Parameters: {paged_params}") | ||
|
|
||
| result = self.client.session.get_response(end_point, params = paged_params, **kwargs) | ||
| paged_rows = result.json() | ||
| logging.info(f"[Get {self.component}] Retrieved {len(paged_rows)} rows") | ||
| yield paged_rows | ||
|
|
||
| logging.info(f"[Paged Get {self.component}] @ Cursor paging ...") | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
| if not result.headers.get("Next-Page-Token"): | ||
| logging.info(f"[Paged Get {self.component}] @ Retrieved zero rows. Ending pagination.") | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
| break | ||
| paged_params.init_page_by_token(page_token = result.headers.get("Next-Page-Token"), page_size = page_size) | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
|
|
||
|
|
||
|
|
||
| def get_total_count(self, *, params: Optional[dict] = None, **kwargs) -> int: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,8 @@ def __init__(self, | |
| # These parameters are only used during pagination. They must be explicitly initialized. | ||
| self.page_size = None | ||
| self.change_version_step_size = None | ||
| self.page_token = None | ||
| self.number = None | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Where is this |
||
|
|
||
|
|
||
| def copy(self) -> 'EdFiParams': | ||
|
|
@@ -190,3 +192,28 @@ def reverse_page_by_offset(self): | |
|
|
||
| if self['offset'] < 0: | ||
| raise StopIteration | ||
|
|
||
| def init_page_by_token(self, page_token: str, page_size: int): | ||
|
gnguyen87 marked this conversation as resolved.
Outdated
|
||
| """ | ||
|
|
||
| :param page_size: | ||
| :param page_token: | ||
| :return: | ||
| """ | ||
|
|
||
| # Cursor paging behavior: page_token is required when page_size is specified. | ||
| # - If page_token is None: first request, do NOT include page_size | ||
| # - If page_token is present: include page_token and page_size | ||
| self.page_size = page_size | ||
| self.page_token = page_token | ||
|
|
||
| if page_token is None: | ||
| self.pop("pageToken", None) | ||
| self.pop("page_size", None) | ||
| else: | ||
| self["pageToken"] = self.page_token | ||
| self["page_size"] = self.page_size | ||
|
|
||
|
|
||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.