Skip to content

feat(server): wire cluster query coordinator + distributed cursor — query Phase 3 (SPEC-304)#46

Merged
ivkan merged 5 commits into
mainfrom
sf-442-wire-coordinator
Jun 10, 2026
Merged

feat(server): wire cluster query coordinator + distributed cursor — query Phase 3 (SPEC-304)#46
ivkan merged 5 commits into
mainfrom
sf-442-wire-coordinator

Conversation

@ivkan

@ivkan ivkan commented Jun 10, 2026

Copy link
Copy Markdown
Member

Summary

SPEC-304 (from TODO-442) — Query Engine Consolidation Phase 3: wire the cluster query
coordinator into production and complete the distributed merge path. Builds on SPEC-301, which
fixed distributed merge correctness (global sort + global limit) but deliberately left the
coordinator unwired.

Changes

  • bin/topgun_server.rs — construct and wire ClusterQueryCoordinator into the prod assembly
    behind a cluster-mode gate; share the completion_registry so DagComplete frames resolve the
    coordinator's awaiting receivers; route cluster DAG frames to the coordinator dispatch loop.
  • dag/converter.rs — place the cursor vertex worker-side in multi-node DAG plans (so
    per-node keyset pagination positions are produced before the coordinator merge);
    build_cursor_vertex_config helper + if-let cleanup for the single-node branch.
  • service/domain/query.rs — coordinator wiring surface.
  • sim/cluster.rsdistributed_keyset_cursor_under_partition fault-injection sim test
    exercising distributed cursor pagination under a network partition.

Verification

Reviewed via the SpecFlow impl-review gate. 4 Rust files (within the 5-file Language Profile).
CI gate is Rust (cargo test + sim + clippy --all-targets --all-features -D warnings + fmt);
must be green before merge.

ivkan added 5 commits June 10, 2026 20:23
In non-GROUP-BY multi-node plans the cursor vertex was being appended
after network-receiver (coordinator-side), meaning each node sent all
matching records and the coordinator filtered them — wrong for keyset
pagination.  Move the cursor vertex into the multi-node branch so it
precedes network-sender; Step 3b (single-node path) is guarded with
`!multi_node` to avoid double-insertion.

Add converter unit test asserting cursor→network-sender edge exists and
network-receiver→cursor edge does not exist in multi-node plans.

Also route handle_query_subscribe through coordinator.execute_distributed
when self.coordinator is Some, falling back to run_dag_local otherwise.
The linear_engine_for_tests opt-out retains highest priority.  The
coordinator field was previously declared but never read on the query path.
Construct a ClusterQueryCoordinator and call QueryService::with_coordinator
only when --seed-nodes is provided (cluster_mode = !seed_list.is_empty()).
Single-node startup leaves coordinator: None so existing behaviour is unchanged.

Key wiring links:
- completion_registry Arc<DashMap<String, oneshot::Sender<DagCompletePayload>>>
  is shared between the coordinator and the cluster dispatch loop so
  DagComplete frames from peer nodes resolve the coordinator's awaiting receivers.
- Inbound routing task now routes DagExecute/DagComplete/DagData frames to
  the dispatch channel instead of discarding them (bin:343-356 previously
  dropped all non-heartbeat frames, which would have timed out every
  distributed query).
- ClusterStateAdapter bridges Arc<ClusterState> to Arc<dyn ClusterService>
  for the coordinator (same pattern as SimClusterService in simulation tests).
- build_services gains a cluster_params: Option<ClusterParams> parameter;
  when Some, constructs the coordinator with the real connection_registry and
  record_store_factory available inside the function.
Adds a 2-node partition fault-injection simulation test proving that the
global keyset cursor is correctly applied worker-side before the network
boundary.

Test setup: node-0 owns scores {10,30,50}, node-1 owns {20,40,60}.
Page 1 (no cursor, limit=3) returns [10,20,30].  A CursorData is encoded
from the last row (score=30, key="delta").  A partition is injected then
healed.  Page 2 (with cursor, limit=3) returns [40,50,60] — strictly after
the cursor keyset position, in global sort order, with no overlap or
duplicates from page 1.

Coverage note (per spec AC4): this sim drives coordinator.execute_distributed
directly and does NOT exercise handle_query_subscribe routing or the prod
bin dispatch-loop wiring.  The dispatch-loop to completion_registry link
is verified by source inspection per the Validation Checklist.
- Replaces is_some() + expect() with an if-let binding so the cursor
  string is captured without a panic path (clippy unnecessary_unwrap)
- No behavior change: branch remains gated on !multi_node
Eliminates the duplicated predicate/sort-hash + cursor-config map block
shared by the multi-node worker-side and single-node cursor paths.
@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented Jun 10, 2026

Copy link
Copy Markdown

Deploying topgun with  Cloudflare Pages  Cloudflare Pages

Latest commit: 4d31e09
Status: ✅  Deploy successful!
Preview URL: https://00196dd2.topgun-f45.pages.dev
Branch Preview URL: https://sf-442-wire-coordinator.topgun-f45.pages.dev

View logs

@ivkan ivkan merged commit 88e2b3d into main Jun 10, 2026
10 checks passed
@ivkan ivkan deleted the sf-442-wire-coordinator branch June 10, 2026 18:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant