
Async I/O Cancellation Safety and Graceful Shutdown Protocol for Telemetry Stream Lifecycle #49

Description

@elizabetheonoja-art

Problem Statement / Feature Objective

When the utility backend receives a SIGTERM or a configuration-triggered graceful shutdown, neither the current Drop implementations nor tokio task cancellation via CancellationToken guarantees that in-flight telemetry streams are drained and acknowledged, and that their watermark is persisted, before the process exits. As a result, meter events are silently dropped across restarts, which leaves gaps in billing data, and a partially updated watermark can cause double settlement. This issue calls for a formal async cancellation-safety and graceful-shutdown protocol that defines precisely which resources are drained and in what order, uses structured concurrency with cancellation scopes, and persists a shutdown_checkpoint so that crash recovery can resume from the exact point of interruption.
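Both failure modes above reduce to one rule at restart: events at or below a source's persisted watermark were already settled, and everything above it must be replayed. The following is a minimal std-only sketch of that rule, assuming per-source offsets that increase monotonically. `MeterEvent` and `partition_for_replay` are hypothetical names, `MeterSourceId` is assumed to be a plain string, and replaying a source with no persisted watermark from the beginning is an assumption of the sketch, not something the issue specifies.

```rust
use std::collections::HashMap;

/// Hypothetical: the issue names `MeterSourceId` but does not define it.
type MeterSourceId = String;

/// Hypothetical event shape: originating source plus a per-source offset
/// that is assumed to increase monotonically.
#[derive(Debug, Clone, PartialEq)]
struct MeterEvent {
    source: MeterSourceId,
    offset: u64,
}

/// Split re-fetched events into (replay, already_settled).
/// Replaying only `offset > watermark` closes gaps without settling twice.
fn partition_for_replay(
    events: Vec<MeterEvent>,
    watermark: &HashMap<MeterSourceId, u64>,
) -> (Vec<MeterEvent>, Vec<MeterEvent>) {
    events.into_iter().partition(|e| match watermark.get(&e.source) {
        // A watermark exists: replay only events strictly past it.
        Some(&w) => e.offset > w,
        // Assumption: no persisted watermark means nothing was settled.
        None => true,
    })
}
```

If the watermark were written before the events it covers were durably stored, this rule would skip them (a gap); if it lagged behind settled events, they would be replayed (double settlement). That is why the protocol persists the watermark only after storage has been flushed.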

Technical Invariants & Bounds

  • Shutdown phase ordering: (1) stop TCP acceptors (no new connections); (2) drain in-flight reassembly buffers; (3) drain parser channels; (4) finalize in-progress tariff evaluations; (5) flush storage write buffers; (6) persist the watermark to RocksDB; (7) close Soroban client connections; (8) write the /tmp/utility_shutdown_complete marker file to signal that a SIGKILL is now safe.
  • Per-phase timeout: each phase has a configurable deadline (default 30 seconds, max 300 seconds). If a phase times out, the remaining in-flight events are logged and the process exits with code 1 (to trigger supervisor restart).
  • Structured concurrency: each pipeline stage is a task group backed by tokio_util::task::TaskTracker, which tracks all of the stage's spawned tasks. Cancellation propagates top-down: the parent stage cancels its children before transitioning to the next phase.
  • Cancellation token hierarchy: a root CancellationToken branches into one child token per stage (via CancellationToken::child_token()); cancelling a stage's token also cancels the tokens held by all of its sub-tasks.
  • Maximum in-flight events during shutdown: 1,000,000 events tracked across all stages; if this limit is exceeded, the process logs a CRIT-level message and exits immediately (to prevent OOM during shutdown).
  • Restart recovery: on next startup, the process reads the shutdown_checkpoint from RocksDB containing the last persisted watermark per meter source; any events with offsets above the watermark are re-fetched from the meter source.
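The ordering, timeout, and in-flight bounds above are simple enough to pin down as plain data before any async wiring. This std-only sketch takes the `ShutdownPhase` variant names from blueprint step 1 and stores the current phase in an `AtomicU8` as that step suggests; `phase_timeout`, `Verdict`, `check`, and `advance` are hypothetical helpers, and giving the in-flight cap precedence over a phase timeout is an assumption, since the issue does not state one.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::Duration;

/// Phases in the required shutdown order (names from blueprint step 1).
/// `#[repr(u8)]` lets the current phase be stored in an `AtomicU8`.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ShutdownPhase {
    Acceptors,
    Reassembly,
    Parsing,
    Evaluation,
    Storage,
    Watermark,
    Blockchain,
    Complete,
}

impl ShutdownPhase {
    const ORDER: [ShutdownPhase; 8] = [
        ShutdownPhase::Acceptors,
        ShutdownPhase::Reassembly,
        ShutdownPhase::Parsing,
        ShutdownPhase::Evaluation,
        ShutdownPhase::Storage,
        ShutdownPhase::Watermark,
        ShutdownPhase::Blockchain,
        ShutdownPhase::Complete,
    ];

    /// Decode a value previously stored as `phase as u8`.
    fn from_u8(v: u8) -> Option<Self> {
        Self::ORDER.get(v as usize).copied()
    }

    /// The phase after this one, or `None` once `Complete` is reached.
    fn next(self) -> Option<Self> {
        Self::from_u8(self as u8 + 1)
    }
}

const DEFAULT_PHASE_TIMEOUT: Duration = Duration::from_secs(30);
const MAX_PHASE_TIMEOUT: Duration = Duration::from_secs(300);
const MAX_INFLIGHT_EVENTS: u64 = 1_000_000;

/// Effective per-phase deadline: 30 s when unset, never more than 300 s.
fn phase_timeout(configured: Option<Duration>) -> Duration {
    configured.unwrap_or(DEFAULT_PHASE_TIMEOUT).min(MAX_PHASE_TIMEOUT)
}

/// Hypothetical guard outcome, evaluated while a phase drains.
#[derive(Debug, PartialEq, Eq)]
enum Verdict {
    /// Keep draining, then move to the next phase.
    Continue,
    /// In-flight cap exceeded: log CRIT and exit at once (OOM guard).
    ExitImmediately,
    /// Phase deadline missed: log remaining events, exit with code 1.
    ExitForRestart,
}

/// Assumption: the OOM guard takes precedence over a missed deadline.
fn check(inflight: u64, phase_timed_out: bool) -> Verdict {
    if inflight > MAX_INFLIGHT_EVENTS {
        Verdict::ExitImmediately
    } else if phase_timed_out {
        Verdict::ExitForRestart
    } else {
        Verdict::Continue
    }
}

/// Advance the shared phase counter one step and return the new phase.
/// Assumes a single orchestrator task is the only writer.
fn advance(phase: &AtomicU8) -> Option<ShutdownPhase> {
    let next = ShutdownPhase::from_u8(phase.load(Ordering::Acquire))?.next()?;
    phase.store(next as u8, Ordering::Release);
    Some(next)
}
```

Keeping these rules synchronous and free of I/O makes them unit-testable on their own; the async orchestrator then only has to call `check` and `advance` around each awaited phase.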

Codebase Navigation Guide

  • src/lifecycle/shutdown.rs — new module: ShutdownProtocol orchestrating phase ordering, timeouts, and checkpoint persistence.
  • src/lifecycle/task_group.rs — new module: StructuredTaskGroup wrapping tokio_util::task::TaskTracker together with a cancellation scope (CancellationToken).
  • src/ingestion/pipeline.rs — integrate structured concurrency: each stage creates a StructuredTaskGroup that owns its sub-tasks.
  • src/ingestion/collector.rs — main collector loop; accept a CancellationToken and break on cancellation after draining.
  • src/storage/rocksdb/checkpoint.rs — new module: ShutdownCheckpoint serialization/deserialization, stored at key shutdown_checkpoint_v1.
  • src/bin/utility_backend.rs — main entry point; install signal handlers for SIGTERM, SIGINT, SIGHUP; initiate ShutdownProtocol::shutdown().
  • tests/lifecycle/graceful_shutdown_test.rs — integration test sending SIGTERM under load and verifying zero data loss.

Implementation Blueprint

  1. In shutdown.rs, define enum ShutdownPhase { Acceptors, Reassembly, Parsing, Evaluation, Storage, Watermark, Blockchain, Complete }. Implement ShutdownProtocol { phase: AtomicU8, deadline: Instant, checkpoint: Arc<RwLock<ShutdownCheckpoint>> }.
  2. Implement async fn shutdown(&self, signal: SignalKind) that transitions through the phases in order. For each phase, call cancel() on the corresponding stage's CancellationToken, then close the stage's TaskTracker and await TaskTracker::wait() under tokio::time::timeout with the phase deadline. On timeout, log the phase name and the number of remaining tasks (TaskTracker::len()), then proceed to the next phase.
  3. In task_group.rs, define StructuredTaskGroup { tracker: TaskTracker, token: CancellationToken }. Spawning a task registers it with the tracker and passes it the group's cancellation token. Because Drop is synchronous and cannot .await, the Drop implementation can only cancel the token; waiting for the tasks to finish must happen explicitly, which is what the shutdown path does.
  4. In pipeline.rs, restructure the pipeline as a tree of StructuredTaskGroups: PipelineGroup at the root, with one child group per stage. When the root receives cancellation, it cancels its children in pipeline order (Accept → Reassemble → Parse → Evaluate → Store), matching the shutdown phase ordering, so that each downstream stage keeps running until its upstream stage has drained.
  5. In collector.rs, the main accept loop races listener.accept() against token.cancelled() (for example with tokio::select!), so that cancellation is observed even when no new connection arrives; checking token.is_cancelled() only after each accepted connection would leave an idle listener blocked indefinitely. Once cancelled, stop accepting, wait for the existing connection-handler tasks with a 30-second deadline, then persist the watermark.
  6. In checkpoint.rs, implement ShutdownCheckpoint { timestamp_ns: i64, watermark: HashMap<MeterSourceId, u64>, stages_drained: BitSet<8> } (a plain u8 bitmask with one bit per ShutdownPhase is sufficient for stages_drained). Persist it to RocksDB after each completed phase using a synchronous write (WriteOptions::set_sync(true)). On startup, load the checkpoint and resume from the first phase that has not yet completed.
  7. Write an integration test that starts the backend, injects 10,000 events/second, sends SIGTERM at a random time within the first 5 seconds, waits for the process to exit, restarts it, and verifies that the sum of events ingested pre-shutdown + post-restart equals the total injected (data integrity). Use insta for snapshot comparison of event counters.
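Steps 6 and 7 hinge on a checkpoint whose bytes survive a crash and tell the next process where to resume. The sketch below is std-only and makes several assumptions that are not in the issue: a hand-rolled little-endian byte layout (a real implementation would more likely use serde with a binary format), a `BTreeMap` in place of the issue's `HashMap` so the encoded bytes are deterministic, a `u8` bitmask standing in for `BitSet<8>`, and the hypothetical helpers `mark_drained`, `resume_from`, `encode`, `decode`, and `take`.

```rust
use std::collections::BTreeMap;
use std::convert::TryInto;

/// Hypothetical stand-in for the issue's `MeterSourceId`.
type MeterSourceId = String;

/// RocksDB key named in the issue.
const CHECKPOINT_KEY: &[u8] = b"shutdown_checkpoint_v1";

#[derive(Debug, Clone, PartialEq, Default)]
struct ShutdownCheckpoint {
    timestamp_ns: i64,
    /// BTreeMap (assumption) so `encode` output is deterministic.
    watermark: BTreeMap<MeterSourceId, u64>,
    /// Bit i set => phase i (in ShutdownPhase order) completed.
    stages_drained: u8,
}

impl ShutdownCheckpoint {
    fn mark_drained(&mut self, phase_index: u8) {
        assert!(phase_index < 8, "only 8 shutdown phases exist");
        self.stages_drained |= 1u8 << phase_index;
    }

    /// Index of the first phase not yet completed; 8 means all are done.
    /// Phases run in order, so only the contiguous low bits count.
    fn resume_from(&self) -> u8 {
        self.stages_drained.trailing_ones() as u8
    }

    /// Layout: ts(8) | drained(1) | count(4) | { len(4) | key | offset(8) }*
    fn encode(&self) -> Vec<u8> {
        let mut out = Vec::new();
        out.extend_from_slice(&self.timestamp_ns.to_le_bytes());
        out.push(self.stages_drained);
        out.extend_from_slice(&(self.watermark.len() as u32).to_le_bytes());
        for (source, offset) in &self.watermark {
            out.extend_from_slice(&(source.len() as u32).to_le_bytes());
            out.extend_from_slice(source.as_bytes());
            out.extend_from_slice(&offset.to_le_bytes());
        }
        out
    }

    /// Returns `None` for truncated, malformed, or over-long input.
    fn decode(mut buf: &[u8]) -> Option<Self> {
        let timestamp_ns = i64::from_le_bytes(take(&mut buf, 8)?.try_into().ok()?);
        let stages_drained = take(&mut buf, 1)?[0];
        let count = u32::from_le_bytes(take(&mut buf, 4)?.try_into().ok()?);
        let mut watermark = BTreeMap::new();
        for _ in 0..count {
            let len = u32::from_le_bytes(take(&mut buf, 4)?.try_into().ok()?) as usize;
            let source = String::from_utf8(take(&mut buf, len)?.to_vec()).ok()?;
            let offset = u64::from_le_bytes(take(&mut buf, 8)?.try_into().ok()?);
            watermark.insert(source, offset);
        }
        if !buf.is_empty() {
            return None; // trailing bytes: reject rather than guess
        }
        Some(ShutdownCheckpoint { timestamp_ns, watermark, stages_drained })
    }
}

/// Split `n` bytes off the front of `buf`, or `None` if it is too short.
fn take<'a>(buf: &mut &'a [u8], n: usize) -> Option<&'a [u8]> {
    if buf.len() < n {
        return None;
    }
    let (head, tail) = buf.split_at(n);
    *buf = tail;
    Some(head)
}
```

Persisting would then be one synchronous put of `cp.encode()` under `CHECKPOINT_KEY` after each phase. With the `rocksdb` crate this is presumably `db.put_opt(CHECKPOINT_KEY, cp.encode(), &opts)` after `opts.set_sync(true)`, though the call should be checked against the crate version in use. Rejecting truncated input in `decode` matters here: a torn write should make recovery fall back rather than resume from a half-read watermark.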

Metadata

Assignees

Labels

  • Complexity: Hardcore (Issues requiring deep systems-level engineering rigor)
  • GrantFox OSS (Issue tracked in GrantFox OSS)
  • Layer: Core-Engine (Core engine layer)
  • Maybe Rewarded (Issue may be eligible for a GrantFox reward)
  • Official Campaign (Campaign: Official Campaign)
  • Type: Core-Architecture (Core architecture concerns, invariants, and structural design)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests