Skip to content

Commit c659999

Browse files
committed
Add celery_app parameter to job_queue_size for priority queue support
Fixes RabbitMQ PRECONDITION_FAILED error when querying queues configured with custom arguments like x-max-priority. The celery_app parameter allows extracting queue arguments from the app's task_queues configuration and passing them to queue_declare(). The celery_app and broker_url parameters are mutually exclusive - a ValueError is raised if both are provided.
1 parent 0923bed commit c659999

2 files changed

Lines changed: 300 additions & 37 deletions

File tree

hirefire_resource/macro/celery.py

Lines changed: 89 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,26 @@ class ChannelError(Exception):
2525
from hirefire_resource.errors import MissingQueueError
2626

2727

28+
def _get_queue_arguments_from_app(app, queues):
29+
"""
30+
Extract queue arguments from Celery app configuration for specified queues.
31+
32+
Args:
33+
app: Celery app instance with task_queues configuration
34+
queues: List of queue names to extract arguments for
35+
36+
Returns:
37+
dict: Mapping of queue name to queue_arguments dict
38+
"""
39+
queue_args = {}
40+
task_queues = getattr(app.conf, "task_queues", None) or []
41+
for q in task_queues:
42+
queue_name = getattr(q, "name", None)
43+
if queue_name and queue_name in queues:
44+
queue_args[queue_name] = getattr(q, "queue_arguments", None)
45+
return queue_args
46+
47+
2848
def mitigate_connection_reset_error(retries=10, delay=1):
2949
"""
3050
Decorator to retry a function when ConnectionResetError occurs.
@@ -210,7 +230,7 @@ async def async_job_queue_latency(*queues, broker_url=None):
210230

211231

212232
@mitigate_connection_reset_error()
213-
def job_queue_size(*queues, broker_url=None):
233+
def job_queue_size(*queues, broker_url=None, celery_app=None):
214234
"""
215235
Calculates the total job queue size across the specified queues using Celery with either Redis
216236
or RabbitMQ (AMQP) as the broker.
@@ -229,22 +249,31 @@ def job_queue_size(*queues, broker_url=None):
229249
using a workaround, such as a separate queue for scheduled tasks that forwards tasks ready
230250
to run to the relevant regular queues. When using RabbitMQ (AMQP), consider using the
231251
Delayed Message Plugin.
252+
- For RabbitMQ queues with custom arguments (e.g., x-max-priority for priority queues),
253+
pass your configured Celery app via the `celery_app` parameter. This allows the function
254+
to extract and use the correct queue arguments when querying RabbitMQ.
232255
233256
Args:
234257
*queues (str): Names of the queues for size measurement.
235-
broker_url (str, optional): The broker URL. Defaults in the following order:
258+
broker_url (str, optional): The broker URL. Cannot be used together with `celery_app`.
259+
Defaults in the following order:
236260
- Passed argument `broker_url`.
237261
- Environment variables `AMQP_URL`, `RABBITMQ_URL`, `RABBITMQ_BIGWIG_URL`,
238262
`CLOUDAMQP_URL`, `REDIS_TLS_URL`, `REDIS_URL`, `REDISTOGO_URL`, `REDISCLOUD_URL`,
239263
`OPENREDIS_URL`.
240264
- "amqp://guest:guest@localhost:5672" if AMQP is available, otherwise
241265
"redis://localhost:6379/0".
266+
celery_app (Celery, optional): A configured Celery app instance. Cannot be used together
267+
with `broker_url`. When provided, the function uses this app's connection and extracts
268+
queue arguments from celery_app.conf.task_queues. This is required for RabbitMQ queues
269+
with custom arguments like x-max-priority.
242270
243271
Returns:
244272
int: The cumulative job queue size across the specified queues.
245273
246274
Raises:
247275
MissingQueueError: If no queue names are provided.
276+
ValueError: If both `broker_url` and `celery_app` are provided.
248277
249278
Examples:
250279
>>> job_queue_size("celery")
@@ -255,43 +284,54 @@ def job_queue_size(*queues, broker_url=None):
255284
42
256285
>>> job_queue_size("celery", broker_url="redis://localhost:6379/0")
257286
42
287+
>>> # For priority queues, pass your configured Celery app:
288+
>>> job_queue_size("celery", celery_app=celery_app)
289+
42
258290
"""
259291
if not queues:
260292
raise MissingQueueError()
261293

262-
broker_url = (
263-
broker_url
264-
or os.environ.get("AMQP_URL")
265-
or os.environ.get("RABBITMQ_URL")
266-
or os.environ.get("RABBITMQ_BIGWIG_URL")
267-
or os.environ.get("CLOUDAMQP_URL")
268-
or os.environ.get("REDIS_TLS_URL")
269-
or os.environ.get("REDIS_URL")
270-
or os.environ.get("REDISTOGO_URL")
271-
or os.environ.get("REDISCLOUD_URL")
272-
or os.environ.get("OPENREDIS_URL")
273-
)
274-
275-
if not broker_url:
276-
if AMQP_AVAILABLE:
277-
broker_url = "amqp://guest:guest@localhost:5672"
278-
else:
279-
broker_url = "redis://localhost:6379/0"
280-
281-
app = Celery(broker=broker_url)
294+
if celery_app is not None and broker_url is not None:
295+
raise ValueError(
296+
"Cannot specify both 'celery_app' and 'broker_url'. "
297+
"Use 'celery_app' to pass your configured Celery app (recommended for priority queues), "
298+
"or 'broker_url' for simple setups."
299+
)
300+
301+
if celery_app is None:
302+
broker_url = (
303+
broker_url
304+
or os.environ.get("AMQP_URL")
305+
or os.environ.get("RABBITMQ_URL")
306+
or os.environ.get("RABBITMQ_BIGWIG_URL")
307+
or os.environ.get("CLOUDAMQP_URL")
308+
or os.environ.get("REDIS_TLS_URL")
309+
or os.environ.get("REDIS_URL")
310+
or os.environ.get("REDISTOGO_URL")
311+
or os.environ.get("REDISCLOUD_URL")
312+
or os.environ.get("OPENREDIS_URL")
313+
)
314+
315+
if not broker_url:
316+
if AMQP_AVAILABLE:
317+
broker_url = "amqp://guest:guest@localhost:5672"
318+
else:
319+
broker_url = "redis://localhost:6379/0"
320+
321+
celery_app = Celery(broker=broker_url)
282322

283323
try:
284-
with app.connection_or_acquire() as connection:
324+
with celery_app.connection_or_acquire() as connection:
285325
with connection.channel() as channel:
286-
worker_task_count = _job_queue_size_worker(app, queues)
287-
broker_task_count = _job_queue_size_broker(channel, queues)
326+
worker_task_count = _job_queue_size_worker(celery_app, queues)
327+
broker_task_count = _job_queue_size_broker(celery_app, channel, queues)
288328
return worker_task_count + broker_task_count
289329

290330
except OperationalError:
291331
return 0
292332

293333

294-
async def async_job_queue_size(*queues, broker_url=None):
334+
async def async_job_queue_size(*queues, broker_url=None, celery_app=None):
295335
"""
296336
Asynchronously calculates the total job queue size across the specified queues using Celery with
297337
either Redis or RabbitMQ (AMQP) as the broker.
@@ -311,22 +351,29 @@ async def async_job_queue_size(*queues, broker_url=None):
311351
using a workaround, such as a separate queue for scheduled tasks that forwards tasks ready
312352
to run to the relevant regular queues. When using RabbitMQ (AMQP), consider using the
313353
Delayed Message Plugin.
354+
- For RabbitMQ queues with custom arguments (e.g., x-max-priority for priority queues),
355+
pass your configured Celery app via the `celery_app` parameter.
314356
315357
Args:
316358
*queues (str): Names of the queues for size measurement.
317-
broker_url (str, optional): The broker URL. Defaults in the following order:
359+
broker_url (str, optional): The broker URL. Cannot be used together with `celery_app`.
360+
Defaults in the following order:
318361
- Passed argument `broker_url`.
319362
- Environment variables `AMQP_URL`, `RABBITMQ_URL`, `RABBITMQ_BIGWIG_URL`,
320363
`CLOUDAMQP_URL`, `REDIS_TLS_URL`, `REDIS_URL`, `REDISTOGO_URL`, `REDISCLOUD_URL`,
321364
`OPENREDIS_URL`.
322365
- "amqp://guest:guest@localhost:5672" if AMQP is available, otherwise
323366
"redis://localhost:6379/0".
367+
celery_app (Celery, optional): A configured Celery app instance. Cannot be used together
368+
with `broker_url`. When provided, the function uses this app's connection and extracts
369+
queue arguments from celery_app.conf.task_queues.
324370
325371
Returns:
326372
int: The cumulative job queue size across the specified queues.
327373
328374
Raises:
329375
MissingQueueError: If no queue names are provided.
376+
ValueError: If both `broker_url` and `celery_app` are provided.
330377
331378
Examples:
332379
>>> await async_job_queue_size("celery")
@@ -335,11 +382,13 @@ async def async_job_queue_size(*queues, broker_url=None):
335382
85
336383
>>> await async_job_queue_size("celery", broker_url="amqp://user:password@host:5672")
337384
42
338-
>>> await async_job_queue_size("celery", broker_url="redis://localhost:6379/0")
385+
>>> await async_job_queue_size("celery", celery_app=celery_app)
339386
42
340387
"""
341388
loop = asyncio.get_event_loop()
342-
func = functools.partial(job_queue_size, *queues, broker_url=broker_url)
389+
func = functools.partial(
390+
job_queue_size, *queues, broker_url=broker_url, celery_app=celery_app
391+
)
343392
return await loop.run_in_executor(None, func)
344393

345394

@@ -399,22 +448,26 @@ def _job_queue_size_worker(app, queues):
399448
return sum(worker_data.get(queue, 0) for queue in queues)
400449

401450

402-
def _job_queue_size_broker(channel, queues):
451+
def _job_queue_size_broker(app, channel, queues):
403452
if hasattr(channel, "_size"):
404-
fn = _job_queue_size_redis
453+
return sum(_job_queue_size_redis(channel, queue) for queue in queues)
405454
else:
406-
fn = _job_queue_size_rabbitmq
407-
408-
return sum(fn(channel, queue) for queue in queues)
455+
queue_args = _get_queue_arguments_from_app(app, queues)
456+
return sum(
457+
_job_queue_size_rabbitmq(channel, queue, queue_args.get(queue))
458+
for queue in queues
459+
)
409460

410461

411462
def _job_queue_size_redis(channel, queue):
412463
return channel.client.llen(queue)
413464

414465

415-
def _job_queue_size_rabbitmq(channel, queue):
466+
def _job_queue_size_rabbitmq(channel, queue, arguments=None):
416467
try:
417-
return channel.queue_declare(queue=queue, passive=True).message_count
468+
return channel.queue_declare(
469+
queue=queue, passive=True, arguments=arguments
470+
).message_count
418471
except ChannelError:
419472
return 0
420473

0 commit comments

Comments
 (0)