Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
8a8bc0f
first pass at adding cursor paging
Jan 9, 2026
de1e246
draft
Jan 13, 2026
34084ba
first draft at parallel partitioning
Jan 13, 2026
19929f1
code clean up
Jan 14, 2026
8c70514
move partitioning logic
Jan 14, 2026
411628f
change default
Jan 15, 2026
8f4b98e
add condition to skip first fetch when partitioning
Jan 20, 2026
a2dfb6c
flatten final return list
Jan 21, 2026
01d36fd
revert default paging method
Jan 22, 2026
c9f6397
add docs
Jan 22, 2026
ef7282d
add check for endpoint type since cursor paging does not support dele…
Jan 22, 2026
cf0b236
remove partitioning for now
Jan 26, 2026
a495ec0
decouple and refactor get_pages() into 2 smaller pagination generators
Jan 26, 2026
36157dc
default to cursor paging for `get_rows()`
Jan 26, 2026
2f0050d
fall back on reverse-offset if ods version not compatible with cursor…
Jan 27, 2026
68401f2
fall back on reverse-offset if incompatible with cursor paging
Jan 27, 2026
b3b708c
remove default step_change_version arg
Jan 27, 2026
d622074
undo get()
Jan 27, 2026
749cfc0
remove unneeded code
Jan 27, 2026
8672da0
move fallbacks to `get_rows()`
Jan 27, 2026
db06877
debug
Jan 28, 2026
493f998
reverse-offset as default
Jan 28, 2026
776c3ba
code clean up
Jan 29, 2026
b1a83ec
Minor cleanup of whitespace and logging.
jayckaiser Feb 3, 2026
aca665f
Implement get_pages instead of get_pages_cursor in composites.
jayckaiser Feb 3, 2026
1376c31
Label composite-offset pagination scheme with correct method.
jayckaiser Feb 3, 2026
f2efff8
Merge branch 'main' into feature/cursor_paging
jayckaiser Mar 24, 2026
75c42b9
Use defined logger in EdFiEndpoint instead of default.
jayckaiser Mar 24, 2026
76052ab
Minor cleanup.
jayckaiser Mar 24, 2026
8762b2c
Merge branch 'feature/cursor_paging' of https://github.com/edanalytic…
Apr 13, 2026
e6b684e
remove attribute
Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 88 additions & 11 deletions edfi_api_client/edfi_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
if TYPE_CHECKING:
from edfi_api_client.edfi_client import EdFiClient

from joblib import Parallel, delayed
Comment thread
gnguyen87 marked this conversation as resolved.
Outdated
from functools import partial


class EdFiEndpoint:
"""
Expand Down Expand Up @@ -123,29 +126,34 @@ def ping(self, *, params: Optional[dict] = None, **kwargs) -> requests.Response:
return res


def get(self, limit: Optional[int] = None, *, params: Optional[dict] = None, **kwargs) -> List[dict]:
def get(self, url: Optional[str] = None, limit: Optional[int] = None, *, params: Optional[dict] = None, **kwargs) -> requests.Response:
Comment thread
jayckaiser marked this conversation as resolved.
Outdated
"""
This method returns the rows from a single GET request using the exact params passed by the user.

:return:
"""
logging.info(f"[Get {self.component}] Endpoint: {self.url}")
end_point = url or self.url
logging.info(f"[Get {self.component}] Endpoint: {end_point}")

# Override init params if passed
params = EdFiParams(params or self.params).copy()
if limit: # Override limit if passed
params['limit'] = limit

logging.info(f"[Get {self.component}] Parameters: {params}")
return self.client.session.get_response(self.url, params=params, **kwargs).json()

resp = self.client.session.get_response(end_point, params=params, **kwargs)
return resp


def get_rows(self,
*,
params: Optional[dict] = None, # Optional alternative params
page_size: int = 100,
reverse_paging: bool = True,
Comment thread
gnguyen87 marked this conversation as resolved.
reverse_paging: bool = False,
Comment thread
gnguyen87 marked this conversation as resolved.
Outdated
step_change_version: bool = False,
cursor_paging: bool = True,
partitioning: bool = False,
change_version_step_size: int = 50000,
**kwargs
) -> Iterator[dict]:
Expand All @@ -165,7 +173,7 @@ def get_rows(self,
"""
paged_result_iter = self.get_pages(
params=params,
page_size=page_size, reverse_paging=reverse_paging,
page_size=page_size, reverse_paging=reverse_paging, cursor_paging= cursor_paging, partitioning=partitioning,
step_change_version=step_change_version, change_version_step_size=change_version_step_size,
**kwargs
)
Expand All @@ -180,7 +188,10 @@ def get_pages(self,
page_size: int = 100,
reverse_paging: bool = True,
step_change_version: bool = False,
cursor_paging: bool = False,
partitioning: bool = False,
change_version_step_size: int = 50000,
number: Optional[int] = None,
**kwargs
) -> Iterator[List[dict]]:
"""
Expand Down Expand Up @@ -211,18 +222,44 @@ def get_pages(self,
logging.info(f"[Paged Get {self.component}] Pagination Method: Change Version Stepping")
paged_params.init_page_by_offset(page_size)
paged_params.init_page_by_change_version_step(change_version_step_size)


elif cursor_paging and partitioning:
ods_version = tuple(map(int, self.client.get_ods_version().split(".")[:2]))
Comment thread
jayckaiser marked this conversation as resolved.
Outdated
if ods_version < (7,3):
raise ValueError(f"ODS {self.client.get_ods_version()} is incompatible. Cursor Paging with Partitions requires v.7.3 or higher. Ending pagination")
if self.get_deletes or self.get_key_changes:
raise ValueError(f"Cursor Paging with Partitions does not support deletes/key_changes. Ending pagination")

token_url = f"{self.url.rstrip('/')}/partitions"
paged_tokens = self.get(url = token_url, params = paged_params.init_page_by_partitions(number = number) , **kwargs).json().get("pageTokens")
logging.info(f"[Paged Get {self.component}] Pagination Method: Cursor Paging with Partitions")
logging.info(f"[Get {self.component}] Retrieved {len(paged_tokens)} token(s): {paged_tokens}")

elif cursor_paging:
ods_version = tuple(map(int, self.client.get_ods_version().split(".")[:2]))
if ods_version < (7,3):
raise ValueError(f"ODS {self.client.get_ods_version()} is incompatible. Cursor Paging requires v.7.3 or higher. Ending pagination")
if self.get_deletes or self.get_key_changes:
raise ValueError(f"Cursor Paging does not support deletes/key_changes. Ending pagination")
logging.info(f"ODS {self.client.get_ods_version()}")
logging.info(f"[Paged Get {self.component}] Pagination Method: Cursor Paging")
# First request should not have any `page_token` defined
paged_params.init_page_by_token(page_token = None, page_size = None)
else:
logging.info(f"[Paged Get {self.component}] Pagination Method: Offset Pagination")
paged_params.init_page_by_offset(page_size)


# Begin pagination-loop
while True:

### GET from the API and yield the resulting JSON payload
paged_rows = self.get(params=paged_params, **kwargs)
logging.info(f"[Get {self.component}] Retrieved {len(paged_rows)} rows.")
yield paged_rows
if not partitioning:
result = self.get(params=paged_params, **kwargs)
paged_rows = result.json()
logging.info(f"[Get {self.component}] Retrieved {len(paged_rows)} rows.")

yield paged_rows

### Paginate, depending on the method specified in arguments
# Reverse offset pagination is only applicable during change-version stepping.
Expand All @@ -239,7 +276,35 @@ def get_pages(self,
except StopIteration:
logging.info(f"[Paged Get {self.component}] @ Change version exceeded max. Ending pagination.")
break

elif cursor_paging and partitioning:
def partitioning_with_token(token):
results = []
p = paged_params.copy()
while True:
p.init_page_by_token(page_token=token, page_size=page_size)
result = self.get(params=p, **kwargs)
paged_rows = result.json()
results.extend(paged_rows)
if len(paged_rows) == 0:
logging.info(f"[Paged Get {self.component}] @ Retrieved zero rows for token: {token}. Ending pagination.")
break
logging.info(f"[Get {self.component}] Retrieved {len(paged_rows)} rows for token: {token}.")
token = result.headers.get("Next-Page-Token")
return results

results = [element for sublist in Parallel(n_jobs=len(paged_tokens), backend="threading")(delayed(partitioning_with_token)(token) for token in paged_tokens) for element in sublist]
Comment thread
jayckaiser marked this conversation as resolved.
Outdated
logging.info(f"{len(results)}")
for paged_page in results:
yield paged_page
return

elif cursor_paging :
logging.info(f"[Paged Get {self.component}] @ Cursor paging ...")
paged_params.init_page_by_token( page_token = result.headers.get("Next-Page-Token"), page_size = page_size)
if not result.headers.get("Next-Page-Token"):
logging.info(f"[Paged Get {self.component}] @ Retrieved zero rows. Ending pagination.")
break

else:
# If no rows are returned, end pagination.
if len(paged_rows) == 0:
Expand All @@ -259,7 +324,7 @@ def get_pages(self,
else:
logging.info(f"@ Paginating offset...")
paged_params.page_by_offset()


def get_total_count(self, *, params: Optional[dict] = None, **kwargs) -> int:
"""
Expand Down Expand Up @@ -319,6 +384,18 @@ def _get_attributes_from_swagger(self):
'has_deletes': (self.namespace, self.name) in swagger.deletes,
}

def partitioning_with_token(self, params: Optional[dict] = None, token: str = None, page_size: str = None, **kwargs ):

params.init_page_by_token(page_token=token, page_size=page_size)
result = params.get(params=params, **kwargs)
paged_rows = result.json()
yield paged_rows
logging.info(f"[Get {self.component}] Retrieved {len(paged_rows)} rows.")

token = result.headers.get("Next-Page-Token")
if not token:
logging.info(f"[Paged Get {self.component}] @ Retrieved zero rows. Ending pagination.")
return False

class EdFiResource(EdFiEndpoint):
component: str = 'Resource'
Expand Down
35 changes: 35 additions & 0 deletions edfi_api_client/edfi_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def __init__(self,
# These parameters are only used during pagination. They must be explicitly initialized.
self.page_size = None
self.change_version_step_size = None
self.page_token = None
self.number = None
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this number attribute used?



def copy(self) -> 'EdFiParams':
Expand Down Expand Up @@ -190,3 +192,36 @@ def reverse_page_by_offset(self):

if self['offset'] < 0:
raise StopIteration

def init_page_by_token(self, page_token: str, page_size: int):
Comment thread
gnguyen87 marked this conversation as resolved.
Outdated
"""

:param page_size:
:param page_token:
:return:
"""

# Cursor paging behavior: page_token is required when page_size is specified.
# - If page_token is None: first request, do NOT include page_size
# - If page_token is present: include page_token and page_size
self.page_size = page_size
self.page_token = page_token

if page_token is None:
self.pop("pageToken", None)
self.pop("pageSize", None)
else:
self["page_token"] = self.page_token
self["page_size"] = self.page_size

def init_page_by_partitions(self, number: int):
"""

:param number:
:return:
"""

self.number = number
self["number"] = number