
Search as a Configurable Pipeline instance: transactional Writer, two update modes, per-reference label sources #534

Description

@ddeboer

Problem Statement

The Dataset Register indexes dataset descriptions for search today via a bespoke full-rebuild indexer. The next step is indexing the objects inside registered datasets (SCHEMA-AP-NDE Person, CreativeWork, Organization, …) – potentially millions of records across many datasets, most of them static between runs.

The current search indexing is not an LDE pipeline. It does not reuse the pipeline’s Discovery / Selection / change-detection spine, it cannot skip unchanged datasets, and rebuilding from zero on every run does not scale to object volumes. There is no shared, domain-agnostic way to express “index these RDF entities into a search engine” as an LDE pipeline, and the current per-dataset Writer has no home for run-level lifecycle (alias swap, deletion sweeps, cross-pod locking).

Solution

Model search / object indexing as an LDE Configurable Pipeline instance, reusing the existing spine (Discovery → Selection → Iteration → change-gate) and the already-merged skip-unchanged machinery (#450). Split the work at the transaction boundary: the reusable transforms (extraction, projection) are pipeline stages, and the Writer – load + finalize – is a transaction-aware terminal with a run lifecycle, so Blue/green Rebuild (catalog + Knowledge Graph) and In-place Rebuild (objects) both have a home. Keep everything domain-agnostic: LDE ships capabilities (references, label sources, update modes, transaction-aware Writer, federated search); the deployment (Dataset Register) supplies the domain.

User Stories

  1. As a pipeline author, I want to index RDF entities into a search engine as an LDE pipeline, reusing Discovery / Selection / change-detection, so that I do not rebuild that spine per consumer.
  2. As a pipeline author with a large, mostly-static corpus, I want to skip re-extracting unchanged datasets and update only changed ones in place, so that a daily run does not re-index the whole corpus.
  3. As a pipeline author, I want atomic Blue/green Rebuilds for small collections and In-place Rebuilds for large ones, chosen by the Writer implementation, so that I pick the right consistency/cost trade-off without the pipeline branching on mode.
  4. As a pipeline author, I want deletions handled without special-casing, so that removed records and removed datasets leave the index correctly.
  5. As a search API consumer, I want references (creator, subject, publisher) resolved to labels server-side, so that clients never perform joins.
  6. As a search API consumer, I want to search a high-cardinality facet by name with live counts, so that I can find a specific value among thousands.
  7. As an operator, I want indexing to stay low-memory regardless of dataset size, so that a gigantic dump does not exhaust memory.
  8. As an LDE maintainer, I want LDE to stay domain-agnostic, so that the same capabilities serve any deployment, not only the Dataset Register.

Implementation Decisions

Pipeline shape

Two grains, one spine

Terminology (from the Stack): a search record takes one of two grains: catalog grain (one document per dataset description) or object grain (one document per object). Grain is distinct from substrate (the body of source data: A = descriptions, B = dataset contents, C = terminology); for search, the substrate follows from the grain (object grain rides substrate B, catalog grain rides substrate A).

Both grains start at the Dataset Register through the same DatasetSelector (RegistrySelector) – that spine is shared. They diverge immediately after selection, in the Reader: what each grain reads and produces quads from.

| Step | Catalog grain (descriptions, substrate A) | Object grain (substrate B) |
|---|---|---|
| Select | RegistrySelector – all datasets | RegistrySelector – a subset via criteria (datasets with an indexable, SCHEMA-AP-conformant object distribution) |
| Reader input | the dataset description in the register / DKG (central) | each dataset’s own distribution (remote SPARQL endpoint / dump) |
| Extract | CONSTRUCT the description → 1 doc/dataset | select URIs → CONSTRUCT per entity → N docs/dataset (streaming batches) |
| Change signal | description changed (per dataset) | distribution data changed – Last-Modified / ETag (per dataset) |
| Writer | Blue/green Rebuild, one collection | In-place Rebuild, multi-collection, source = dataset |

Key consequences:

  • A Reader is given a dataset or a distribution — not only a distribution. The catalog-grain reader reads the dataset description from the register/DKG; the object-grain reader crawls the dataset’s own distribution. Non-RDF origins fit too: a SPARQL-Anything reader facades JSON/CSV/XML to quads. So the Reader is the seam where the grains differ.
  • Catalog grain reads the DKG’s output (descriptions in a central store), downstream of the DKG; object grain reads the same raw distributions the DKG reads, alongside it — sharing the entire read side (selection + distribution resolution + object CONSTRUCT) and differing only in the Writer (Typesense In-place vs QLever). Object search is a new Writer on the existing DKG extraction path.
  • Selection criteria differ (catalog: all datasets; object: only those with a crawlable object distribution), via RegistrySelector criteria.
  • The change signal is a different event per grain (a description can change without the data changing, and vice versa). A shared provenance store must be scoped by pipeline IRI so records do not collide; #450 (Skip reprocessing unchanged datasets: source signal + consumer-declared pipeline version) already does this.
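The scoping requirement can be illustrated with a minimal sketch of a skip decision. It assumes a hypothetical in-memory `InMemoryProvenanceStore` keyed by (pipeline IRI, dataset IRI) and a change signal reduced to an opaque string (an ETag or Last-Modified value for object grain, some description fingerprint for catalog grain). Neither `shouldReprocess` nor the store is the #450 API.

```typescript
// Hypothetical change signal: an ETag, a Last-Modified value, or a
// description fingerprint, reduced to an opaque comparable string.
type ChangeSignal = string;

// Hypothetical in-memory provenance store, keyed by pipeline IRI AND dataset
// IRI, so the catalog-grain and object-grain pipelines never read each
// other's records for the same dataset.
class InMemoryProvenanceStore {
  private readonly records = new Map<string, ChangeSignal>();

  private key(pipelineIri: string, datasetIri: string): string {
    return `${pipelineIri}\u0000${datasetIri}`;
  }

  get(pipelineIri: string, datasetIri: string): ChangeSignal | undefined {
    return this.records.get(this.key(pipelineIri, datasetIri));
  }

  set(pipelineIri: string, datasetIri: string, signal: ChangeSignal): void {
    this.records.set(this.key(pipelineIri, datasetIri), signal);
  }
}

// Reprocess unless THIS pipeline has already processed exactly this signal.
// A missing signal (e.g. a live SPARQL endpoint without an ETag) always
// reprocesses.
function shouldReprocess(
  store: InMemoryProvenanceStore,
  pipelineIri: string,
  datasetIri: string,
  signal: ChangeSignal | undefined,
): boolean {
  if (signal === undefined) return true;
  return store.get(pipelineIri, datasetIri) !== signal;
}
```

After a dataset is processed successfully, the pipeline would record its signal with `store.set(pipelineIri, datasetIri, signal)`.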

Composable building blocks: typed stages + a distinct Writer

The pipeline is a chain of typed Stage<In, Out> steps feeding a distinct transactional Writer. Stage is the generic step-unit name (a small generalization of today’s executor-only Stage into Stage<In, Out>); the payload types make composition safe. Engine-specific logic is isolated in the writer; everything upstream is engine-agnostic and shared.

  • ExtractionStage<_, Quad> running one or more Readers. A Reader produces quads for a dataset from outside the pipeline (named for the read side, not its output — every reader yields quads). Its input is a dataset description (register/DKG) or a distribution (crawl), and its backing data may be non-RDF (a SPARQL-Anything reader facades JSON/CSV/XML to quads). Implementations: SparqlConstructReader (endpoint/dump), a register/description reader, SparqlAnythingReader (non-RDF). Object-grain extraction is per-entity (select URIs → CONSTRUCT each), so quads arrive grouped by resource — the collection happens once, here. Quad→quad enrichments are Stage<Quad, Quad> (ADR 2’s QuadTransform).
  • ProjectionStage<Quad, Document> (@lde/search). Takes a resource’s grouped quads and produces the engine-agnostic logical document (field model → fields, references, localized values). RDF framing is fused inside this step, sharing the grouped structure, rather than running as a separate re-emitting frame stage, which would collect the same quads twice. This is the only type-changing (quad → document) step, and it is shared across all engines. @lde/search already owns it (per #529, “feat(search)!: engine- and domain-agnostic query model, Typesense compiler, and GraphQL surface”).
  • Writer — distinct, transactional, engine-specific (@lde/search-<engine>). Consumes the logical document; owns the run transaction (openRun / write / commit / abort): index/collection schema, load/upsert, and finalize — alias swap (Blue/green) or sweep (In-place), lock. A Writer is not a Stage — a stage is a pure transform; the writer additionally owns the run lifecycle.

Type-safety by composition. Stage<In, Out> threads Out → In, so invalid pipelines are compile errors: a Stage<Quad, Quad> (or a quad enrichment) cannot follow the Stage<Quad, Document> projection. The typed transition prevents “attach a quad transform after projection” without any runtime marker. For config-driven composition, a phase-order check is the backstop.
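A minimal TypeScript sketch of typed composition, assuming simplified stand-in types (`Quad`, `SearchDocument`) and a hypothetical `pipe` helper rather than the actual @lde/pipeline API. The projection stage also shows why the collection happens once: it relies on quads arriving grouped by subject.

```typescript
// Stand-ins for the real payload types (an RDF/JS Quad and the @lde/search
// Document). SearchDocument avoids clashing with the DOM's global Document.
interface Quad {
  readonly subject: string;
  readonly predicate: string;
  readonly object: string;
}
interface SearchDocument {
  readonly id: string;
  readonly fields: Record<string, string[]>;
}

// A typed step: consumes a stream of In, produces a stream of Out.
interface Stage<In, Out> {
  readonly run: (input: AsyncIterable<In>) => AsyncIterable<Out>;
}

// Composition threads the first stage's Out into the second stage's In,
// so a mismatched chain fails to type-check.
function pipe<A, Mid, B>(first: Stage<A, Mid>, second: Stage<Mid, B>): Stage<A, B> {
  return { run: (input) => second.run(first.run(input)) };
}

// A quad → quad enrichment (here: drop blank-node subjects).
const dropBlankNodes: Stage<Quad, Quad> = {
  async *run(input) {
    for await (const q of input) {
      if (!q.subject.startsWith("_:")) yield q;
    }
  },
};

// The type-changing quad → document step. Quads arrive grouped by subject,
// so a document is emitted whenever the subject changes.
const project: Stage<Quad, SearchDocument> = {
  async *run(input) {
    let current: SearchDocument | undefined;
    for await (const q of input) {
      if (current !== undefined && current.id !== q.subject) {
        yield current;
        current = undefined;
      }
      if (current === undefined) current = { id: q.subject, fields: {} };
      const values = current.fields[q.predicate] ?? [];
      values.push(q.object);
      current.fields[q.predicate] = values;
    }
    if (current !== undefined) yield current;
  },
};

// pipe(project, dropBlankNodes); // does not compile: a SearchDocument is not a Quad

async function* fromArray<T>(items: readonly T[]): AsyncIterable<T> {
  for (const item of items) yield item;
}

async function collect<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of stream) out.push(item);
  return out;
}
```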

The logical document is the shared contract (Ports & Adapters). Typesense and Elasticsearch writers each have their own API, schema, and alias-swap, but consume the same logical document — so projection cannot live inside the Typesense writer (ES would duplicate it) and must be an upstream, engine-agnostic stage. This matches the Stack: phases 1–4 are the engine-agnostic projection; phases 5–6 the engine adapter.
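To make the port concrete, here is a sketch in which two hypothetical adapters turn the same logical document into engine-specific import lines. The payload shapes are simplified: Typesense's documents import endpoint takes JSONL with a string `id`, and Elasticsearch's `_bulk` API takes an action line followed by a source line; schemas, alias swaps, id encoding, and HTTP calls are left out.

```typescript
// Engine-agnostic logical document produced by the projection stage
// (hypothetical, simplified shape).
interface LogicalDocument {
  readonly id: string;
  readonly fields: Readonly<Record<string, string | string[]>>;
}

// The port: each engine adapter maps the same logical document to its own
// wire format. The collection name is only needed by some engines.
interface EngineAdapter {
  toImportLines(docs: readonly LogicalDocument[], collection: string): string[];
}

// Typesense: one JSON document per line (the collection is in the URL path).
const typesenseAdapter: EngineAdapter = {
  toImportLines: (docs) => docs.map((d) => JSON.stringify({ id: d.id, ...d.fields })),
};

// Elasticsearch: an action line naming index and id, then the source line.
const elasticsearchAdapter: EngineAdapter = {
  toImportLines: (docs, collection) =>
    docs.flatMap((d) => [
      JSON.stringify({ index: { _index: collection, _id: d.id } }),
      JSON.stringify(d.fields),
    ]),
};
```

Neither adapter decides which fields exist or how references are shaped; those decisions stay in the upstream projection, which is what lets both engines share it.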

Transaction-aware Writer (one @lde/pipeline core change)

  • Keep Writer; rename the input unit Executor → Reader. Reader / Writer is the familiar I/O pair, so only the input is renamed; the output keeps its name (and fileWriter / sparqlUpdateWriter keep theirs). These map to the Stack’s conceptual source / sink (a Reader is a source, a Writer a sink); we use the more familiar I/O terms. Upgrade Writer from a per-dataset write to a factory + per-run transaction, and narrow its input to pre-framed documents. Do this cleanly, not backward-compatibly (LDE is 0.x, so a breaking change is a routine minor bump). The existing fileWriter and sparqlUpdateWriter become transactional Writers, which re-touches the DKG. A lifecycle-free writer uses a no-op helper, not a second TransactionalWriter interface (that would reintroduce the dual-path we rejected).

Interface (from the design discussion; decision-rich shape):

```ts
interface Writer {
  openRun(ctx: RunContext): Promise<RunWriter>;
}
interface RunWriter {
  write(dataset: Dataset, documents: AsyncIterable<Document>): Promise<void>; // pre-framed upstream
  commit(): Promise<void>;   // Blue/green: alias swap; In-place: membership sweep + release lock
  abort(error: unknown): Promise<void>;
}
interface RunContext {
  readonly runId: string;                 // stamps last_seen (In-place), names blue collection (Blue/green)
  readonly startedAt: string;             // injected (no Date.now inside the run)
  selectedSources(): Iterable<DatasetId>; // full registry set → membership sweep
  readonly provenance?: ProvenanceStore;  // #450, when skip is enabled
}
```
  • Pipeline.run drives openRun → write* → commit/abort uniformly and never branches on mode. swap() is a private step of the Blue/green writer’s commit(), never called by the pipeline.
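The following sketch shows how a run loop could drive any Writer uniformly, together with a no-op helper for lifecycle-free writers. `runPipeline` and `lifecycleFree` are hypothetical names, the interfaces are local copies of the ones above (with `SearchDocument` standing in for `Document`), and calling `abort` when `commit` itself fails is an assumption, not a stated decision.

```typescript
type DatasetId = string;
interface Dataset { readonly id: DatasetId; }
interface SearchDocument { readonly id: string; }
interface RunContext {
  readonly runId: string;
  readonly startedAt: string;
  selectedSources(): Iterable<DatasetId>;
}
interface RunWriter {
  write(dataset: Dataset, documents: AsyncIterable<SearchDocument>): Promise<void>;
  commit(): Promise<void>;
  abort(error: unknown): Promise<void>;
}
interface Writer {
  openRun(ctx: RunContext): Promise<RunWriter>;
}

type WriteFn = (dataset: Dataset, documents: AsyncIterable<SearchDocument>) => Promise<void>;

// Hypothetical no-op helper: adapts a per-dataset write function to the
// Writer interface; commit and abort do nothing.
function lifecycleFree(write: WriteFn): Writer {
  return {
    openRun: async () => ({
      write,
      commit: async () => {},
      abort: async () => {},
    }),
  };
}

// The run loop is identical for every Writer: open, write each dataset,
// commit. Any failure (here including a failed commit) triggers abort.
async function runPipeline(
  writer: Writer,
  ctx: RunContext,
  datasets: Iterable<Dataset>,
  documentsFor: (dataset: Dataset) => AsyncIterable<SearchDocument>,
): Promise<void> {
  const run = await writer.openRun(ctx);
  try {
    for (const dataset of datasets) {
      await run.write(dataset, documentsFor(dataset));
    }
    await run.commit();
  } catch (error) {
    await run.abort(error);
    throw error;
  }
}
```

The loop never inspects which Writer it holds; Blue/green and In-place differ only inside their own `commit` and `abort`.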

Two update modes (not three)

  • Blue/green Rebuild (full rebuild from zero, atomic swap, implicit deletion): catalog and KG. commit swaps the alias (Typesense) or bulk-loads and swaps the index directory (QLever, for the KG); abort drops the blue collection.
  • In-place Rebuild (per-source upsert + sweep; no swap, source-scoped atomicity, docs carry source + last_seen): objects. commit runs the membership sweep + releases the lock; abort releases the lock and lets the next run reconcile (upserts idempotent). This is the Stack’s In-place Rebuild pattern verbatim – the live index is the cache, so Last-known-good Per-source Caching is not needed alongside it.
  • The mode is a Writer implementation (BlueGreenRebuild, InPlaceRebuild), not a marker/flag; the pipeline never branches on it. The swap – including the KG’s QLever bulk-load + dir swap – lives in the writer’s commit, not a separate pipeline stage.
  • Carry-over (Blue/green + skip-unchanged) is rejected as YAGNI: it needs a read≫write imbalance neither DR case has (catalog is cheap/cheap → Blue/green; objects are expensive/expensive → In-place).
  • A single-flight cross-pod lock is acquired in openRun and released in commit/abort.
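A hedged, in-memory sketch of a Blue/green Rebuild writer with a single-flight lock, assuming a hypothetical `FakeEngine` (collections, aliases, lock table) in place of Typesense. The lock here is process-local; the design calls for a cross-pod lock, which would need shared storage.

```typescript
type DatasetId = string;
interface Dataset { readonly id: DatasetId; }
interface SearchDocument { readonly id: string; }
interface RunContext {
  readonly runId: string;
  readonly startedAt: string;
  selectedSources(): Iterable<DatasetId>;
}
interface RunWriter {
  write(dataset: Dataset, documents: AsyncIterable<SearchDocument>): Promise<void>;
  commit(): Promise<void>;
  abort(error: unknown): Promise<void>;
}
interface Writer { openRun(ctx: RunContext): Promise<RunWriter>; }

// Hypothetical in-memory stand-in for a search engine.
class FakeEngine {
  readonly collections = new Map<string, Map<string, SearchDocument>>();
  readonly aliases = new Map<string, string>();
  readonly locks = new Set<string>();
}

class BlueGreenRebuild implements Writer {
  private readonly engine: FakeEngine;
  private readonly alias: string;

  constructor(engine: FakeEngine, alias: string) {
    this.engine = engine;
    this.alias = alias;
  }

  async openRun(ctx: RunContext): Promise<RunWriter> {
    const { engine, alias } = this;
    // Single-flight: refuse a second concurrent run against the same alias.
    if (engine.locks.has(alias)) throw new Error(`run already in progress for ${alias}`);
    engine.locks.add(alias);
    // The blue collection is named after the run id and starts empty.
    const blue = `${alias}_${ctx.runId}`;
    const target = new Map<string, SearchDocument>();
    engine.collections.set(blue, target);

    const run: RunWriter = {
      async write(_dataset, documents) {
        for await (const doc of documents) target.set(doc.id, doc);
      },
      async commit() {
        // Repoint the alias, then drop the previous collection; documents
        // not written in this run disappear implicitly.
        const previous = engine.aliases.get(alias);
        engine.aliases.set(alias, blue);
        if (previous !== undefined) engine.collections.delete(previous);
        engine.locks.delete(alias);
      },
      async abort() {
        // The live alias is untouched; only the blue collection is dropped.
        engine.collections.delete(blue);
        engine.locks.delete(alias);
      },
    };
    return run;
  }
}
```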

Run granularity

  • A run is one openRun → commit transaction; the interface is granularity-agnostic. Catalog and KG run as a single run per rebuild (one blue collection, one swap). Objects may run as a single batch run or per-dataset multi-run (event-driven), because In-place writes are independent. The registry-membership sweep needs the full selection set, so it is always a periodic full-selection run, never per-dataset.

Deletion

  • Blue/green: implicit (rebuild from zero). In-place: per-source sweep (delete where source = dataset AND last_seen < runId) + registry-membership sweep (delete where source ∉ selectedSources()). The Last-Modified / ETag signal is deletion-sound for dump distributions (re-publishing changes the header). Live SPARQL-endpoint sources without a usable signal fall back to always-reprocess or a COUNT / periodic sweep.
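The In-place sweep rules can be written as a pure function, in the spirit of the planned sweep unit tests. This sketch assumes hypothetical document fields `source` and `lastSeen`, that `lastSeen` stores the run id and run ids sort in run order (for example, derived from `startedAt`), and that the writer tracks which sources it actually re-upserted, so skipped-unchanged sources are not swept. The membership rule is only meaningful in a full-selection run.

```typescript
// Hypothetical shape of an In-place document: the source dataset it was
// projected from and the run that last upserted it.
interface IndexedDoc {
  readonly id: string;
  readonly source: string;
  readonly lastSeen: string;
}

// Returns the ids to delete when an In-place run commits.
// processedSources: datasets re-extracted and upserted in this run.
// selectedSources: the full registry selection for this run.
function sweep(
  docs: Iterable<IndexedDoc>,
  runId: string,
  processedSources: ReadonlySet<string>,
  selectedSources: ReadonlySet<string>,
): string[] {
  const toDelete: string[] = [];
  for (const doc of docs) {
    // Membership sweep: the source is no longer in the registry selection.
    const leftRegistry = !selectedSources.has(doc.source);
    // Per-source sweep: the source was rewritten this run, but this document
    // was not re-upserted (relies on run ids sorting in run order).
    const staleInSource = processedSources.has(doc.source) && doc.lastSeen < runId;
    if (leftRegistry || staleInSource) toDelete.push(doc.id);
  }
  return toDelete;
}
```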

Streaming / low memory (by reuse)

  • The extraction Reader (SparqlConstructReader) streams AsyncIterable<Quad> (injectValues injects a VALUES block into the CONSTRUCT’s sub-SELECT – so “select entity URIs, then CONSTRUCT each entity’s subgraph” is the existing extraction). The Projection frames each bounded batch of URIs (reusing frame-by-type) into typed documents. The Writer loads documents (batched import). Memory is bounded by the VALUES batch size, independent of dataset size – no whole-dataset buffering, no new framer. KG writers stream quads unchanged. The only new orchestration is the two-level iteration (dataset → entity-URI batches) and the batch-size knob (memory vs request-count).
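The two-level iteration can be sketched as a batching generator plus a VALUES injection step. `batches`, `withValues`, and the `#VALUES#` placeholder are hypothetical, not LDE's `injectValues`; the point is that only one batch of IRIs is held at a time, so memory depends on the batch size rather than the dataset size.

```typescript
// Groups a stream of entity IRIs into batches of at most `size` items.
async function* batches<T>(items: AsyncIterable<T>, size: number): AsyncIterable<T[]> {
  if (!Number.isInteger(size) || size < 1) throw new RangeError("size must be a positive integer");
  let batch: T[] = [];
  for await (const item of items) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;
}

// Hypothetical CONSTRUCT template with a placeholder for the VALUES block.
const CONSTRUCT_TEMPLATE = "CONSTRUCT { ?s ?p ?o } WHERE { #VALUES# ?s ?p ?o }";

// Replaces the placeholder with a VALUES block binding ?s to the batch.
// IRIs are assumed to be valid already (no '>' or whitespace); a real
// implementation must validate or escape them.
function withValues(template: string, iris: readonly string[]): string {
  const values = `VALUES ?s { ${iris.map((iri) => `<${iri}>`).join(" ")} }`;
  // A replacer function keeps '$' sequences in IRIs from being interpreted.
  return template.replace("#VALUES#", () => values);
}
```

Each generated query would be sent to the endpoint and its quads streamed onward; raising the batch size trades memory for fewer requests.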

References and labels (one capability)

  • A reference resolves its label (and optionally more fields) from a configured target collection (a “label source”). Generalize the single labelsCollection option → a per-reference label source. A “labels collection” and a typed entity collection are the same kind of thing; the global labels collection is one instantiation, kept small only for typeless referents.
  • The local label is denormalized on the doc (within-source, keeping In-place per-source-clean). The canonical label is reached via an opt-in nested field, resolved query-time, hidden behind the GraphQL API (clients never join), and pruned by field selection.
  • Typed entity collections (e.g. persons, organizations) are first-class searchable collections. This is the Stack’s Multi-source Composition (same kind, multiple sources → one collection; different kinds → separate collections; kind and source orthogonal; identity is URI equality). The Projection emits typed documents; the Writer routes each type to its collection and, per source, upserts + sweeps; commit runs the membership sweep across all collections. Cross-type federated search (multi_search across schemas) queries several collections at once.
  • This server-side reference resolution is the Stack’s Projection mode (engine documents carry the search payload; the GraphQL API resolves typed fields from the engine directly). The alternative URI-filter mode (engine returns URIs; an RDF-native frontend hits SPARQL with VALUES ?s) is Projection mode with idOnly references throughout – same generator, thinner surface.
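A minimal sketch of a per-reference label resolver, in the shape the planned unit test describes: batched, pruned by field selection, and tolerant of dangling references. `LabelSource`, `resolveLabels`, and the field-to-source map are hypothetical; in the design, the deployment (DR) would supply which field resolves against which source. This version batches per reference field; batching per shared source would be a refinement.

```typescript
// A reference on a projected document: the field it sits in and its target IRI.
interface Reference { readonly field: string; readonly iri: string; }

// Hypothetical label source: any collection (typed entity collection or a
// small labels collection) that resolves IRIs to labels in one batched call.
interface LabelSource {
  lookup(iris: readonly string[]): Promise<Map<string, string>>;
}

interface ResolvedReference { readonly iri: string; readonly label?: string; }

async function resolveLabels(
  refs: readonly Reference[],
  sources: Readonly<Record<string, LabelSource>>,
  selectedFields: ReadonlySet<string>,
): Promise<Record<string, ResolvedReference[]>> {
  // Prune by field selection, then group the remaining references per field.
  const byField = new Map<string, Reference[]>();
  for (const ref of refs) {
    if (!selectedFields.has(ref.field)) continue;
    const list = byField.get(ref.field) ?? [];
    list.push(ref);
    byField.set(ref.field, list);
  }

  const result: Record<string, ResolvedReference[]> = {};
  for (const [field, fieldRefs] of byField) {
    const source = sources[field];
    // One batched lookup per field, with duplicate IRIs removed.
    const unique = [...new Set(fieldRefs.map((r) => r.iri))];
    const labels = source ? await source.lookup(unique) : new Map<string, string>();
    // Dangling references (no label, or no source configured) degrade to the bare IRI.
    result[field] = fieldRefs.map((r) => {
      const label = labels.get(r.iri);
      return label === undefined ? { iri: r.iri } : { iri: r.iri, label };
    });
  }
  return result;
}
```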

Facet-by-name search (required for v1)

  • Typeahead the reference’s target collection by label → the user selects values → filter the main query by the bounded set of selected IRIs (never filter by all name matches, which overflows). Counts, refreshed on debounced keystrokes, are contextual and computed at query time (respecting active filters + skip-own-filter), not precomputed (precomputing would re-break In-place via cross-collection count invalidation). The reference-facet search is a two-step query, hidden behind GraphQL.
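The two-step facet search can be sketched in memory, with arrays standing in for the target and main collections. `typeahead`, `countsFor`, and `filterBySelected` are hypothetical helpers and prefix matching is a simplification; a real implementation would issue these as engine queries behind the GraphQL layer.

```typescript
// Hypothetical shapes: a value in the reference's target collection, and a
// main-collection item carrying reference IRIs per (deployment-defined) field.
interface FacetValue { readonly iri: string; readonly label: string; }
interface Item {
  readonly id: string;
  readonly refs: Readonly<Record<string, readonly string[]>>;
}

// Step 1: typeahead the target collection by label, at most k candidates.
function typeahead(target: readonly FacetValue[], prefix: string, k: number): FacetValue[] {
  const p = prefix.toLowerCase();
  return target.filter((v) => v.label.toLowerCase().startsWith(p)).slice(0, k);
}

// Contextual counts for the candidates, computed at query time over the
// items that match the other active filters (the facet's own filter skipped).
function countsFor(
  candidates: readonly FacetValue[],
  itemsMatchingOtherFilters: readonly Item[],
  field: string,
): Map<string, number> {
  const counts = new Map<string, number>();
  for (const c of candidates) counts.set(c.iri, 0);
  for (const item of itemsMatchingOtherFilters) {
    for (const iri of new Set(item.refs[field] ?? [])) {
      const n = counts.get(iri);
      if (n !== undefined) counts.set(iri, n + 1);
    }
  }
  return counts;
}

// Step 2: filter by the IRIs the user selected (a small, bounded set),
// never by every IRI whose label matched the typeahead.
function filterBySelected(items: readonly Item[], field: string, selected: ReadonlySet<string>): Item[] {
  return items.filter((item) => (item.refs[field] ?? []).some((iri) => selected.has(iri)));
}
```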

Domain-agnostic discipline

  • LDE names no domain (no creator / term / person). It would be a design smell for @lde/search or @lde/search-typesense to branch on a domain notion to resolve a label or route a reference. DR supplies every domain semantic (which fields are references, which type / source each resolves against, the merge policy). HTTP concerns go through a per-target fetch-middleware capability slot (#422, Unify pipeline extension surface: capability slots on PipelinePlugin + fetch middleware).

Testing Decisions

Good tests exercise observable behavior through public interfaces, not implementation details, and survive internal refactors. Spec tests for the three deep modules:

  1. Transaction-aware Writer implementations (BlueGreenRebuild, InPlaceRebuild) – behavioral, via a Typesense testcontainer. After a run, assert what is in the live collection, what was swapped, what was swept; that abort drops the blue collection (Blue/green) or leaves the live collection for next-run reconciliation (In-place). Prior art: the existing @lde/search-typesense testcontainer integration test.
  2. In-place sweep logic (per-source + membership) – pure/unit. Given docs + a run’s selection set: removed sources’ docs are deleted, unchanged docs survive, and the membership sweep drops docs whose source left the registry.
  3. Per-reference label resolver – unit. Given references + label sources: labels resolve (batched), pruned by selection, degrading gracefully on dangling references.

Reuse (no new tests): reprocessDecision (#450) is already tested; the Projection reuses/extends the frame-by-type guardrail test; the extraction Reader is tested in @lde/pipeline.

A behavioral memory-flatness test on the object write path is worth adding: assert memory stays flat across a large synthetic input (bounded by the VALUES batch size), guarding the streaming property directly.

Out of Scope

  • The Dataset Register consumer side (the concrete SCHEMA-AP-NDE schemas and the hand-written dr:* CONSTRUCTs) lands in a separate DR change. DR catalog ships now on its current pragmatic path.
  • SHACL-driven extraction generation (retiring the hand-written CONSTRUCTs) – a later step forced by the object work.
  • Carry-over (Blue/green + skip) – rejected.
  • A REST surface (URI-filter mode), and reference strategies beyond what the above needs.

Further Notes

Open decisions to resolve during implementation:

  1. Cross-dataset identity of shared-URI typed entities. The Stack’s Multi-source Composition sets identity by URI equality and projects each resource from its own dataset (plus Network-of-Terms labels), which points away from a field-level merge. But In-place Rebuild’s per-source sweep assumes source = dataset: a URI co-owned by two datasets collides (source A’s sweep would delete source B’s contribution). Resolve: URI equality with per-dataset docs (accepting duplication in standalone search) vs a contributor-set sweep + a DR-supplied merge policy for co-owned URIs. LDE stays domain-agnostic via a merge-policy hook.
  2. Whether typeless referents persist (justifying a small labels collection) or every reference gets a typed collection.
  3. Whether the single-flight lock is the Writer’s concern (it guards its own target) or a generic pipeline concern (LDE ships a lock helper).

Relations: reuses #450 (closed, skip-unchanged); builds on #469 (distribution health signal); composes with #422 (plugin extension surface / fetch middleware); relates to #533 (facet_query typeahead). Forcing function: indexing SCHEMA-AP-NDE-compliant objects inside datasets registered with the Dataset Register. Respect ADRs 0003 (core query model) and 0004 (GraphQL surface); a follow-up ADR should capture “search as a Configurable Pipeline instance: stages + transaction-aware Writer, two update modes, per-reference label sources.”

Grounding in the NDE Stack (bidirectional). This design and the Stack docs are reconciled both ways. The Stack’s patterns are the ubiquitous language adopted here: Blue/green Rebuild, In-place Rebuild, Multi-source Composition, Configurable Pipeline, Ports & Adapters, Discovery via DCAT-AP Registry, Change-driven Rebuild, Last-known-good Per-source Caching, and the Projection / URI-filter consumption modes.

Code vocabulary: the pipeline uses the familiar I/O pair Reader / Writer, which map to the Stack’s conceptual source / sink (a Reader is a source, a Writer a sink).

Where this design has newer insight, the Stack should change too – it is not a fixed contract. Specifically:

  • Fold the separate typesense-load and typesense-alias-swap stages into one transaction-aware Writer. Load + finalize is a single atomic unit (illegal-state prevention, multi-collection coordination); QLever cannot split load from swap at all. The Stack’s step-7 package split should reflect this.
  • Reconcile In-place Rebuild’s source-scoped sweep with Multi-source Composition’s URI-equality rule for URIs co-owned by multiple sources (open question 1).
