Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions js/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,39 @@ console.log(store.query("ASK { <s> ?p ?o }", {
}));
```

#### `Store.prototype.querySolutions(String query, object options)`
Executes a [SPARQL 1.1 `SELECT` query](https://www.w3.org/TR/sparql11-query/#select) and returns a `QuerySolutions` cursor that yields results **lazily**, in bounded batches, instead of materializing the whole result set into an array like `query()` does. This keeps memory bounded for very large result sets — useful in particular for the 32-bit WASM heap and for streaming results across a worker boundary without serializing everything into one string.

It accepts the same evaluation options as `query()` (`base_iri`, `default_graph`, `named_graphs`, `use_default_graph_as_union`); `results_format` does not apply. It throws if the query is not a `SELECT`.

```js
const cursor = store.querySolutions("SELECT ?s ?p ?o WHERE { ?s ?p ?o }");
console.log(cursor.variables); // ["s", "p", "o"], available before pulling any row
for (let batch = cursor.nextBatch(1000); batch.length > 0; batch = cursor.nextBatch(1000)) {
for (const binding of batch) {
console.log(binding.get("s").value);
}
}
cursor.free(); // release the cursor's storage snapshot (see below)
```

`QuerySolutions.prototype.nextBatch(Number count)` pulls up to `count` solutions (each a `Map` from bound variable name to `Term`). An **empty array signals exhaustion**; because of this, `count` must be at least `1`. `QuerySolutions.prototype.variables` is the array of selected variable names.

A cursor pins an MVCC **snapshot** of the store taken at query time: it keeps yielding the rows that matched when it was opened even if the store is concurrently mutated (loaded, cleared, or updated), and it holds that snapshot resident until released. Call `QuerySolutions.prototype.free()` when done to release it.

#### `Store.prototype.queryTriples(String query, object options)`
The triple-producing dual of `querySolutions()`: executes a [SPARQL 1.1 `CONSTRUCT` or `DESCRIBE` query](https://www.w3.org/TR/sparql11-query/#construct) and returns a `QueryTriples` cursor that yields `Quad`s lazily in bounded batches. Same options and snapshot/`free()` semantics as `querySolutions()`. It throws if the query is not a `CONSTRUCT` or `DESCRIBE`.

```js
const cursor = store.queryTriples("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }");
for (let batch = cursor.nextBatch(1000); batch.length > 0; batch = cursor.nextBatch(1000)) {
for (const quad of batch) {
console.log(quad.subject.value);
}
}
cursor.free();
```

#### `Store.prototype.update(String query, object options)`
Executes a [SPARQL 1.1 Update](https://www.w3.org/TR/sparql11-update/).
The [`LOAD` operation](https://www.w3.org/TR/sparql11-update/#load) is not supported yet.
Expand Down
34 changes: 34 additions & 0 deletions js/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ export class Store {
}
): boolean | Map<string, Term>[] | Quad[] | string;

querySolutions(
query: string,
options?: {
base_iri?: NamedNode | string;
default_graph?: BlankNode | DefaultGraph | NamedNode | Iterable<BlankNode | DefaultGraph | NamedNode>;
named_graphs?: Iterable<BlankNode | NamedNode>;
use_default_graph_as_union?: boolean;
}
): QuerySolutions;

queryTriples(
query: string,
options?: {
base_iri?: NamedNode | string;
default_graph?: BlankNode | DefaultGraph | NamedNode | Iterable<BlankNode | DefaultGraph | NamedNode>;
named_graphs?: Iterable<BlankNode | NamedNode>;
use_default_graph_as_union?: boolean;
}
): QueryTriples;

update(
update: string,
options?: {
Expand All @@ -70,6 +90,20 @@ export class Store {
): void;
}

export class QuerySolutions {
readonly variables: string[];

nextBatch(count: number): Map<string, Term>[];

free(): void;
}

export class QueryTriples {
nextBatch(count: number): Quad[];

free(): void;
}

function parse(
input: string | UInt8Array,
options: {
Expand Down
249 changes: 190 additions & 59 deletions js/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::utils::{to_option, to_option_ref};
use js_sys::{Array, Map, try_iter};
use oxigraph::io::{RdfParser, RdfSerializer};
use oxigraph::sparql::results::{QueryResultsFormat, QueryResultsSerializer};
use oxigraph::sparql::{QueryResults, SparqlEvaluator};
use oxigraph::sparql::{
QueryResults, QuerySolutionIter, QueryTripleIter, SparqlEvaluator, Variable,
};
use oxigraph::store::Store;
use wasm_bindgen::prelude::*;

Expand Down Expand Up @@ -86,42 +88,10 @@ impl JsStore {
}

pub fn query(&self, query: &str, options: &JsValue) -> Result<JsValue, JsValue> {
// Parsing options
let mut base_iri = None;
let mut use_default_graph_as_union = false;
// results_format only affects how query() materializes its output; the
// evaluation options are parsed by execute_query (shared with the cursors).
let mut results_format = None;
let mut default_graph = None;
let mut named_graphs = None;
if let Some(options) = to_option_ref(options) {
base_iri = convert_base_iri(&reflect_get(options, &BASE_IRI)?)?;

default_graph =
if let Some(default_graph) = to_option(reflect_get(options, &DEFAULT_GRAPH)?) {
Some(if let Some(iter) = try_iter(&default_graph)? {
iter.map(|term| to_graph_name(&term?))
.collect::<Result<Vec<_>, _>>()?
} else {
vec![to_graph_name(&default_graph)?]
})
} else {
None
};

named_graphs =
if let Some(named_graphs) = to_option(reflect_get(options, &NAMED_GRAPHS)?) {
Some(
try_iter(&named_graphs)?
.ok_or_else(|| format_err!("named_graphs option must be iterable"))?
.map(|term| to_named_or_blank_node(&term?))
.collect::<Result<Vec<_>, _>>()?,
)
} else {
None
};

use_default_graph_as_union =
reflect_get(options, &USED_DEFAULT_GRAPH_AS_UNION)?.is_truthy();

if let Some(js_results_format) = to_option(reflect_get(options, &RESULTS_FORMAT)?) {
results_format = Some(
js_results_format
Expand All @@ -131,30 +101,7 @@ impl JsStore {
}
}

let mut evaluator = SparqlEvaluator::new();
if let Some(base_iri) = base_iri {
evaluator = evaluator.with_base_iri(&base_iri).map_err(JsError::from)?;
}

let mut prepared_query = evaluator.parse_query(&query).map_err(JsError::from)?;
if use_default_graph_as_union {
prepared_query.dataset_mut().set_default_graph_as_union();
}
if let Some(default_graph) = default_graph {
prepared_query
.dataset_mut()
.set_default_graph(default_graph);
}
if let Some(named_graphs) = named_graphs {
prepared_query
.dataset_mut()
.set_available_named_graphs(named_graphs);
}

let results = prepared_query
.on_store(&self.store)
.execute()
.map_err(JsError::from)?;
let results = self.execute_query(query, options)?;
Ok(match results {
QueryResults::Solutions(solutions) => {
if let Some(results_format) = results_format {
Expand Down Expand Up @@ -230,6 +177,113 @@ impl JsStore {
})
}

// Parse the SPARQL query evaluation options (base_iri, default_graph,
// named_graphs, use_default_graph_as_union) and run it against a storage
// snapshot. on_store() snapshots rather than borrowing &Store, so the
// returned results — and any iterator inside them — are 'static and can be
// handed to a long-lived cursor. Shared by query(), querySolutions() and
// queryTriples() so they stay consistent.
fn execute_query(
&self,
query: &str,
options: &JsValue,
) -> Result<QueryResults<'static>, JsValue> {
let mut base_iri = None;
let mut use_default_graph_as_union = false;
let mut default_graph = None;
let mut named_graphs = None;
if let Some(options) = to_option_ref(options) {
base_iri = convert_base_iri(&reflect_get(options, &BASE_IRI)?)?;

default_graph =
if let Some(default_graph) = to_option(reflect_get(options, &DEFAULT_GRAPH)?) {
Some(if let Some(iter) = try_iter(&default_graph)? {
iter.map(|term| to_graph_name(&term?))
.collect::<Result<Vec<_>, _>>()?
} else {
vec![to_graph_name(&default_graph)?]
})
} else {
None
};

named_graphs =
if let Some(named_graphs) = to_option(reflect_get(options, &NAMED_GRAPHS)?) {
Some(
try_iter(&named_graphs)?
.ok_or_else(|| format_err!("named_graphs option must be iterable"))?
.map(|term| to_named_or_blank_node(&term?))
.collect::<Result<Vec<_>, _>>()?,
)
} else {
None
};

use_default_graph_as_union =
reflect_get(options, &USED_DEFAULT_GRAPH_AS_UNION)?.is_truthy();
}

let mut evaluator = SparqlEvaluator::new();
if let Some(base_iri) = base_iri {
evaluator = evaluator.with_base_iri(&base_iri).map_err(JsError::from)?;
}

let mut prepared_query = evaluator.parse_query(query).map_err(JsError::from)?;
if use_default_graph_as_union {
prepared_query.dataset_mut().set_default_graph_as_union();
}
if let Some(default_graph) = default_graph {
prepared_query
.dataset_mut()
.set_default_graph(default_graph);
}
if let Some(named_graphs) = named_graphs {
prepared_query
.dataset_mut()
.set_available_named_graphs(named_graphs);
}

prepared_query
.on_store(&self.store)
.execute()
.map_err(JsError::from)
.map_err(Into::into)
}

// Lazy streaming SELECT. Returns a cursor that holds the QuerySolutionIter
// alive instead of draining it into an Array, so the caller can pull rows in
// bounded batches and never materializes the whole result set.
#[wasm_bindgen(js_name = querySolutions)]
pub fn query_solutions(
&self,
query: &str,
options: &JsValue,
) -> Result<JsQuerySolutions, JsValue> {
match self.execute_query(query, options)? {
QueryResults::Solutions(solutions) => Ok(JsQuerySolutions {
variables: solutions.variables().to_vec(),
iter: solutions,
data_factory: default_data_factory(),
}),
_ => Err(format_err!("querySolutions only supports SELECT queries")),
}
}

// Lazy streaming CONSTRUCT/DESCRIBE, the triple-producing dual of
// querySolutions(). Pulls quads in bounded batches.
#[wasm_bindgen(js_name = queryTriples)]
pub fn query_triples(&self, query: &str, options: &JsValue) -> Result<JsQueryTriples, JsValue> {
match self.execute_query(query, options)? {
QueryResults::Graph(triples) => Ok(JsQueryTriples {
iter: triples,
data_factory: default_data_factory(),
}),
_ => Err(format_err!(
"queryTriples only supports CONSTRUCT and DESCRIBE queries"
)),
}
}

pub fn update(&self, update: &str, options: &JsValue) -> Result<(), JsValue> {
// Parsing options
let mut base_iri = None;
Expand Down Expand Up @@ -339,6 +393,83 @@ impl JsStore {
}
}

// A JS-exposed cursor over a lazy SELECT iterator. The iterator is
// QuerySolutionIter<'static> because on_store() snapshots the storage rather
// than borrowing &Store, so there is no self-referential borrow. The snapshot
// is pinned until the cursor is dropped (wasm-bindgen's generated free()).
#[wasm_bindgen(js_name = QuerySolutions)]
pub struct JsQuerySolutions {
iter: QuerySolutionIter<'static>,
variables: Vec<Variable>,
data_factory: DataFactory,
}

#[wasm_bindgen(js_class = QuerySolutions)]
impl JsQuerySolutions {
#[wasm_bindgen(getter)]
pub fn variables(&self) -> Array {
self.variables
.iter()
.map(|v| JsValue::from_str(v.as_str()))
.collect()
}

// Pull up to `count` rows. An empty Array signals exhaustion, so `count`
// must be at least 1. One wasm->JS crossing per batch, not per row.
#[wasm_bindgen(js_name = nextBatch)]
pub fn next_batch(&mut self, count: usize) -> Result<Array, JsValue> {
if count == 0 {
return Err(format_err!("nextBatch requires a batch size of at least 1"));
}
let out = Array::new();
for _ in 0..count {
let Some(solution) = self.iter.next() else {
break;
};
let solution = solution.map_err(JsError::from)?;
let result = Map::new();
for (variable, value) in solution.iter() {
result.set(
&variable.as_str().into(),
&from_term(&self.data_factory, value),
);
}
out.push(&result.into());
}
Ok(out)
}
}

// The triple-producing dual of JsQuerySolutions, for CONSTRUCT/DESCRIBE.
#[wasm_bindgen(js_name = QueryTriples)]
pub struct JsQueryTriples {
iter: QueryTripleIter<'static>,
data_factory: DataFactory,
}

#[wasm_bindgen(js_class = QueryTriples)]
impl JsQueryTriples {
// Pull up to `count` quads. An empty Array signals exhaustion, so `count`
// must be at least 1.
#[wasm_bindgen(js_name = nextBatch)]
pub fn next_batch(&mut self, count: usize) -> Result<Array, JsValue> {
if count == 0 {
return Err(format_err!("nextBatch requires a batch size of at least 1"));
}
let out = Array::new();
for _ in 0..count {
let Some(triple) = self.iter.next() else {
break;
};
out.push(&from_triple(
&self.data_factory,
&triple.map_err(JsError::from)?,
));
}
Ok(out)
}
}

fn query_results_format(format: &str) -> Result<QueryResultsFormat, JsValue> {
if format.contains('/') {
QueryResultsFormat::from_media_type(format).ok_or_else(|| {
Expand Down
Loading
Loading