diff --git a/js/README.md b/js/README.md index e6459e09a..dd0b46ceb 100644 --- a/js/README.md +++ b/js/README.md @@ -182,6 +182,39 @@ console.log(store.query("ASK { ?p ?o }", { })); ``` +#### `Store.prototype.querySolutions(String query, object options)` +Executes a [SPARQL 1.1 `SELECT` query](https://www.w3.org/TR/sparql11-query/#select) and returns a `QuerySolutions` cursor that yields results **lazily**, in bounded batches, instead of materializing the whole result set into an array like `query()` does. This keeps memory bounded for very large result sets — useful in particular for the 32-bit WASM heap and for streaming results across a worker boundary without serializing everything into one string. + +It accepts the same evaluation options as `query()` (`base_iri`, `default_graph`, `named_graphs`, `use_default_graph_as_union`); `results_format` does not apply. It throws if the query is not a `SELECT`. + +```js +const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); +console.log(cursor.variables); // ["s", "p", "o"], available before pulling any row +for (let batch = cursor.nextBatch(1000); batch.length > 0; batch = cursor.nextBatch(1000)) { + for (const binding of batch) { + console.log(binding.get("s").value); + } +} +cursor.free(); // release the cursor's storage snapshot (see below) +``` + +`QuerySolutions.prototype.nextBatch(Number count)` pulls up to `count` solutions (each a `Map` from bound variable name to `Term`). An **empty array signals exhaustion**; because of this, `count` must be at least `1`. `QuerySolutions.prototype.variables` is the array of selected variable names. + +A cursor pins an MVCC **snapshot** of the store taken at query time: it keeps yielding the rows that matched when it was opened even if the store is concurrently mutated (loaded, cleared, or updated), and it holds that snapshot resident until released. Call `QuerySolutions.prototype.free()` when done to release it. + +#### `Store.prototype.queryTriples(String query, object options)` +The triple-producing dual of `querySolutions()`: executes a [SPARQL 1.1 `CONSTRUCT` or `DESCRIBE` query](https://www.w3.org/TR/sparql11-query/#construct) and returns a `QueryTriples` cursor that yields `Quad`s lazily in bounded batches. Same options and snapshot/`free()` semantics as `querySolutions()`. It throws if the query is not a `CONSTRUCT` or `DESCRIBE`. + +```js +const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); +for (let batch = cursor.nextBatch(1000); batch.length > 0; batch = cursor.nextBatch(1000)) { + for (const quad of batch) { + console.log(quad.subject.value); + } +} +cursor.free(); +``` + #### `Store.prototype.update(String query, object options)` Executes a [SPARQL 1.1 Update](https://www.w3.org/TR/sparql11-update/). The [`LOAD` operation](https://www.w3.org/TR/sparql11-update/#load) is not supported yet. diff --git a/js/src/lib.rs b/js/src/lib.rs index adcdcd180..8558593d5 100644 --- a/js/src/lib.rs +++ b/js/src/lib.rs @@ -62,6 +62,26 @@ export class Store { } ): boolean | Map[] | Quad[] | string; + querySolutions( + query: string, + options?: { + base_iri?: NamedNode | string; + default_graph?: BlankNode | DefaultGraph | NamedNode | Iterable; + named_graphs?: Iterable; + use_default_graph_as_union?: boolean; + } + ): QuerySolutions; + + queryTriples( + query: string, + options?: { + base_iri?: NamedNode | string; + default_graph?: BlankNode | DefaultGraph | NamedNode | Iterable; + named_graphs?: Iterable; + use_default_graph_as_union?: boolean; + } + ): QueryTriples; + update( update: string, options?: { @@ -70,6 +90,20 @@ export class Store { ): void; } +export class QuerySolutions { + readonly variables: string[]; + + nextBatch(count: number): Map[]; + + free(): void; +} + +export class QueryTriples { + nextBatch(count: number): Quad[]; + + free(): void; +} + function parse( input: string | UInt8Array, options: { diff --git a/js/src/store.rs b/js/src/store.rs index 04e0b21e9..67b37a39d 100644 --- a/js/src/store.rs +++ b/js/src/store.rs @@ -6,7 +6,9 @@ use crate::utils::{to_option, to_option_ref}; use js_sys::{Array, Map, try_iter}; use oxigraph::io::{RdfParser, RdfSerializer}; use oxigraph::sparql::results::{QueryResultsFormat, QueryResultsSerializer}; -use oxigraph::sparql::{QueryResults, SparqlEvaluator}; +use oxigraph::sparql::{ + QueryResults, QuerySolutionIter, QueryTripleIter, SparqlEvaluator, Variable, +}; use oxigraph::store::Store; use wasm_bindgen::prelude::*; @@ -86,42 +88,10 @@ impl JsStore { } pub fn query(&self, query: &str, options: &JsValue) -> Result { - // Parsing options - let mut base_iri = None; - let mut use_default_graph_as_union = false; + // results_format only affects how query() materializes its output; the + // evaluation options are parsed by execute_query (shared with the cursors). let mut results_format = None; - let mut default_graph = None; - let mut named_graphs = None; if let Some(options) = to_option_ref(options) { - base_iri = convert_base_iri(&reflect_get(options, &BASE_IRI)?)?; - - default_graph = - if let Some(default_graph) = to_option(reflect_get(options, &DEFAULT_GRAPH)?) { - Some(if let Some(iter) = try_iter(&default_graph)? { - iter.map(|term| to_graph_name(&term?)) - .collect::, _>>()? - } else { - vec![to_graph_name(&default_graph)?] - }) - } else { - None - }; - - named_graphs = - if let Some(named_graphs) = to_option(reflect_get(options, &NAMED_GRAPHS)?) { - Some( - try_iter(&named_graphs)? - .ok_or_else(|| format_err!("named_graphs option must be iterable"))? - .map(|term| to_named_or_blank_node(&term?)) - .collect::, _>>()?, - ) - } else { - None - }; - - use_default_graph_as_union = - reflect_get(options, &USED_DEFAULT_GRAPH_AS_UNION)?.is_truthy(); - if let Some(js_results_format) = to_option(reflect_get(options, &RESULTS_FORMAT)?) { results_format = Some( js_results_format @@ -131,30 +101,7 @@ impl JsStore { } } - let mut evaluator = SparqlEvaluator::new(); - if let Some(base_iri) = base_iri { - evaluator = evaluator.with_base_iri(&base_iri).map_err(JsError::from)?; - } - - let mut prepared_query = evaluator.parse_query(&query).map_err(JsError::from)?; - if use_default_graph_as_union { - prepared_query.dataset_mut().set_default_graph_as_union(); - } - if let Some(default_graph) = default_graph { - prepared_query - .dataset_mut() - .set_default_graph(default_graph); - } - if let Some(named_graphs) = named_graphs { - prepared_query - .dataset_mut() - .set_available_named_graphs(named_graphs); - } - - let results = prepared_query - .on_store(&self.store) - .execute() - .map_err(JsError::from)?; + let results = self.execute_query(query, options)?; Ok(match results { QueryResults::Solutions(solutions) => { if let Some(results_format) = results_format { @@ -230,6 +177,113 @@ impl JsStore { }) } + // Parse the SPARQL query evaluation options (base_iri, default_graph, + // named_graphs, use_default_graph_as_union) and run it against a storage + // snapshot. on_store() snapshots rather than borrowing &Store, so the + // returned results — and any iterator inside them — are 'static and can be + // handed to a long-lived cursor. Shared by query(), querySolutions() and + // queryTriples() so they stay consistent. + fn execute_query( + &self, + query: &str, + options: &JsValue, + ) -> Result, JsValue> { + let mut base_iri = None; + let mut use_default_graph_as_union = false; + let mut default_graph = None; + let mut named_graphs = None; + if let Some(options) = to_option_ref(options) { + base_iri = convert_base_iri(&reflect_get(options, &BASE_IRI)?)?; + + default_graph = + if let Some(default_graph) = to_option(reflect_get(options, &DEFAULT_GRAPH)?) { + Some(if let Some(iter) = try_iter(&default_graph)? { + iter.map(|term| to_graph_name(&term?)) + .collect::, _>>()? + } else { + vec![to_graph_name(&default_graph)?] + }) + } else { + None + }; + + named_graphs = + if let Some(named_graphs) = to_option(reflect_get(options, &NAMED_GRAPHS)?) { + Some( + try_iter(&named_graphs)? + .ok_or_else(|| format_err!("named_graphs option must be iterable"))? + .map(|term| to_named_or_blank_node(&term?)) + .collect::, _>>()?, + ) + } else { + None + }; + + use_default_graph_as_union = + reflect_get(options, &USED_DEFAULT_GRAPH_AS_UNION)?.is_truthy(); + } + + let mut evaluator = SparqlEvaluator::new(); + if let Some(base_iri) = base_iri { + evaluator = evaluator.with_base_iri(&base_iri).map_err(JsError::from)?; + } + + let mut prepared_query = evaluator.parse_query(query).map_err(JsError::from)?; + if use_default_graph_as_union { + prepared_query.dataset_mut().set_default_graph_as_union(); + } + if let Some(default_graph) = default_graph { + prepared_query + .dataset_mut() + .set_default_graph(default_graph); + } + if let Some(named_graphs) = named_graphs { + prepared_query + .dataset_mut() + .set_available_named_graphs(named_graphs); + } + + prepared_query + .on_store(&self.store) + .execute() + .map_err(JsError::from) + .map_err(Into::into) + } + + // Lazy streaming SELECT. Returns a cursor that holds the QuerySolutionIter + // alive instead of draining it into an Array, so the caller can pull rows in + // bounded batches and never materializes the whole result set. + #[wasm_bindgen(js_name = querySolutions)] + pub fn query_solutions( + &self, + query: &str, + options: &JsValue, + ) -> Result { + match self.execute_query(query, options)? { + QueryResults::Solutions(solutions) => Ok(JsQuerySolutions { + variables: solutions.variables().to_vec(), + iter: solutions, + data_factory: default_data_factory(), + }), + _ => Err(format_err!("querySolutions only supports SELECT queries")), + } + } + + // Lazy streaming CONSTRUCT/DESCRIBE, the triple-producing dual of + // querySolutions(). Pulls quads in bounded batches. + #[wasm_bindgen(js_name = queryTriples)] + pub fn query_triples(&self, query: &str, options: &JsValue) -> Result { + match self.execute_query(query, options)? { + QueryResults::Graph(triples) => Ok(JsQueryTriples { + iter: triples, + data_factory: default_data_factory(), + }), + _ => Err(format_err!( + "queryTriples only supports CONSTRUCT and DESCRIBE queries" + )), + } + } + pub fn update(&self, update: &str, options: &JsValue) -> Result<(), JsValue> { // Parsing options let mut base_iri = None; @@ -339,6 +393,83 @@ impl JsStore { } } +// A JS-exposed cursor over a lazy SELECT iterator. The iterator is +// QuerySolutionIter<'static> because on_store() snapshots the storage rather +// than borrowing &Store, so there is no self-referential borrow. The snapshot +// is pinned until the cursor is dropped (wasm-bindgen's generated free()). +#[wasm_bindgen(js_name = QuerySolutions)] +pub struct JsQuerySolutions { + iter: QuerySolutionIter<'static>, + variables: Vec, + data_factory: DataFactory, +} + +#[wasm_bindgen(js_class = QuerySolutions)] +impl JsQuerySolutions { + #[wasm_bindgen(getter)] + pub fn variables(&self) -> Array { + self.variables + .iter() + .map(|v| JsValue::from_str(v.as_str())) + .collect() + } + + // Pull up to `count` rows. An empty Array signals exhaustion, so `count` + // must be at least 1. One wasm->JS crossing per batch, not per row. + #[wasm_bindgen(js_name = nextBatch)] + pub fn next_batch(&mut self, count: usize) -> Result { + if count == 0 { + return Err(format_err!("nextBatch requires a batch size of at least 1")); + } + let out = Array::new(); + for _ in 0..count { + let Some(solution) = self.iter.next() else { + break; + }; + let solution = solution.map_err(JsError::from)?; + let result = Map::new(); + for (variable, value) in solution.iter() { + result.set( + &variable.as_str().into(), + &from_term(&self.data_factory, value), + ); + } + out.push(&result.into()); + } + Ok(out) + } +} + +// The triple-producing dual of JsQuerySolutions, for CONSTRUCT/DESCRIBE. +#[wasm_bindgen(js_name = QueryTriples)] +pub struct JsQueryTriples { + iter: QueryTripleIter<'static>, + data_factory: DataFactory, +} + +#[wasm_bindgen(js_class = QueryTriples)] +impl JsQueryTriples { + // Pull up to `count` quads. An empty Array signals exhaustion, so `count` + // must be at least 1. + #[wasm_bindgen(js_name = nextBatch)] + pub fn next_batch(&mut self, count: usize) -> Result { + if count == 0 { + return Err(format_err!("nextBatch requires a batch size of at least 1")); + } + let out = Array::new(); + for _ in 0..count { + let Some(triple) = self.iter.next() else { + break; + }; + out.push(&from_triple( + &self.data_factory, + &triple.map_err(JsError::from)?, + )); + } + Ok(out) + } +} + fn query_results_format(format: &str) -> Result { if format.contains('/') { QueryResultsFormat::from_media_type(format).ok_or_else(|| { diff --git a/js/test/store.test.ts b/js/test/store.test.ts index f07585629..a8f4295f9 100644 --- a/js/test/store.test.ts +++ b/js/test/store.test.ts @@ -170,6 +170,211 @@ describe("Store", () => { }); }); + // Three distinct quads in the default graph, for batching tests. + const q1 = dataModel.quad(ex, ex, ex); + const q2 = dataModel.quad(ex, ex, ex2); + const q3 = dataModel.quad(ex2, ex, ex); + + // Pull every row out of a cursor, batchSize rows at a time, until exhausted. + function drainSolutions( + cursor: { nextBatch(count: number): Map[] }, + batchSize = 2, + ): Map[] { + const all: Map[] = []; + for (;;) { + const batch = cursor.nextBatch(batchSize); + if (batch.length === 0) break; + all.push(...batch); + } + return all; + } + + describe("#querySolutions()", () => { + it("exposes variables up front", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); + assert.deepStrictEqual(["s", "p", "o"], cursor.variables); + }); + + it("streams in bounded batches, empty batch signals exhaustion", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); + assert.strictEqual(2, cursor.nextBatch(2).length); + assert.strictEqual(1, cursor.nextBatch(2).length); + assert.strictEqual(0, cursor.nextBatch(2).length); + }); + + it("a batch larger than the result returns everything, then exhausts", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); + assert.strictEqual(3, cursor.nextBatch(100).length); + assert.strictEqual(0, cursor.nextBatch(100).length); + }); + + it("exhaustion is stable across repeated pulls", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s WHERE { ?s ?p ?o }"); + assert.strictEqual(1, cursor.nextBatch(10).length); + assert.strictEqual(0, cursor.nextBatch(10).length); + assert.strictEqual(0, cursor.nextBatch(10).length); + }); + + it("an empty result set yields an empty batch but still has variables", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s WHERE { ?s ?p }"); + assert.deepStrictEqual(["s"], cursor.variables); + assert.strictEqual(0, cursor.nextBatch(10).length); + }); + + it("yields faithful RDF/JS terms", () => { + const lit = dataModel.literal("hi", "en"); + const store = new Store([dataModel.quad(ex, ex2, lit)]); + const rows = store.querySolutions("SELECT ?s ?o WHERE { ?s ?p ?o }").nextBatch(10); + assert.strictEqual(1, rows.length); + assert(ex.equals(rows[0]?.get("s"))); + assert(lit.equals(rows[0]?.get("o"))); + }); + + it("honours the base_iri option", () => { + const store = new Store(); + const rows = store + .querySolutions("SELECT * WHERE { BIND( AS ?t) }", { + base_iri: "http://example.com/", + }) + .nextBatch(10); + assert.strictEqual(1, rows.length); + assert(dataModel.namedNode("http://example.com/t").equals(rows[0]?.get("t"))); + }); + + it("honours use_default_graph_as_union", () => { + const store = new Store([dataModel.quad(ex, ex, ex, ex)]); + const cursor = store.querySolutions("SELECT * WHERE { ?s ?p ?o }", { + use_default_graph_as_union: true, + }); + assert.strictEqual(1, drainSolutions(cursor).length); + }); + + it("honours an explicit default_graph", () => { + const store = new Store([dataModel.quad(ex, ex, ex, ex)]); + const cursor = store.querySolutions("SELECT * WHERE { ?s ?p ?o }", { + default_graph: ex, + }); + assert.strictEqual(1, drainSolutions(cursor).length); + }); + + it("honours a named_graphs list", () => { + const store = new Store([ + dataModel.quad(ex, ex, ex, ex), + dataModel.quad(ex, ex, ex, ex2), + ]); + const cursor = store.querySolutions("SELECT * WHERE { GRAPH ?g { ?s ?p ?o } }", { + named_graphs: [ex], + }); + assert.strictEqual(1, drainSolutions(cursor).length); + }); + + it("rejects non-SELECT queries", () => { + const store = new Store([q1]); + assert.throws(() => store.querySolutions("ASK { ?s ?p ?o }"), /SELECT/); + assert.throws( + () => store.querySolutions("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"), + /SELECT/, + ); + assert.throws(() => store.querySolutions("DESCRIBE "), /SELECT/); + }); + + it("rejects a zero-sized batch (the empty batch is reserved for exhaustion)", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s WHERE { ?s ?p ?o }"); + assert.throws(() => cursor.nextBatch(0)); + }); + + it("is isolated from concurrent mutation (MVCC snapshot)", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }"); + assert.strictEqual(1, cursor.nextBatch(1).length); + store.update("DELETE WHERE { ?s ?p ?o }"); + assert.strictEqual(0, store.size); + // The open cursor still sees the snapshot taken at query time. + assert.strictEqual(2, drainSolutions(cursor, 10).length); + }); + + it("can no longer be used after free()", () => { + const store = new Store([q1]); + const cursor = store.querySolutions("SELECT ?s WHERE { ?s ?p ?o }"); + cursor.free(); + assert.throws(() => cursor.nextBatch(1)); + }); + }); + + describe("#queryTriples()", () => { + it("streams triples in bounded batches, empty batch signals exhaustion", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); + assert.strictEqual(2, cursor.nextBatch(2).length); + assert.strictEqual(1, cursor.nextBatch(2).length); + assert.strictEqual(0, cursor.nextBatch(2).length); + }); + + it("yields faithful RDF/JS quads", () => { + const store = new Store([q1]); + const rows = store + .queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }") + .nextBatch(10); + assert.strictEqual(1, rows.length); + assert(dataModel.quad(ex, ex, ex).equals(rows[0])); + }); + + it("supports DESCRIBE", () => { + const store = new Store([q1]); + const rows = store.queryTriples("DESCRIBE ").nextBatch(10); + assert(rows.length >= 1); + assert(dataModel.quad(ex, ex, ex).equals(rows[0])); + }); + + it("an empty result set yields an empty batch", () => { + const store = new Store([q1]); + const cursor = store.queryTriples( + "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p }", + ); + assert.strictEqual(0, cursor.nextBatch(10).length); + }); + + it("rejects non-CONSTRUCT/DESCRIBE queries", () => { + const store = new Store([q1]); + assert.throws(() => store.queryTriples("SELECT ?s WHERE { ?s ?p ?o }"), /CONSTRUCT/); + assert.throws(() => store.queryTriples("ASK { ?s ?p ?o }"), /CONSTRUCT/); + }); + + it("rejects a zero-sized batch", () => { + const store = new Store([q1]); + const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); + assert.throws(() => cursor.nextBatch(0)); + }); + + it("is isolated from concurrent mutation (MVCC snapshot)", () => { + const store = new Store([q1, q2, q3]); + const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); + assert.strictEqual(1, cursor.nextBatch(1).length); + store.update("DELETE WHERE { ?s ?p ?o }"); + assert.strictEqual(0, store.size); + let seen = 1; + for (;;) { + const batch = cursor.nextBatch(10); + if (batch.length === 0) break; + seen += batch.length; + } + assert.strictEqual(3, seen); + }); + + it("can no longer be used after free()", () => { + const store = new Store([q1]); + const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }"); + cursor.free(); + assert.throws(() => cursor.nextBatch(1)); + }); + }); + describe("#update()", () => { it("INSERT DATA", () => { const store = new Store();