Skip to content

fix(queue): enforce hard backpressure on bounded queues#533

Open
Babigdk wants to merge 1 commit into
StellarFlow-Network:mainfrom
Babigdk:fix/487-queue-backpressure
Open

fix(queue): enforce hard backpressure on bounded queues#533
Babigdk wants to merge 1 commit into
StellarFlow-Network:mainfrom
Babigdk:fix/487-queue-backpressure

Conversation

@Babigdk

@Babigdk Babigdk commented Jun 27, 2026

Copy link
Copy Markdown

Summary

Implements bounded-memory backpressure controls for stream ingestion queues.

The existing queue already enforced a capacity limit, but non-critical packets were dropped immediately when the queue became full. This change applies hard producer-side backpressure so upstream ingestion blocks until downstream consumers free capacity, preventing uncontrolled memory growth during sink lag events.

Related Issue

Closes #487

Changes Made

  • Verified bounded queue capacity is enforced with a default maximum size of 1000
  • Updated enqueue logic to block producers when queue capacity is exhausted
  • Preserved saturation-based slowdown behavior
  • Preserved metric packet shedding under high saturation conditions
  • Improved protection against memory exhaustion caused by slow downstream writers

Backpressure Behavior

Before:

  • Critical packets waited
  • Non-critical packets were dropped when queue became full

After:

  • Queue remains strictly bounded
  • Producers block when capacity is exhausted
  • Upstream readers naturally slow until consumers drain the queue
  • Metric packets may still be dropped under configured saturation thresholds

Acceptance Criteria

  • Internal queue storage remains bounded
  • Maximum capacity enforced at 1000 items
  • Backpressure blocks upstream ingestion when capacity is reached
  • Prevents unbounded memory growth during downstream lag

Testing

Validation performed via code review of queue flow:

  • Capacity enforcement remains active
  • Producer blocking path uses queue.put(...)
  • Existing saturation metrics remain functional

Notes for Reviewers

The repository issue references a Python implementation using janus.Queue(maxsize=1000), but the current codebase implements equivalent functionality in TypeScript via AsyncBoundedQueue. This PR applies the requested bounded-memory backpressure behavior to the TypeScript implementation.

@drips-wave

drips-wave Bot commented Jun 27, 2026

Copy link
Copy Markdown

@Babigdk Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits.

You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀

Learn more about application limits

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

🕒 Queue-Backpressure | Bounded Memory Queues for Stream Broker Sinks

1 participant