Skip to content

Commit c8443e2

Browse files
authored
Merge pull request #38 from pmorris-dev/pagination-phase1
feat: add keyset pagination to sqlx-sqlite-toolkit
2 parents 20f0385 + 6f61755 commit c8443e2

7 files changed

Lines changed: 2134 additions & 2 deletions

File tree

crates/sqlx-sqlite-toolkit/README.md

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,61 @@ tx.commit().await?;
127127
// Or: tx.rollback().await?;
128128
```
129129

130+
### Pagination
131+
132+
When working with large result sets, loading all rows at once can cause
133+
performance degradation and excessive memory usage. The toolkit provides
134+
built-in pagination via `fetch_page` to fetch data in fixed-size pages,
135+
keeping memory bounded and queries fast regardless of total row count.
136+
137+
#### Why Keyset Pagination
138+
139+
The toolkit uses keyset (cursor-based) pagination rather than traditional
140+
OFFSET-based pagination. With OFFSET, the database must scan and discard
141+
all skipped rows on every page request, making deeper pages progressively
142+
slower. Keyset pagination uses indexed column values from the last row of
143+
the current page to seek directly to the next page, keeping query time
144+
constant no matter how far you paginate.
145+
146+
```rust
147+
use sqlx_sqlite_toolkit::pagination::KeysetColumn;
148+
149+
let keyset = vec![
150+
KeysetColumn::asc("category"),
151+
KeysetColumn::desc("score"),
152+
KeysetColumn::asc("id"),
153+
];
154+
155+
// First page
156+
let page = db.fetch_page(
157+
"SELECT id, title, category, score FROM posts".into(),
158+
vec![],
159+
keyset.clone(),
160+
25,
161+
).await?;
162+
163+
// Next page (forward) — pass the cursor from the previous page
164+
if let Some(cursor) = page.next_cursor {
165+
let next = db.fetch_page(
166+
"SELECT id, title, category, score FROM posts".into(),
167+
vec![],
168+
keyset.clone(),
169+
25,
170+
).after(cursor.clone()).await?;
171+
172+
// Previous page (backward) — rows are returned in original sort order
173+
let prev = db.fetch_page(
174+
"SELECT id, title, category, score FROM posts".into(),
175+
vec![],
176+
keyset,
177+
25,
178+
).before(cursor).await?;
179+
}
180+
```
181+
182+
The base query must not contain `ORDER BY` or `LIMIT` clauses — the builder
183+
appends these automatically based on the keyset definition.
184+
130185
### Cross-Database Queries
131186

132187
Attach other databases using the builder pattern:
@@ -183,6 +238,7 @@ cleanup_all_transactions(&interruptible, &regular).await;
183238
| `begin_interruptible_transaction()` | Begin interruptible transaction (builder) |
184239
| `fetch_all(query, values)` | Fetch all rows as JSON maps |
185240
| `fetch_one(query, values)` | Fetch single row or `None` |
241+
| `fetch_page(query, values, keyset, page_size)` | Keyset pagination (builder, supports `.after()`, `.before()`, `.attach()`) |
186242
| `acquire_writer()` | Acquire exclusive `WriterGuard` |
187243
| `run_migrations(migrator)` | Run pending migrations |
188244
| `close()` | Close connection |
@@ -214,6 +270,13 @@ All errors provide an `error_code()` method returning a machine-readable string:
214270
| `NO_ACTIVE_TRANSACTION` | Remove from empty state |
215271
| `INVALID_TRANSACTION_TOKEN` | Wrong transaction ID |
216272
| `IO_ERROR` | File system error |
273+
| `EMPTY_KEYSET_COLUMNS` | Keyset pagination requires at least one column |
274+
| `INVALID_PAGE_SIZE` | Page size must be greater than zero |
275+
| `CURSOR_LENGTH_MISMATCH` | Cursor value count does not match keyset column count |
276+
| `INVALID_PAGINATION_QUERY` | Base query contains top-level ORDER BY or LIMIT |
277+
| `CURSOR_COLUMN_NOT_FOUND` | Keyset column not found in query results |
278+
| `INVALID_COLUMN_NAME` | Keyset column name contains invalid characters |
279+
| `CONFLICTING_CURSORS` | Both `after` and `before` cursors provided |
217280

218281
## Development
219282

crates/sqlx-sqlite-toolkit/src/builders.rs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use serde_json::Value as JsonValue;
99
use sqlx_sqlite_conn_mgr::AttachedSpec;
1010

1111
use crate::Error;
12+
use crate::pagination::{KeysetColumn, KeysetPage, build_paginated_query};
1213
use crate::wrapper::{DatabaseWrapper, WriteQueryResult, bind_value};
1314

1415
/// Builder for SELECT queries returning multiple rows
@@ -153,6 +154,188 @@ impl IntoFuture for FetchOneBuilder {
153154
}
154155
}
155156

157+
/// Internal cursor position for forward vs backward pagination.
158+
enum CursorPosition {
159+
Forward(Vec<JsonValue>),
160+
Backward(Vec<JsonValue>),
161+
}
162+
163+
/// Builder for paginated SELECT queries using keyset (cursor-based) pagination
164+
pub struct FetchPageBuilder {
165+
db: Arc<sqlx_sqlite_conn_mgr::SqliteDatabase>,
166+
query: String,
167+
values: Vec<JsonValue>,
168+
keyset: Vec<KeysetColumn>,
169+
page_size: usize,
170+
cursor: Option<CursorPosition>,
171+
attached: Vec<AttachedSpec>,
172+
}
173+
174+
impl FetchPageBuilder {
175+
pub(crate) fn new(
176+
db: Arc<sqlx_sqlite_conn_mgr::SqliteDatabase>,
177+
query: String,
178+
values: Vec<JsonValue>,
179+
keyset: Vec<KeysetColumn>,
180+
page_size: usize,
181+
) -> Self {
182+
Self {
183+
db,
184+
query,
185+
values,
186+
keyset,
187+
page_size,
188+
cursor: None,
189+
attached: Vec::new(),
190+
}
191+
}
192+
193+
/// Set the cursor for fetching the next page (forward pagination).
194+
///
195+
/// Pass the `next_cursor` from a previous `KeysetPage` to fetch the page
196+
/// that follows it in the original sort order.
197+
pub fn after(mut self, cursor: Vec<JsonValue>) -> Self {
198+
self.cursor = Some(CursorPosition::Forward(cursor));
199+
self
200+
}
201+
202+
/// Set the cursor for fetching the previous page (backward pagination).
203+
///
204+
/// Pass a cursor to fetch the page that precedes it in the original sort
205+
/// order. Rows are returned in the original sort order (not reversed).
206+
pub fn before(mut self, cursor: Vec<JsonValue>) -> Self {
207+
self.cursor = Some(CursorPosition::Backward(cursor));
208+
self
209+
}
210+
211+
/// Attach additional databases for this query
212+
pub fn attach(mut self, attached: Vec<AttachedSpec>) -> Self {
213+
self.attached = attached;
214+
self
215+
}
216+
217+
/// Execute the paginated query and return a page of results
218+
pub async fn execute(self) -> Result<KeysetPage, Error> {
219+
// Validate inputs
220+
if self.keyset.is_empty() {
221+
return Err(Error::EmptyKeysetColumns);
222+
}
223+
if self.page_size == 0 {
224+
return Err(Error::InvalidPageSize);
225+
}
226+
227+
// Extract cursor values and direction
228+
let (cursor_values, backward) = match self.cursor {
229+
Some(CursorPosition::Forward(vals)) => (Some(vals), false),
230+
Some(CursorPosition::Backward(vals)) => (Some(vals), true),
231+
None => (None, false),
232+
};
233+
234+
if let Some(ref vals) = cursor_values
235+
&& vals.len() != self.keyset.len()
236+
{
237+
return Err(Error::CursorLengthMismatch {
238+
cursor_len: vals.len(),
239+
keyset_len: self.keyset.len(),
240+
});
241+
}
242+
243+
// Build paginated SQL — pass the user's bind count so cursor
244+
// placeholders are numbered $N+1, $N+2, … and never collide with
245+
// the user's $1, $2, … (or positional ?) parameters.
246+
let (sql, cursor_bind_values) = build_paginated_query(
247+
&self.query,
248+
&self.keyset,
249+
cursor_values.as_deref(),
250+
self.page_size,
251+
backward,
252+
self.values.len(),
253+
)?;
254+
255+
// Combine user values + cursor bind values
256+
let mut all_values = self.values;
257+
all_values.extend(cursor_bind_values);
258+
259+
// Execute query
260+
let rows = if self.attached.is_empty() {
261+
let pool = self.db.read_pool()?;
262+
let mut q = sqlx::query(&sql);
263+
for value in all_values {
264+
q = bind_value(q, value);
265+
}
266+
q.fetch_all(pool).await?
267+
} else {
268+
let mut conn =
269+
sqlx_sqlite_conn_mgr::acquire_reader_with_attached(&self.db, self.attached).await?;
270+
271+
let mut q = sqlx::query(&sql);
272+
for value in all_values {
273+
q = bind_value(q, value);
274+
}
275+
let rows = sqlx::Executor::fetch_all(&mut *conn, q).await?;
276+
277+
// Explicit cleanup
278+
conn.detach_all().await?;
279+
rows
280+
};
281+
282+
// Decode rows
283+
let mut decoded = decode_rows(rows)?;
284+
285+
// Determine has_more by checking if we got more rows than page_size
286+
let has_more = decoded.len() > self.page_size;
287+
if has_more {
288+
decoded.truncate(self.page_size);
289+
}
290+
291+
// Reverse rows when paginating backward to restore original sort order
292+
if backward {
293+
decoded.reverse();
294+
}
295+
296+
// Extract continuation cursor: first row if backward, last row if forward
297+
let cursor_row = if backward {
298+
decoded.first()
299+
} else {
300+
decoded.last()
301+
};
302+
303+
let next_cursor = if has_more {
304+
if let Some(row) = cursor_row {
305+
let mut cursor_vals = Vec::with_capacity(self.keyset.len());
306+
for col in &self.keyset {
307+
let value = row
308+
.get(&col.name)
309+
.ok_or_else(|| Error::CursorColumnNotFound {
310+
column: col.name.clone(),
311+
})?;
312+
cursor_vals.push(value.clone());
313+
}
314+
Some(cursor_vals)
315+
} else {
316+
None
317+
}
318+
} else {
319+
None
320+
};
321+
322+
Ok(KeysetPage {
323+
rows: decoded,
324+
next_cursor,
325+
has_more,
326+
})
327+
}
328+
}
329+
330+
impl IntoFuture for FetchPageBuilder {
331+
type Output = Result<KeysetPage, Error>;
332+
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;
333+
334+
fn into_future(self) -> Self::IntoFuture {
335+
Box::pin(self.execute())
336+
}
337+
}
338+
156339
/// Builder for write queries (INSERT/UPDATE/DELETE)
157340
pub struct ExecuteBuilder {
158341
db: DatabaseWrapper,

0 commit comments

Comments
 (0)