Skip to content

Commit acef797

Browse files
committed
Add Broker sync method
Replace per-queue create()/delete() with sync(queues) that takes all queues at once. This better fits backends like PostgreSQL where setup means creating tables rather than declaring individual queues. The CLI sync command now delegates directly to broker.sync() instead of looping over queues itself. The --recreate flag is passed through so each backend handles recreation internally.
1 parent 9d44883 commit acef797

12 files changed

Lines changed: 84 additions & 96 deletions

File tree

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,18 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com).
88
[Unreleased]
99
------------
1010

11+
### Breaking Changes
12+
13+
- Removed `Broker.create(queue)`, in favor of `Broker.sync([queue])`.
14+
- Removed `Broker.delete(queue)`, without replacement.
15+
- Removed `QueueIO.create(queue)`, in favor of `QueueIO.sync([queue])`.
16+
- Removed `QueueIO.delete(queue)`, without replacement.
17+
18+
### Added
19+
20+
- Added `Broker.sync()` to back `queueio sync`.
21+
- Added `QueueIO.sync()` to back `queueio sync`.
22+
1123
[0.6.0] - 2026-02-16
1224
--------------------
1325

queueio/__main__.py

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from contextlib import suppress
21
from typing import Annotated
32

43
import typer
@@ -103,20 +102,12 @@ def sync(
103102
recreate: Annotated[
104103
bool,
105104
typer.Option(
106-
help="Delete and recreate queues that have incompatible arguments. "
107-
"WARNING: This will lose any pending messages in those queues.",
105+
help="Recreate all resources for the broker and journal. "
106+
"WARNING: This will lose pending messages.",
108107
),
109108
] = False,
110109
):
111-
"""Sync known queues to the broker.
112-
113-
Planned flags:
114-
- (default) ensure everything needed exists
115-
- --repair: rebuild only what's broken
116-
- --recreate: tear down and rebuild everything we know about
117-
- --prune: remove what's not needed
118-
- --plan: show what would happen without doing it
119-
"""
110+
"""Sync resources for the broker and journal."""
120111
queueio = QueueIO()
121112
try:
122113
routines = queueio.routines()
@@ -127,29 +118,19 @@ def sync(
127118

128119
queues = sorted({routine.queue for routine in routines})
129120

130-
print(f"Syncing queues for {len(routines)} routine(s):")
131-
if recreate:
132-
for queue in queues:
133-
print(f" Recreating queue: {queue}")
134-
with suppress(Exception):
135-
queueio.delete(queue=queue)
136-
137-
failed = []
121+
print(f"Discovered queues for {len(routines)} routine(s):")
138122
for queue in queues:
139-
print(f" Ensuring queue exists: {queue}")
140-
try:
141-
queueio.create(queue=queue)
142-
except Exception:
143-
failed.append(queue)
123+
print(f" {queue}")
144124

145-
if failed:
125+
try:
126+
queueio.sync(queues, recreate=recreate)
127+
except Exception:
146128
print(
147-
f"\nError: {len(failed)} queue(s) have incompatible arguments: "
148-
f"{', '.join(failed)}\n"
149-
f"Re-run with --recreate to delete and recreate them.\n"
150-
f"WARNING: This will lose any pending messages in those queues."
129+
"\nError: Failed to sync resources.\n"
130+
"Re-run with --recreate to recreate them.\n"
131+
"WARNING: This will lose pending messages."
151132
)
152-
raise typer.Exit(1)
133+
raise typer.Exit(1) from None
153134

154135
print(f"Successfully synced {len(queues)} queue(s)")
155136
finally:

queueio/broker.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from abc import ABC
22
from abc import abstractmethod
3+
from collections.abc import Iterable
34

45
from .queuespec import QueueSpec
56
from .receiver import Receiver
@@ -20,18 +21,17 @@ def from_uri(cls, uri: str, /):
2021
raise NotImplementedError("Subclasses must implement this method.")
2122

2223
@abstractmethod
23-
def enqueue(self, body: bytes, /, *, queue: str, priority: int):
24-
"""Enqueue a message."""
25-
raise NotImplementedError("Subclasses must implement this method.")
24+
def sync(self, queues: Iterable[str], *, recreate: bool = False):
25+
"""Ensure the given queues are ready to use.
2626
27-
@abstractmethod
28-
def create(self, *, queue: str):
29-
"""Create a queue if it doesn't exist."""
27+
If recreate is True, destroy and recreate all resources,
28+
losing any pending messages.
29+
"""
3030
raise NotImplementedError("Subclasses must implement this method.")
3131

3232
@abstractmethod
33-
def delete(self, *, queue: str):
34-
"""Delete a queue."""
33+
def enqueue(self, body: bytes, /, *, queue: str, priority: int):
34+
"""Enqueue a message."""
3535
raise NotImplementedError("Subclasses must implement this method.")
3636

3737
@abstractmethod

queueio/broker_test.py

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def broker(self):
6060
@pytest.mark.timeout(2)
6161
def test_prefetch_limits_message_consumption(self, broker):
6262
"""Verify that prefetch parameter limits message consumption."""
63-
broker.create(queue="test-queue")
63+
broker.sync(["test-queue"])
6464
broker.purge(queue="test-queue")
6565

6666
# Enqueue more messages than prefetch limit
@@ -93,7 +93,7 @@ def receive_messages():
9393
@pytest.mark.timeout(2)
9494
def test_suspend_resume_affects_prefetch_capacity(self, broker):
9595
"""Verify suspending messages frees capacity and resuming reduces it."""
96-
broker.create(queue="test-queue")
96+
broker.sync(["test-queue"])
9797
broker.purge(queue="test-queue")
9898

9999
# Enqueue messages
@@ -131,7 +131,7 @@ def receive_messages():
131131
@pytest.mark.timeout(2)
132132
def test_complete_message_frees_prefetch_capacity(self, broker):
133133
"""Verify completing messages frees up capacity."""
134-
broker.create(queue="test-queue")
134+
broker.sync(["test-queue"])
135135
broker.purge(queue="test-queue")
136136

137137
# Enqueue more messages than prefetch limit
@@ -171,7 +171,7 @@ def receive_messages():
171171
@pytest.mark.timeout(2)
172172
def test_multiple_receivers_independent_prefetch_limits(self, broker):
173173
"""Verify multiple receivers operate with independent prefetch limits."""
174-
broker.create(queue="test-queue")
174+
broker.sync(["test-queue"])
175175
broker.purge(queue="test-queue")
176176

177177
# Enqueue enough messages for both receivers
@@ -223,8 +223,7 @@ def test_receive_rejects_empty_queues(self, broker):
223223
@skip_if_unsupported("supports_multiple_queues")
224224
def test_receive_supports_multiple_queues(self, broker):
225225
"""Verify broker can receive from multiple queues."""
226-
broker.create(queue="queue1")
227-
broker.create(queue="queue2")
226+
broker.sync(["queue1", "queue2"])
228227
broker.purge(queue="queue1")
229228
broker.purge(queue="queue2")
230229

@@ -260,10 +259,7 @@ def receive_messages():
260259
def test_multiple_queues_with_mixed_empty_and_filled(self, broker):
261260
"""Verify broker handles multiple queues with some empty, some with messages."""
262261

263-
broker.create(queue="empty1")
264-
broker.create(queue="filled")
265-
broker.create(queue="empty2")
266-
broker.create(queue="also_filled")
262+
broker.sync(["empty1", "filled", "empty2", "also_filled"])
267263
broker.purge(queue="empty1")
268264
broker.purge(queue="filled")
269265
broker.purge(queue="empty2")
@@ -308,8 +304,7 @@ def test_duplicate_queue_names_get_additional_priority(self, broker):
308304
probability in the round-robin selection.
309305
"""
310306

311-
broker.create(queue="priority_queue")
312-
broker.create(queue="normal_queue")
307+
broker.sync(["priority_queue", "normal_queue"])
313308
broker.purge(queue="priority_queue")
314309
broker.purge(queue="normal_queue")
315310

@@ -410,9 +405,7 @@ def test_empty_queue_cycling_fairness(self, broker):
410405
the message, not just cycle by a fixed amount.
411406
"""
412407

413-
broker.create(queue="empty")
414-
broker.create(queue="queue1")
415-
broker.create(queue="queue2")
408+
broker.sync(["empty", "queue1", "queue2"])
416409
broker.purge(queue="empty")
417410
broker.purge(queue="queue1")
418411
broker.purge(queue="queue2")
@@ -473,7 +466,7 @@ def receive_messages():
473466
@pytest.mark.timeout(2)
474467
def test_higher_priority_consumed_first(self, broker):
475468
"""Verify that higher priority messages are consumed before lower ones."""
476-
broker.create(queue="test-queue")
469+
broker.sync(["test-queue"])
477470
broker.purge(queue="test-queue")
478471

479472
# Enqueue low priority first, then high priority

queueio/pika/broker.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from collections.abc import Iterable
2+
from contextlib import suppress
13
from threading import Lock
24

35
from pika import URLParameters
@@ -26,6 +28,19 @@ def __init__(self, connection_params: Parameters):
2628
self.__shutdown = False
2729
self.__receivers = set[PikaReceiver]()
2830

31+
def sync(self, queues: Iterable[str], *, recreate: bool = False):
32+
channel = self.__connection.channel()
33+
try:
34+
for queue in queues:
35+
if recreate:
36+
with suppress(Exception):
37+
channel.delete(queue=queue)
38+
channel.queue_declare(
39+
queue=queue, durable=True, arguments={"x-max-priority": 9}
40+
)
41+
finally:
42+
channel.close()
43+
2944
def enqueue(self, body: bytes, /, *, queue: str, priority: int):
3045
self.__channel.publish(
3146
exchange="",
@@ -34,18 +49,6 @@ def enqueue(self, body: bytes, /, *, queue: str, priority: int):
3449
properties=BasicProperties(priority=priority),
3550
)
3651

37-
def create(self, *, queue: str):
38-
channel = self.__connection.channel()
39-
channel.queue_declare(
40-
queue=queue, durable=True, arguments={"x-max-priority": 9}
41-
)
42-
channel.close()
43-
44-
def delete(self, *, queue: str):
45-
channel = self.__connection.channel()
46-
channel.delete(queue=queue)
47-
channel.close()
48-
4952
def purge(self, *, queue: str):
5053
self.__channel.purge(queue=queue)
5154

queueio/queueio.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,8 @@ def run[R](self, invocation: Invocation[R], /) -> R:
119119
with self.invocation_handler():
120120
return invocation.submit().result()
121121

122-
def create(self, *, queue: str):
123-
self.__broker.create(queue=queue)
124-
125-
def delete(self, *, queue: str):
126-
self.__broker.delete(queue=queue)
122+
def sync(self, queues: Iterable[str], *, recreate: bool = False):
123+
self.__broker.sync(queues, recreate=recreate)
127124

128125
def purge(self, *, queue: str):
129126
self.__broker.purge(queue=queue)

queueio/queueio_test.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def test_queueio_with_custom_broker_and_journal():
1717

1818
try:
1919
# Test purge (uses broker)
20-
queueio.create(queue="queueio")
20+
queueio.sync(["queueio"])
2121
queueio.purge(queue="queueio")
2222

2323
# Test subscriptions (uses journal)
@@ -40,9 +40,9 @@ def test_different_queueio_instances_are_independent():
4040

4141
try:
4242
# Both should work independently
43-
queueio1.create(queue="queueio")
43+
queueio1.sync(["queueio"])
4444
queueio1.purge(queue="queueio")
45-
queueio2.create(queue="queueio")
45+
queueio2.sync(["queueio"])
4646
queueio2.purge(queue="queueio")
4747

4848
# Test that they can have independent subscriptions
@@ -86,7 +86,7 @@ def test_queueio_loads_configuration_from_pyproject(tmp_path):
8686
queueio = QueueIO()
8787
try:
8888
# Should be able to perform basic operations
89-
queueio.create(queue="test")
89+
queueio.sync(["test"])
9090
queueio.purge(queue="test")
9191
events = queueio.subscribe({object})
9292
queueio.unsubscribe(events)
@@ -136,9 +136,9 @@ def test_queueio_allows_pika_override(tmp_path):
136136

137137
try:
138138
# Both should work
139-
queueio1.create(queue="test")
139+
queueio1.sync(["test"])
140140
queueio1.purge(queue="test")
141-
queueio2.create(queue="test")
141+
queueio2.sync(["test"])
142142
queueio2.purge(queue="test")
143143
finally:
144144
queueio1.shutdown()
@@ -209,7 +209,7 @@ def test_queueio_with_valid_config(tmp_path):
209209
queueio = QueueIO()
210210
try:
211211
# Should load configuration successfully
212-
queueio.create(queue="test")
212+
queueio.sync(["test"])
213213
queueio.purge(queue="test")
214214
routines = queueio.routines()
215215
routine_names = {routine.name for routine in routines}
@@ -327,7 +327,7 @@ def test_queueio_with_uri_broker_config(tmp_path):
327327
queueio = QueueIO()
328328
try:
329329
# Should work with URI configuration
330-
queueio.create(queue="test")
330+
queueio.sync(["test"])
331331
queueio.purge(queue="test")
332332
finally:
333333
queueio.shutdown()
@@ -367,7 +367,7 @@ def test_queueio_with_uri_journal_config(tmp_path):
367367
queueio = QueueIO()
368368
try:
369369
# Should work with URI configuration
370-
queueio.create(queue="test")
370+
queueio.sync(["test"])
371371
queueio.purge(queue="test")
372372
finally:
373373
queueio.shutdown()
@@ -407,7 +407,7 @@ def test_queueio_with_both_uri_configs(tmp_path):
407407
queueio = QueueIO()
408408
try:
409409
# Should work with both URI configurations
410-
queueio.create(queue="test")
410+
queueio.sync(["test"])
411411
queueio.purge(queue="test")
412412
finally:
413413
queueio.shutdown()
@@ -488,7 +488,7 @@ def test_queueio_with_environment_variable(tmp_path, monkeypatch):
488488
queueio = QueueIO()
489489
try:
490490
# Should work with environment variables taking precedence
491-
queueio.create(queue="test")
491+
queueio.sync(["test"])
492492
queueio.purge(queue="test")
493493
finally:
494494
queueio.shutdown()

queueio/samples/basic_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def test_integration():
1515
queueio = QueueIO()
1616

1717
with queueio.activate():
18-
queueio.create(queue="basic")
18+
queueio.sync(["basic"])
1919
queueio.purge(queue="basic")
2020
events = queueio.subscribe({Invocation.Completed})
2121
invocation = yielding(7)

queueio/samples/expanded_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def test_integration():
1616
queueio = QueueIO()
1717

1818
with queueio.activate():
19-
queueio.create(queue="expanded")
19+
queueio.sync(["expanded"])
2020
queueio.purge(queue="expanded")
2121
events = queueio.subscribe({Invocation.Completed})
2222
invocation = irregular()

queueio/samples/priority_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def test_priority_propagation():
1616
queueio = QueueIO()
1717

1818
with queueio.activate():
19-
queueio.create(queue="priority")
19+
queueio.sync(["priority"])
2020
queueio.purge(queue="priority")
2121
events = queueio.subscribe({Invocation.Completed})
2222
invocation = demonstrate_priorities()

0 commit comments

Comments
 (0)