From 4b34de06363054155167d9e5cfe9fc0ce5ce40ab Mon Sep 17 00:00:00 2001 From: Sprite Date: Thu, 11 Jun 2026 17:04:03 +0000 Subject: [PATCH] JS: Add lazy streaming SELECT/CONSTRUCT query cursors Add Store.querySolutions(query, options) -> QuerySolutions (SELECT) and Store.queryTriples(query, options) -> QueryTriples (CONSTRUCT/DESCRIBE). Each cursor holds the lazy QuerySolutionIter/QueryTripleIter alive instead of draining it into an Array, exposing nextBatch(count) (an empty Array signals exhaustion; count must be >= 1) plus, for SELECT, a variables getter. This lets the WASM bindings stream large result sets in bounded batches rather than materializing the whole set or serializing it into one string that can exceed V8's max string length. The iterators are 'static because on_store() snapshots storage rather than borrowing &Store, so a cursor lives in a wasm-bindgen struct across JS calls and is MVCC-isolated: an open cursor still yields the rows that matched at query time even if the store is concurrently mutated. The snapshot is pinned until the cursor's generated free() is called. Option parsing is shared with query() via a new execute_query() helper, so the cursors get full parity (base_iri, default_graph, named_graphs, use_default_graph_as_union) with no behavior change to query(). Adds TypeScript typings for both methods and cursor classes, README docs, and red/green tests covering bounded batching, exhaustion, option parity, term fidelity, non-matching-form rejection, snapshot isolation, and free(). 
Co-Authored-By: Claude Opus 4.8 --- js/README.md | 33 ++++++ js/src/lib.rs | 34 ++++++ js/src/store.rs | 249 ++++++++++++++++++++++++++++++++---------- js/test/store.test.ts | 205 ++++++++++++++++++++++++++++++++++ 4 files changed, 462 insertions(+), 59 deletions(-) diff --git a/js/README.md b/js/README.md index e6459e09a..dd0b46ceb 100644 --- a/js/README.md +++ b/js/README.md @@ -182,6 +182,39 @@ console.log(store.query("ASK { ?p ?o }", { })); ``` +#### `Store.prototype.querySolutions(String query, object options)` +Executes a [SPARQL 1.1 `SELECT` query](https://www.w3.org/TR/sparql11-query/#select) and returns a `QuerySolutions` cursor that yields results **lazily**, in bounded batches, instead of materializing the whole result set into an array like `query()` does. This keeps memory bounded for very large result sets — useful in particular for the 32-bit WASM heap and for streaming results across a worker boundary without serializing everything into one string. + +It accepts the same evaluation options as `query()` (`base_iri`, `default_graph`, `named_graphs`, `use_default_graph_as_union`); `results_format` does not apply. It throws if the query is not a `SELECT`. + +```js +const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); +console.log(cursor.variables); // ["s", "p", "o"], available before pulling any row +for (let batch = cursor.nextBatch(1000); batch.length > 0; batch = cursor.nextBatch(1000)) { + for (const binding of batch) { + console.log(binding.get("s").value); + } +} +cursor.free(); // release the cursor's storage snapshot (see below) +``` + +`QuerySolutions.prototype.nextBatch(Number count)` pulls up to `count` solutions (each a `Map` from bound variable name to `Term`). An **empty array signals exhaustion**; because of this, `count` must be at least `1`. `QuerySolutions.prototype.variables` is the array of selected variable names. 
+ +A cursor pins an MVCC **snapshot** of the store taken at query time: it keeps yielding the rows that matched when it was opened even if the store is concurrently mutated (loaded, cleared, or updated), and it keeps that snapshot in memory until it is released. Call `QuerySolutions.prototype.free()` when done to release it — also when you stop before exhaustion or an error interrupts iteration (for example from a `finally` block). A freed cursor can no longer be used: calling `nextBatch()` on it throws. + +#### `Store.prototype.queryTriples(String query, object options)` +The triple-producing dual of `querySolutions()`: executes a [SPARQL 1.1 `CONSTRUCT` or `DESCRIBE` query](https://www.w3.org/TR/sparql11-query/#construct) and returns a `QueryTriples` cursor that yields `Quad`s lazily in bounded batches. Same options and snapshot/`free()` semantics as `querySolutions()`. It throws if the query is not a `CONSTRUCT` or `DESCRIBE`. + +```js +const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); +for (let batch = cursor.nextBatch(1000); batch.length > 0; batch = cursor.nextBatch(1000)) { + for (const quad of batch) { + console.log(quad.subject.value); + } +} +cursor.free(); +``` + #### `Store.prototype.update(String query, object options)` Executes a [SPARQL 1.1 Update](https://www.w3.org/TR/sparql11-update/). The [`LOAD` operation](https://www.w3.org/TR/sparql11-update/#load) is not supported yet. 
diff --git a/js/src/lib.rs b/js/src/lib.rs index adcdcd180..8558593d5 100644 --- a/js/src/lib.rs +++ b/js/src/lib.rs @@ -62,6 +62,26 @@ export class Store { } ): boolean | Map[] | Quad[] | string; + querySolutions( + query: string, + options?: { + base_iri?: NamedNode | string; + default_graph?: BlankNode | DefaultGraph | NamedNode | Iterable; + named_graphs?: Iterable; + use_default_graph_as_union?: boolean; + } + ): QuerySolutions; + + queryTriples( + query: string, + options?: { + base_iri?: NamedNode | string; + default_graph?: BlankNode | DefaultGraph | NamedNode | Iterable; + named_graphs?: Iterable; + use_default_graph_as_union?: boolean; + } + ): QueryTriples; + update( update: string, options?: { @@ -70,6 +90,20 @@ export class Store { ): void; } +export class QuerySolutions { + readonly variables: string[]; + + nextBatch(count: number): Map[]; + + free(): void; +} + +export class QueryTriples { + nextBatch(count: number): Quad[]; + + free(): void; +} + function parse( input: string | UInt8Array, options: { diff --git a/js/src/store.rs b/js/src/store.rs index 04e0b21e9..67b37a39d 100644 --- a/js/src/store.rs +++ b/js/src/store.rs @@ -6,7 +6,9 @@ use crate::utils::{to_option, to_option_ref}; use js_sys::{Array, Map, try_iter}; use oxigraph::io::{RdfParser, RdfSerializer}; use oxigraph::sparql::results::{QueryResultsFormat, QueryResultsSerializer}; -use oxigraph::sparql::{QueryResults, SparqlEvaluator}; +use oxigraph::sparql::{ + QueryResults, QuerySolutionIter, QueryTripleIter, SparqlEvaluator, Variable, +}; use oxigraph::store::Store; use wasm_bindgen::prelude::*; @@ -86,42 +88,10 @@ impl JsStore { } pub fn query(&self, query: &str, options: &JsValue) -> Result { - // Parsing options - let mut base_iri = None; - let mut use_default_graph_as_union = false; + // results_format only affects how query() materializes its output; the + // evaluation options are parsed by execute_query (shared with the cursors). 
let mut results_format = None; - let mut default_graph = None; - let mut named_graphs = None; if let Some(options) = to_option_ref(options) { - base_iri = convert_base_iri(&reflect_get(options, &BASE_IRI)?)?; - - default_graph = - if let Some(default_graph) = to_option(reflect_get(options, &DEFAULT_GRAPH)?) { - Some(if let Some(iter) = try_iter(&default_graph)? { - iter.map(|term| to_graph_name(&term?)) - .collect::, _>>()? - } else { - vec![to_graph_name(&default_graph)?] - }) - } else { - None - }; - - named_graphs = - if let Some(named_graphs) = to_option(reflect_get(options, &NAMED_GRAPHS)?) { - Some( - try_iter(&named_graphs)? - .ok_or_else(|| format_err!("named_graphs option must be iterable"))? - .map(|term| to_named_or_blank_node(&term?)) - .collect::, _>>()?, - ) - } else { - None - }; - - use_default_graph_as_union = - reflect_get(options, &USED_DEFAULT_GRAPH_AS_UNION)?.is_truthy(); - if let Some(js_results_format) = to_option(reflect_get(options, &RESULTS_FORMAT)?) { results_format = Some( js_results_format @@ -131,30 +101,7 @@ impl JsStore { } } - let mut evaluator = SparqlEvaluator::new(); - if let Some(base_iri) = base_iri { - evaluator = evaluator.with_base_iri(&base_iri).map_err(JsError::from)?; - } - - let mut prepared_query = evaluator.parse_query(&query).map_err(JsError::from)?; - if use_default_graph_as_union { - prepared_query.dataset_mut().set_default_graph_as_union(); - } - if let Some(default_graph) = default_graph { - prepared_query - .dataset_mut() - .set_default_graph(default_graph); - } - if let Some(named_graphs) = named_graphs { - prepared_query - .dataset_mut() - .set_available_named_graphs(named_graphs); - } - - let results = prepared_query - .on_store(&self.store) - .execute() - .map_err(JsError::from)?; + let results = self.execute_query(query, options)?; Ok(match results { QueryResults::Solutions(solutions) => { if let Some(results_format) = results_format { @@ -230,6 +177,113 @@ impl JsStore { }) } + // Parse the SPARQL query 
evaluation options (base_iri, default_graph, + // named_graphs, use_default_graph_as_union) and run it against a storage + // snapshot. on_store() snapshots rather than borrowing &Store, so the + // returned results — and any iterator inside them — are 'static and can be + // handed to a long-lived cursor. Shared by query(), querySolutions() and + // queryTriples() so they stay consistent. + fn execute_query( + &self, + query: &str, + options: &JsValue, + ) -> Result, JsValue> { + let mut base_iri = None; + let mut use_default_graph_as_union = false; + let mut default_graph = None; + let mut named_graphs = None; + if let Some(options) = to_option_ref(options) { + base_iri = convert_base_iri(&reflect_get(options, &BASE_IRI)?)?; + + default_graph = + if let Some(default_graph) = to_option(reflect_get(options, &DEFAULT_GRAPH)?) { + Some(if let Some(iter) = try_iter(&default_graph)? { + iter.map(|term| to_graph_name(&term?)) + .collect::, _>>()? + } else { + vec![to_graph_name(&default_graph)?] + }) + } else { + None + }; + + named_graphs = + if let Some(named_graphs) = to_option(reflect_get(options, &NAMED_GRAPHS)?) { + Some( + try_iter(&named_graphs)? + .ok_or_else(|| format_err!("named_graphs option must be iterable"))? 
+ .map(|term| to_named_or_blank_node(&term?)) + .collect::, _>>()?, + ) + } else { + None + }; + + use_default_graph_as_union = + reflect_get(options, &USED_DEFAULT_GRAPH_AS_UNION)?.is_truthy(); + } + + let mut evaluator = SparqlEvaluator::new(); + if let Some(base_iri) = base_iri { + evaluator = evaluator.with_base_iri(&base_iri).map_err(JsError::from)?; + } + + let mut prepared_query = evaluator.parse_query(query).map_err(JsError::from)?; + if use_default_graph_as_union { + prepared_query.dataset_mut().set_default_graph_as_union(); + } + if let Some(default_graph) = default_graph { + prepared_query + .dataset_mut() + .set_default_graph(default_graph); + } + if let Some(named_graphs) = named_graphs { + prepared_query + .dataset_mut() + .set_available_named_graphs(named_graphs); + } + + prepared_query + .on_store(&self.store) + .execute() + .map_err(JsError::from) + .map_err(Into::into) + } + + // Lazy streaming SELECT. Returns a cursor that holds the QuerySolutionIter + // alive instead of draining it into an Array, so the caller can pull rows in + // bounded batches and never materializes the whole result set. + #[wasm_bindgen(js_name = querySolutions)] + pub fn query_solutions( + &self, + query: &str, + options: &JsValue, + ) -> Result { + match self.execute_query(query, options)? { + QueryResults::Solutions(solutions) => Ok(JsQuerySolutions { + variables: solutions.variables().to_vec(), + iter: solutions, + data_factory: default_data_factory(), + }), + _ => Err(format_err!("querySolutions only supports SELECT queries")), + } + } + + // Lazy streaming CONSTRUCT/DESCRIBE, the triple-producing dual of + // querySolutions(). Pulls quads in bounded batches. + #[wasm_bindgen(js_name = queryTriples)] + pub fn query_triples(&self, query: &str, options: &JsValue) -> Result { + match self.execute_query(query, options)? 
{ + QueryResults::Graph(triples) => Ok(JsQueryTriples { + iter: triples, + data_factory: default_data_factory(), + }), + _ => Err(format_err!( + "queryTriples only supports CONSTRUCT and DESCRIBE queries" + )), + } + } + pub fn update(&self, update: &str, options: &JsValue) -> Result<(), JsValue> { // Parsing options let mut base_iri = None; @@ -339,6 +393,83 @@ impl JsStore { } } +// A JS-exposed cursor over a lazy SELECT iterator. The iterator is +// QuerySolutionIter<'static> because on_store() snapshots the storage rather +// than borrowing &Store, so there is no self-referential borrow. The snapshot +// is pinned until the cursor is dropped (wasm-bindgen's generated free()). +#[wasm_bindgen(js_name = QuerySolutions)] +pub struct JsQuerySolutions { + iter: QuerySolutionIter<'static>, + variables: Vec, + data_factory: DataFactory, +} + +#[wasm_bindgen(js_class = QuerySolutions)] +impl JsQuerySolutions { + #[wasm_bindgen(getter)] + pub fn variables(&self) -> Array { + self.variables + .iter() + .map(|v| JsValue::from_str(v.as_str())) + .collect() + } + + // Pull up to `count` rows. An empty Array signals exhaustion, so `count` + // must be at least 1. One wasm->JS crossing per batch, not per row. + #[wasm_bindgen(js_name = nextBatch)] + pub fn next_batch(&mut self, count: usize) -> Result { + if count == 0 { + return Err(format_err!("nextBatch requires a batch size of at least 1")); + } + let out = Array::new(); + for _ in 0..count { + let Some(solution) = self.iter.next() else { + break; + }; + let solution = solution.map_err(JsError::from)?; + let result = Map::new(); + for (variable, value) in solution.iter() { + result.set( + &variable.as_str().into(), + &from_term(&self.data_factory, value), + ); + } + out.push(&result.into()); + } + Ok(out) + } +} + +// The triple-producing dual of JsQuerySolutions, for CONSTRUCT/DESCRIBE. 
+#[wasm_bindgen(js_name = QueryTriples)] +pub struct JsQueryTriples { + iter: QueryTripleIter<'static>, + data_factory: DataFactory, +} + +#[wasm_bindgen(js_class = QueryTriples)] +impl JsQueryTriples { + // Pull up to `count` quads. An empty Array signals exhaustion, so `count` + // must be at least 1. + #[wasm_bindgen(js_name = nextBatch)] + pub fn next_batch(&mut self, count: usize) -> Result { + if count == 0 { + return Err(format_err!("nextBatch requires a batch size of at least 1")); + } + let out = Array::new(); + for _ in 0..count { + let Some(triple) = self.iter.next() else { + break; + }; + out.push(&from_triple( + &self.data_factory, + &triple.map_err(JsError::from)?, + )); + } + Ok(out) + } +} + fn query_results_format(format: &str) -> Result { if format.contains('/') { QueryResultsFormat::from_media_type(format).ok_or_else(|| { diff --git a/js/test/store.test.ts b/js/test/store.test.ts index f07585629..a8f4295f9 100644 --- a/js/test/store.test.ts +++ b/js/test/store.test.ts @@ -170,6 +170,211 @@ describe("Store", () => { }); }); + // Three distinct quads in the default graph, for batching tests. + const q1 = dataModel.quad(ex, ex, ex); + const q2 = dataModel.quad(ex, ex, ex2); + const q3 = dataModel.quad(ex2, ex, ex); + + // Pull every row out of a cursor, batchSize rows at a time, until exhausted. 
+ function drainSolutions( + cursor: { nextBatch(count: number): Map[] }, + batchSize = 2, + ): Map[] { + const all: Map[] = []; + for (;;) { + const batch = cursor.nextBatch(batchSize); + if (batch.length === 0) break; + all.push(...batch); + } + return all; + } + + describe("#querySolutions()", () => { + it("exposes variables up front", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); + assert.deepStrictEqual(["s", "p", "o"], cursor.variables); + }); + + it("streams in bounded batches, empty batch signals exhaustion", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); + assert.strictEqual(2, cursor.nextBatch(2).length); + assert.strictEqual(1, cursor.nextBatch(2).length); + assert.strictEqual(0, cursor.nextBatch(2).length); + }); + + it("a batch larger than the result returns everything, then exhausts", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); + assert.strictEqual(3, cursor.nextBatch(100).length); + assert.strictEqual(0, cursor.nextBatch(100).length); + }); + + it("exhaustion is stable across repeated pulls", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s WHERE { ?s ?p ?o }"); + assert.strictEqual(1, cursor.nextBatch(10).length); + assert.strictEqual(0, cursor.nextBatch(10).length); + assert.strictEqual(0, cursor.nextBatch(10).length); + }); + + it("an empty result set yields an empty batch but still has variables", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s WHERE { ?s ?p }"); + assert.deepStrictEqual(["s"], cursor.variables); + assert.strictEqual(0, cursor.nextBatch(10).length); + }); + + it("yields faithful RDF/JS terms", () => { + const lit = dataModel.literal("hi", "en"); + const store = new Store([dataModel.quad(ex, ex2, lit)]); + const rows = 
store.querySolutions("SELECT ?s ?o WHERE { ?s ?p ?o }").nextBatch(10); + assert.strictEqual(1, rows.length); + assert(ex.equals(rows[0]?.get("s"))); + assert(lit.equals(rows[0]?.get("o"))); + }); + + it("honours the base_iri option", () => { + const store = new Store(); + const rows = store + .querySolutions("SELECT * WHERE { BIND( AS ?t) }", { + base_iri: "http://example.com/", + }) + .nextBatch(10); + assert.strictEqual(1, rows.length); + assert(dataModel.namedNode("http://example.com/t").equals(rows[0]?.get("t"))); + }); + + it("honours use_default_graph_as_union", () => { + const store = new Store([dataModel.quad(ex, ex, ex, ex)]); + const cursor = store.querySolutions("SELECT * WHERE { ?s ?p ?o }", { + use_default_graph_as_union: true, + }); + assert.strictEqual(1, drainSolutions(cursor).length); + }); + + it("honours an explicit default_graph", () => { + const store = new Store([dataModel.quad(ex, ex, ex, ex)]); + const cursor = store.querySolutions("SELECT * WHERE { ?s ?p ?o }", { + default_graph: ex, + }); + assert.strictEqual(1, drainSolutions(cursor).length); + }); + + it("honours a named_graphs list", () => { + const store = new Store([ + dataModel.quad(ex, ex, ex, ex), + dataModel.quad(ex, ex, ex, ex2), + ]); + const cursor = store.querySolutions("SELECT * WHERE { GRAPH ?g { ?s ?p ?o } }", { + named_graphs: [ex], + }); + assert.strictEqual(1, drainSolutions(cursor).length); + }); + + it("rejects non-SELECT queries", () => { + const store = new Store([q1]); + assert.throws(() => store.querySolutions("ASK { ?s ?p ?o }"), /SELECT/); + assert.throws( + () => store.querySolutions("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"), + /SELECT/, + ); + assert.throws(() => store.querySolutions("DESCRIBE "), /SELECT/); + }); + + it("rejects a zero-sized batch (the empty batch is reserved for exhaustion)", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s WHERE { ?s ?p ?o }"); + assert.throws(() => cursor.nextBatch(0)); + }); + + 
it("is isolated from concurrent mutation (MVCC snapshot)", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); + assert.strictEqual(1, cursor.nextBatch(1).length); + store.update("DELETE WHERE { ?s ?p ?o }"); + assert.strictEqual(0, store.size); + // The open cursor still sees the snapshot taken at query time. + assert.strictEqual(2, drainSolutions(cursor, 10).length); + }); + + it("can no longer be used after free()", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s WHERE { ?s ?p ?o }"); + cursor.free(); + assert.throws(() => cursor.nextBatch(1)); + }); + }); + + describe("#queryTriples()", () => { + it("streams triples in bounded batches, empty batch signals exhaustion", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); + assert.strictEqual(2, cursor.nextBatch(2).length); + assert.strictEqual(1, cursor.nextBatch(2).length); + assert.strictEqual(0, cursor.nextBatch(2).length); + }); + + it("yields faithful RDF/JS quads", () => { + const store = new Store([q1]); + const rows = store + .queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }") + .nextBatch(10); + assert.strictEqual(1, rows.length); + assert(dataModel.quad(ex, ex, ex).equals(rows[0])); + }); + + it("supports DESCRIBE", () => { + const store = new Store([q1]); + const rows = store.queryTriples("DESCRIBE ").nextBatch(10); + assert(rows.length >= 1); + assert(dataModel.quad(ex, ex, ex).equals(rows[0])); + }); + + it("an empty result set yields an empty batch", () => { + const store = new Store([q1]); + const cursor = store.queryTriples( + "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p }", + ); + assert.strictEqual(0, cursor.nextBatch(10).length); + }); + + it("rejects non-CONSTRUCT/DESCRIBE queries", () => { + const store = new Store([q1]); + assert.throws(() => store.queryTriples("SELECT ?s WHERE { ?s ?p ?o }"), 
/CONSTRUCT/); + assert.throws(() => store.queryTriples("ASK { ?s ?p ?o }"), /CONSTRUCT/); + }); + + it("rejects a zero-sized batch", () => { + const store = new Store([q1]); + const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); + assert.throws(() => cursor.nextBatch(0)); + }); + + it("is isolated from concurrent mutation (MVCC snapshot)", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); + assert.strictEqual(1, cursor.nextBatch(1).length); + store.update("DELETE WHERE { ?s ?p ?o }"); + assert.strictEqual(0, store.size); + let seen = 1; + for (;;) { + const batch = cursor.nextBatch(10); + if (batch.length === 0) break; + seen += batch.length; + } + assert.strictEqual(3, seen); + }); + + it("can no longer be used after free()", () => { + const store = new Store([q1]); + const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); + cursor.free(); + assert.throws(() => cursor.nextBatch(1)); + }); + }); + describe("#update()", () => { it("INSERT DATA", () => { const store = new Store();