Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions docs/ROW_MAPPING.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Row Mapping: Five Forms
# Row Mapping: Six Forms

When querying Hyper, there are five ways to map result rows into Rust values
from fully manual to fully automatic. Forms 1–4 trade manual control for
When querying Hyper, there are several ways to map result rows into Rust values
from fully manual to fully automatic. Forms 1–4 trade manual control for
convenience; Form 5 combines the automatic struct mapping of Form 4 with the
constant-memory streaming of Form 1. Start with the simplest that fits your
situation.
constant-memory streaming of Form 1; Form 6 adds `$1` parameter binding to the
struct mapping. Start with the simplest that fits your situation.

All five forms are demonstrated end-to-end in one runnable example:
All six forms are demonstrated end-to-end in one runnable example:

```
cargo run -p hyperdb-api --example row_mapping_forms
Expand Down Expand Up @@ -307,6 +307,57 @@ async fn print_products(conn: &hyperdb_api::AsyncConnection) -> hyperdb_api::Res

---

## Form 6 — Parameterized struct mapping

Forms 3–5 map rows into structs but take a plain `&str` query — no parameter
binding. [`query_params`](https://docs.rs/hyperdb-api/latest/hyperdb_api/struct.Connection.html#method.query_params)
binds `$1`, `$2`, … placeholders safely but returns raw `Row`s. The `_as_params`
methods are the intersection: they bind parameters via `ToSqlParam` *and* map
each result row into a `FromRow` struct, in a single call — no manual
`RowAccessor` loop and no SQL-injection risk.

There are three, mirroring the non-param trio:

- `fetch_one_as_params::<T>(query, params)` → `Result<T>`
- `fetch_all_as_params::<T>(query, params)` → `Result<Vec<T>>`
- `stream_as_params::<T>(query, params)` → `Result<impl Iterator<Item = Result<T>>>`
(constant memory, like Form 5)

```rust
use hyperdb_api::{Connection, CreateMode, FromRow, HyperProcess, Result};

fn main() -> Result<()> {
let hyper = HyperProcess::new(None, None)?;
let conn = Connection::new(&hyper, "products.hyper", CreateMode::DoNotCreate)?;

// Product derives FromRow (see Form 4); $1 is bound from `params`.
let max_price = 15.0f64;
let affordable: Vec<Product> = conn.fetch_all_as_params(
"SELECT id, name, price, in_stock FROM products WHERE price < $1 ORDER BY id",
&[&max_price],
)?;
for p in &affordable {
println!("{:>2} {:<10} ${:.2} in_stock={}", p.id, p.name, p.price, p.in_stock);
}
Ok(())
}
```

Error handling matches the underlying methods: `fetch_one_as_params` /
`fetch_all_as_params` return their errors on the `Result`; `stream_as_params`
(sync) reports stream-open errors — including `FeatureNotSupported` on the gRPC
transport, since prepared statements are TCP-only — on the outer `Result`, and
per-row mapping errors as each item's `Result`. The async `stream_as_params`
returns an `impl Stream` with no outer `Result`, so submission failures surface
as the first yielded `Err` item (as with the async Form 5 above).

### Async

Each `_as_params` method has an `AsyncConnection` equivalent — `await` the
`fetch_*` variants, and pin the `stream_as_params` stream just like Form 5.

---

## Choosing a form

| Need | Use |
Expand All @@ -316,6 +367,7 @@ async fn print_products(conn: &hyperdb_api::AsyncConnection) -> hyperdb_api::Res
| Named struct, custom mapping logic | Form 3 (`impl FromRow` manually) |
| Named struct, fields match columns | Form 4 (`#[derive(FromRow)]`) |
| Streaming + named struct (constant memory) | Form 5 (`stream_as`) |
| Named struct from a parameterized (`$1`) query | Form 6 (`fetch_*_as_params` / `stream_as_params`) |

For scalar values (a single `COUNT(*)`, `MAX`, etc.), use
[`fetch_scalar`](https://docs.rs/hyperdb-api/latest/hyperdb_api/struct.Connection.html#method.fetch_scalar)
Expand Down
16 changes: 16 additions & 0 deletions hyperdb-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,22 @@ let rows_affected = conn.command_params(
Supported parameter types: `i16`, `i32`, `i64`, `f32`, `f64`, `bool`, `&str`, `String`,
`Option<T>`, `Date`, `Time`, `Timestamp`, `OffsetTimestamp`, `Vec<u8>`, `&[u8]`.

To map a parameterized query's results straight into a `FromRow` struct (instead
of raw rows), use the `_as_params` variants — `fetch_one_as_params`,
`fetch_all_as_params`, and `stream_as_params` (sync and async). They combine the
`$1` binding above with the struct mapping of `fetch_*_as` / `stream_as` in one
call:

```rust
#[derive(hyperdb_api_derive::FromRow)]
struct User { id: i32, name: String }

let users: Vec<User> = conn.fetch_all_as_params(
"SELECT id, name FROM users WHERE org_id = $1",
&[&42i32],
)?;
```

#### Compile-time SQL validation (opt-in)

With the `hyperdb-api-derive` crate's `compile-time` feature, the `query_as!`
Expand Down
44 changes: 42 additions & 2 deletions hyperdb-api/examples/additional_examples/row_mapping_forms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
//! - **Form 3** — hand-written `FromRow` impl + `fetch_all_as`.
//! - **Form 4** — `#[derive(FromRow)]` + `fetch_all_as`.
//! - **Form 5** — streaming `FromRow`: `stream_as`, constant memory.
//! - **Form 6** — parameterized `FromRow`: `fetch_all_as_params` /
//! `stream_as_params`, combining `$1` parameter binding with struct mapping.
//!
//! Every form prints the same four products, so you can see they are
//! equivalent. Form 5 is shown on both the sync `Connection` and the async
//! Forms 1–5 print the same four products, so you can see they are equivalent.
//! Form 5 is shown on both the sync `Connection` and the async
//! `AsyncConnection` (which returns an `impl Stream` instead of an iterator).
//! Form 6 binds a `$1` price filter, so it prints only the matching subset.
//! Run with:
//!
//! cargo run -p hyperdb-api --example row_mapping_forms
Expand Down Expand Up @@ -51,6 +54,7 @@ fn main() -> Result<()> {
form3_manual_from_row(&conn)?;
form4_derive_from_row(&conn)?;
form5_streaming_from_row(&conn)?;
form6_parameterized_from_row(&conn)?;

// Form 5 also has an async flavor. Drop the sync connection first so the
// async one reopens the same database file cleanly, then drive the stream
Expand Down Expand Up @@ -216,6 +220,42 @@ fn form5_streaming_from_row(conn: &Connection) -> Result<()> {
Ok(())
}

/// Form 6 — parameterized `FromRow`. The `_as_params` methods combine `$1`
/// parameter binding (via `ToSqlParam`, exactly as `query_params`) with the
/// automatic struct mapping of Forms 3–5. This closes the last gap: a
/// parameterized `SELECT` whose rows you want as structs, in one call, with no
/// manual `RowAccessor` loop and no SQL-injection risk.
///
/// `fetch_all_as_params` collects the matches; `stream_as_params` is the
/// constant-memory streaming variant (shown here returning the same rows). Both
/// have async equivalents on `AsyncConnection`.
fn form6_parameterized_from_row(conn: &Connection) -> Result<()> {
println!("== Form 6 — parameterized FromRow (fetch_all_as_params / stream_as_params) ==");

// Bind a price ceiling as $1 — only products cheaper than $15 come back.
let max_price = 15.0f64;
let affordable: Vec<ProductDerived> = conn.fetch_all_as_params(
"SELECT id, name, price, in_stock FROM products WHERE price < $1 ORDER BY id",
&[&max_price],
)?;
println!("fetch_all_as_params (price < ${max_price:.2}):");
for p in &affordable {
print_row(p.id, &p.name, p.price, p.in_stock);
}

// Same query, streamed one chunk at a time via stream_as_params.
println!("stream_as_params (same filter, constant memory):");
for row_result in conn.stream_as_params::<ProductDerived>(
"SELECT id, name, price, in_stock FROM products WHERE price < $1 ORDER BY id",
&[&max_price],
)? {
let p = row_result?;
print_row(p.id, &p.name, p.price, p.in_stock);
}
println!();
Ok(())
}

/// Form 5, async flavor. `AsyncConnection::stream_as` returns an
/// `impl Stream<Item = Result<T>>` rather than an iterator — otherwise the
/// shape is identical to the sync version: lazy, one chunk in memory at a
Expand Down
150 changes: 147 additions & 3 deletions hyperdb-api/src/async_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,149 @@ impl AsyncConnection {
}
}

/// Fetches a single row from a **parameterized** query and maps it to a
/// struct using [`FromRow`](crate::FromRow) (async).
///
/// Parameterized counterpart to [`fetch_one_as`](Self::fetch_one_as): binds
/// `$1`, `$2`, … placeholders from `params` (via
/// [`ToSqlParam`](crate::params::ToSqlParam), exactly as
/// [`query_params`](Self::query_params)) and maps the first result row into
/// `T`.
///
/// # Errors
///
/// - Returns [`Error::FeatureNotSupported`] on gRPC transports (prepared
/// statements are TCP-only).
/// - Returns the error from [`query_params`](Self::query_params) if the
/// server rejects the statement, or on transport-level I/O failures.
/// - Returns [`Error::Conversion`] with message `"Query returned no rows"`
/// if the query produced zero rows.
/// - Returns whatever [`FromRow::from_row`](crate::FromRow::from_row)
/// produces when the row cannot be mapped.
pub async fn fetch_one_as_params<T: crate::FromRow>(
&self,
query: &str,
params: &[&dyn crate::params::ToSqlParam],
) -> Result<T> {
let row = self
.query_params(query, params)
.await?
.require_first_row()
.await?;
let indices = row
.schema()
.map(crate::row_accessor::RowAccessor::build_indices)
.unwrap_or_default();
T::from_row(crate::RowAccessor::new(&row, &indices))
}

/// Fetches all rows from a **parameterized** query and maps them to structs
/// using [`FromRow`](crate::FromRow) (async).
///
/// Parameterized counterpart to [`fetch_all_as`](Self::fetch_all_as): binds
/// `$1`, `$2`, … placeholders from `params` (see
/// [`query_params`](Self::query_params)) and maps every result row into `T`.
///
/// # Errors
///
/// - Returns [`Error::FeatureNotSupported`] on gRPC transports.
/// - Returns the error from [`query_params`](Self::query_params) if the
/// server rejects the statement, or on transport-level I/O failures.
/// - Returns the first error produced by
/// [`FromRow::from_row`](crate::FromRow::from_row) on any row.
pub async fn fetch_all_as_params<T: crate::FromRow>(
&self,
query: &str,
params: &[&dyn crate::params::ToSqlParam],
) -> Result<Vec<T>> {
let rows = self
.query_params(query, params)
.await?
.collect_rows()
.await?;
// Build the column-name → index lookup once from the first row's
// schema; reuse for every row. See `fetch_all_as`.
let indices = rows
.first()
.and_then(crate::result::Row::schema)
.map(crate::row_accessor::RowAccessor::build_indices)
.unwrap_or_default();
rows.iter()
.map(|r| T::from_row(crate::RowAccessor::new(r, &indices)))
.collect()
}

/// Returns a lazy `Stream` over the rows of a **parameterized** query,
/// mapping each to `T` via [`FromRow`] (async).
///
/// Parameterized counterpart to [`stream_as`](Self::stream_as): binds `$1`,
/// `$2`, … placeholders from `params` and streams the result with O(chunk)
/// memory, mapping each row into `T`. The column-index map is built once on
/// the first chunk and reused.
///
/// # Errors
///
/// Like [`stream_as`](Self::stream_as), this returns the `Stream` directly
/// (no outer `Result`) — the query is lazy and does not execute until first
/// polled. Each yielded item is a `Result<T>`:
/// - The **first** item is `Err(e)` if statement submission fails:
/// [`Error::FeatureNotSupported`] on gRPC transport, a `Parse`/`Bind`
/// rejection, or a transport failure. These surface as the first item,
/// *not* eagerly.
/// - Subsequent items are `Ok(T)` on a clean per-row mapping, or `Err(e)`
/// for a mapping failure (missing column, type mismatch, NULL in a
/// non-`Option` field) or a transport error hit on a later chunk.
///
/// [`FromRow`]: crate::FromRow
pub fn stream_as_params<'a, T: crate::FromRow + 'a>(
&'a self,
query: &str,
params: &[&dyn crate::params::ToSqlParam],
) -> impl futures_core::Stream<Item = Result<T>> + 'a {
// `&[&dyn ToSqlParam]` can't cross the `try_stream!` await points, so
// own the query string and encode params up front (encoding needs no
// connection). The prepare+execute sequence below mirrors
// `query_params` (see that method) — keep the two in sync if its
// Parse/Bind/Execute handling ever changes.
let query = query.to_owned();
let oids: Vec<crate::Oid> = params.iter().map(|p| p.sql_oid()).collect();
let encoded: Vec<Option<Vec<u8>>> = params.iter().map(|p| p.encode_param()).collect();
async_stream::try_stream! {
let client = match &self.transport {
AsyncTransport::Tcp(tcp) => &tcp.client,
AsyncTransport::Grpc(_) => {
// `?` inside try_stream! yields Err(e) and terminates the
// generator, so `unreachable!()` is dead code — its `!`
// type just satisfies the arm's need for a `&AsyncClient`
// (same type the Tcp arm produces).
Err(Error::feature_not_supported(
"prepared statements are not supported over gRPC transport",
))?;
unreachable!()
}
};
let stmt = client.prepare_typed(&query, &oids).await?;
let stream = client
.execute_prepared_streaming(&stmt, encoded, crate::result::DEFAULT_BINARY_CHUNK_SIZE)
.await?;
let mut rs = AsyncRowset::from_prepared(stream).with_statement_guard(stmt);
// The Prepared path captures the schema at prepare time, so the
// column-name → index map is available immediately — build it once
// up front rather than deferring to the first chunk (the empty-map
// fallback matches `stream_as` / `fetch_all_as` if it is somehow
// unavailable, surfacing a per-row `Missing` error).
let idx = rs
.schema()
.map(|schema| crate::RowAccessor::build_owned_indices(&schema))
.unwrap_or_default();
while let Some(chunk) = rs.next_chunk().await? {
for row in &chunk {
yield T::from_row(crate::RowAccessor::new_owned(row, &idx))?;
}
}
}
}

/// Fetches a single non-NULL scalar value. Errors on empty / NULL.
///
/// # Errors
Expand Down Expand Up @@ -558,11 +701,12 @@ impl AsyncConnection {
// Parameterized Queries
// =========================================================================

/// Executes a parameterized query with safely escaped parameters (async).
/// Executes a parameterized query with binary-encoded parameters (async).
///
/// Mirrors the sync [`Connection::query_params`](crate::Connection::query_params);
/// see that method for the design rationale around text-mode escaping
/// vs. future native Bind/Execute support.
/// see that method for the design rationale. Parameters travel through the
/// extended query protocol (Parse/Bind/Execute) in HyperBinary format — no
/// SQL escaping, full SQL-injection safety regardless of parameter content.
///
/// # Errors
///
Expand Down
Loading
Loading