diff --git a/.github/workflows/pypi.yml b/.github/workflows/pypi.yml index c0b3d47..b5405af 100644 --- a/.github/workflows/pypi.yml +++ b/.github/workflows/pypi.yml @@ -18,12 +18,13 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 with: fetch-depth: 0 + persist-credentials: false - name: Set up Python - uses: actions/setup-python@v5 + uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5 with: python-version: "3.12" @@ -33,4 +34,4 @@ jobs: python -m build - name: Publish to PyPI - uses: pypa/gh-action-pypi-publish@release/v1 + uses: pypa/gh-action-pypi-publish@cef221092ed1bacb1cc03d23a2d87d1d172e277b # release/v1 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 48ce0b4..704f1c6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -27,10 +27,12 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4 + with: + persist-credentials: false - name: Install uv (with Python 3.12) - uses: astral-sh/setup-uv@v4 + uses: astral-sh/setup-uv@e4db8464a088ece1b920f60402e813ea4de65b8f # v4 with: python-version: "3.12" @@ -45,7 +47,7 @@ jobs: uv run pytest --cov=src --cov-branch --cov-report=xml - name: Upload results to Codecov - uses: codecov/codecov-action@v5 + uses: codecov/codecov-action@75cd11691c0faa626561e295848008c8a7dddffe # v5 with: token: ${{ secrets.CODECOV_TOKEN }} slug: flowdacity/queue-engine diff --git a/.qlty/.gitignore b/.qlty/.gitignore new file mode 100644 index 0000000..3036618 --- /dev/null +++ b/.qlty/.gitignore @@ -0,0 +1,7 @@ +* +!configs +!configs/** +!hooks +!hooks/** +!qlty.toml +!.gitignore diff --git a/.qlty/qlty.toml b/.qlty/qlty.toml new file mode 100644 index 0000000..f5140b3 --- /dev/null +++ b/.qlty/qlty.toml @@ -0,0 +1,88 @@ +# This file was automatically generated by `qlty init`. +# You can modify it to suit your needs. +# We recommend you to commit this file to your repository. +# +# This configuration is used by both Qlty CLI and Qlty Cloud. +# +# Qlty CLI -- Code quality toolkit for developers +# Qlty Cloud -- Fully automated Code Health Platform +# +# Try Qlty Cloud: https://qlty.sh +# +# For a guide to configuration, visit https://qlty.sh/d/config +# Or for a full reference, visit https://qlty.sh/d/qlty-toml +config_version = "0" + +exclude_patterns = [ + "*_min.*", + "*-min.*", + "*.min.*", + "**/.yarn/**", + "**/*.d.ts", + "**/assets/**", + "**/bower_components/**", + "**/build/**", + "**/cache/**", + "**/config/**", + "**/db/**", + "**/deps/**", + "**/dist/**", + "**/extern/**", + "**/external/**", + "**/generated/**", + "**/Godeps/**", + "**/gradlew/**", + "**/mvnw/**", + "**/node_modules/**", + "**/protos/**", + "**/seed/**", + "**/target/**", + "**/templates/**", + "**/testdata/**", + "**/vendor/**", +] + +test_patterns = [ + "**/test/**", + "**/spec/**", + "**/*.test.*", + "**/*.spec.*", + "**/*_test.*", + "**/*_spec.*", + "**/test_*.*", + "**/spec_*.*", +] + +[smells] +mode = "comment" + +[[source]] +name = "default" +default = true + + +[[plugin]] +name = "actionlint" + +[[plugin]] +name = "bandit" + +[[plugin]] +name = "radarlint-python" +mode = "comment" + +[[plugin]] +name = "ripgrep" +mode = "comment" + +[[plugin]] +name = "ruff" +drivers = [ + "lint", +] + +[[plugin]] +name = "trufflehog" + +[[plugin]] +name = "zizmor" diff --git a/README.md b/README.md index e1fe8e9..4eeb71d 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,13 @@ Flowdacity Queue ================ -Flowdacity Queue (FQ) is an asyncio-friendly, rate-limited job queue built on Redis. It stores jobs per queue type and queue id, enforces per-queue dequeue intervals, automatically requeues expired jobs, and exposes metrics to understand throughput and queue depth. +Flowdacity Queue (FQ) is a rate-limited job queue built on Redis. It stores jobs per queue type and queue id, enforces per-queue dequeue intervals, automatically requeues expired jobs, and exposes metrics to understand throughput and queue depth. ## Features - Per-queue rate limiting using millisecond intervals. -- Async Redis client with Lua scripts for predictable behavior. +- Async and sync interfaces backed by Redis clients from redis-py. +- Lua scripts for predictable queue behavior. - Automatic retries with configurable limits (including infinite retries). - Metrics for enqueue/dequeue counts and queue lengths. - Works with TCP or Unix socket Redis deployments and supports Redis Cluster. @@ -36,31 +37,48 @@ pip install -e . FQ accepts a simple config mapping. Intervals are in milliseconds. ```python config = { - "fq": { + "queue": { + "key_prefix": "queue_server", "job_expire_interval": 5000, "job_requeue_interval": 5000, "default_job_requeue_limit": -1, # -1 retries forever, 0 means no retries }, "redis": { "db": 0, - "key_prefix": "queue_server", - "conn_type": "tcp_sock", # or "unix_sock" + "conn_type": "tcp_sock", "host": "127.0.0.1", "port": 6379, "password": "", "clustered": False, - "unix_socket_path": "/tmp/redis.sock", }, } ``` -> If you connect via Unix sockets, uncomment the `unixsocket` lines in your `redis.conf`: +For Unix socket connections, use `conn_type: "unix_sock"` and provide +`unix_socket_path`: +```python +"redis": { + "db": 0, + "conn_type": "unix_sock", + "unix_socket_path": "/tmp/redis.sock", + "password": "", + "clustered": False, +} +``` + +> If you use Unix sockets, uncomment the `unixsocket` lines in your `redis.conf`: > ``` > unixsocket /var/run/redis/redis.sock > unixsocketperm 755 > ``` -## Quickstart +## Async Usage + +Import `FQ` from the top-level package: + +```python +from fq import FQ +``` ```python import asyncio @@ -70,20 +88,19 @@ from fq import FQ async def main(): config = { - "fq": { + "queue": { + "key_prefix": "queue_server", "job_expire_interval": 5000, "job_requeue_interval": 5000, "default_job_requeue_limit": -1, }, "redis": { "db": 0, - "key_prefix": "queue_server", "conn_type": "tcp_sock", "host": "127.0.0.1", "port": 6379, "password": "", "clustered": False, - "unix_socket_path": "/tmp/redis.sock", }, } @@ -114,13 +131,64 @@ async def main(): asyncio.run(main()) ``` -Common operations: +## Sync Usage + +Import `FQ` from `fq.sync`: + +```python +import uuid +from fq.sync import FQ + + +config = { + "queue": { + "key_prefix": "queue_server", + "job_expire_interval": 5000, + "job_requeue_interval": 5000, + "default_job_requeue_limit": -1, + }, + "redis": { + "db": 0, + "conn_type": "tcp_sock", + "host": "127.0.0.1", + "port": 6379, + "password": "", + "clustered": False, + }, +} + +fq = FQ(config) +fq.initialize() + +job_id = str(uuid.uuid4()) +fq.enqueue( + payload={"message": "hello, world"}, + interval=1000, + job_id=job_id, + queue_id="user001", + queue_type="sms", +) + +job = fq.dequeue(queue_type="sms") +if job["status"] == "success": + fq.finish( + queue_type="sms", + queue_id=job["queue_id"], + job_id=job["job_id"], + ) + +fq.close() +``` + +## Common Operations - `await fq.requeue()` — move expired jobs back onto their queues. - `await fq.interval(interval=5000, queue_id="user001", queue_type="sms")` — change a queue’s rate limit on the fly. - `await fq.metrics()` — global metrics; pass `queue_type` and/or `queue_id` for scoped stats and queue length. - `await fq.clear_queue(queue_type="sms", queue_id="user001", purge_all=True)` — drop queued jobs and their payload/interval metadata. +The same operations are available from `fq.sync.FQ` without `await`. + ## Development - Start Redis for local development: `make redis-up` (binds to `localhost:6379`). diff --git a/src/fq/base.py b/src/fq/base.py new file mode 100644 index 0000000..ffef37a --- /dev/null +++ b/src/fq/base.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + +from dataclasses import dataclass + +from fq.config import FQConfig +from fq.exceptions import BadArgumentException +from fq.keys import RedisKeys +from fq.responses import ( + decode_redis_value, + format_dequeue_response, + format_metrics_counts, + format_queue_ids, + format_queue_types, +) +from fq.utils import generate_epoch +from fq.validators import ( + validate_clear_queue_arguments, + validate_dequeue_arguments, + validate_enqueue_arguments, + validate_finish_arguments, + validate_get_queue_length_arguments, + validate_interval_arguments, + validate_metrics_arguments, +) + + +@dataclass(frozen=True) +class ClearQueuePlan: + primary_set: str + job_queue: str + payload_hash: str + interval_hash: str + interval_member: str + queue_type: str + queue_id: str + + def payload_member(self, job_id): + return "%s:%s:%s" % (self.queue_type, self.queue_id, job_id) + + +class BaseFQ(object): + """Shared non-I/O behavior for async and sync FQ clients.""" + + def __init__(self, config): + self._r = None + self._scripts = None + self.config = FQConfig.from_mapping(config) + self._keys = RedisKeys(self.config.queue.key_prefix) + + self._key_prefix = self.config.queue.key_prefix + self._job_expire_interval = int(self.config.queue.job_expire_interval) + self._default_job_requeue_limit = int( + self.config.queue.default_job_requeue_limit + ) + + def redis_client(self): + return self._r + + def _current_timestamp(self): + return str(generate_epoch()) + + def _build_enqueue_call( + self, + payload, + interval, + job_id, + queue_id, + queue_type, + requeue_limit, + ): + enqueue_args = validate_enqueue_arguments( + payload, + interval, + job_id, + queue_id, + queue_type, + requeue_limit, + self._default_job_requeue_limit, + ) + keys = [self._key_prefix, queue_type] + args = [ + self._current_timestamp(), + queue_id, + job_id, + enqueue_args.serialized_payload, + interval, + enqueue_args.requeue_limit, + ] + return keys, args + + def _build_dequeue_call(self, queue_type): + validate_dequeue_arguments(queue_type) + return [self._key_prefix, queue_type], [ + self._current_timestamp(), + self._job_expire_interval, + ] + + def _build_finish_call(self, job_id, queue_id, queue_type): + validate_finish_arguments(job_id, queue_id, queue_type) + return [self._key_prefix, queue_type], [queue_id, job_id] + + def _build_interval_call(self, interval, queue_id, queue_type): + validate_interval_arguments(interval, queue_id, queue_type) + keys = [ + self._keys.interval_hash, + self._keys.interval_member(queue_type, queue_id), + ] + return keys, [interval] + + def _build_requeue_call(self, queue_type, timestamp): + queue_type = decode_redis_value(queue_type) + return [self._key_prefix, queue_type], [timestamp] + + def _build_global_metrics_call(self): + return [self._key_prefix], [self._current_timestamp()] + + def _build_queue_metrics_call(self, queue_type, queue_id): + return [self._keys.job_queue(queue_type, queue_id)], [self._current_timestamp()] + + def _validate_metrics_call(self, queue_type, queue_id): + validate_metrics_arguments(queue_type, queue_id) + if not queue_type and queue_id: + raise BadArgumentException( + "`queue_id` should be accompanied by `queue_type`." + ) + + def _queue_type_metrics_keys(self, queue_type): + return ( + self._keys.ready_queue_set(queue_type), + self._keys.active_queue_set(queue_type), + ) + + def _queue_length_key(self, queue_type, queue_id): + validate_get_queue_length_arguments(queue_type, queue_id) + return self._keys.job_queue(queue_type, queue_id) + + def _clear_queue_plan(self, queue_type, queue_id): + validate_clear_queue_arguments(queue_type, queue_id) + return ClearQueuePlan( + primary_set=self._keys.ready_queue_set(queue_type), + job_queue=self._keys.job_queue(queue_type, queue_id), + payload_hash=self._keys.payload_hash, + interval_hash=self._keys.interval_hash, + interval_member=self._keys.interval_member(queue_type, queue_id), + queue_type=queue_type, + queue_id=queue_id, + ) + + def _finish_response(self, finish_response): + if finish_response == 0: + return {"status": "failure"} + return {"status": "success"} + + def _interval_response(self, interval_response): + if interval_response == 0: + return {"status": "failure"} + return {"status": "success"} + + def _dequeue_response(self, dequeue_response): + return format_dequeue_response(dequeue_response) + + def _global_metrics_response( + self, + active_queue_types, + ready_queue_types, + enqueue_details, + dequeue_details, + ): + enqueue_counts, dequeue_counts = format_metrics_counts( + enqueue_details, + dequeue_details, + ) + return { + "status": "success", + "queue_types": format_queue_types(active_queue_types, ready_queue_types), + "enqueue_counts": enqueue_counts, + "dequeue_counts": dequeue_counts, + } + + def _queue_type_metrics_response(self, ready_queues, active_queues): + return { + "status": "success", + "queue_ids": format_queue_ids(ready_queues, active_queues), + } + + def _queue_metrics_response( + self, + queue_length, + enqueue_details, + dequeue_details, + ): + enqueue_counts, dequeue_counts = format_metrics_counts( + enqueue_details, + dequeue_details, + ) + return { + "status": "success", + "queue_length": int(queue_length), + "enqueue_counts": enqueue_counts, + "dequeue_counts": dequeue_counts, + } + + def _decode_redis_value(self, value): + return decode_redis_value(value) + + def _decode_requeue_job(self, job): + queue_id, job_id = decode_redis_value(job).split(":") + return queue_id, job_id + + def _clear_queue_empty_response(self): + return {"status": "Failure", "message": "No queued calls found"} + + def _clear_queue_removed_response(self): + return { + "status": "Success", + "message": "Successfully removed all queued calls", + } + + def _clear_queue_purged_response(self): + return { + "status": "Success", + "message": "Successfully removed all queued calls and purged related resources", + } diff --git a/src/fq/config.py b/src/fq/config.py new file mode 100644 index 0000000..7f93879 --- /dev/null +++ b/src/fq/config.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + +from collections.abc import Mapping +from dataclasses import dataclass + +from fq.exceptions import FQException +from fq.utils import is_valid_interval, is_valid_requeue_limit + + +REDIS_CONN_TYPES = {"tcp_sock", "unix_sock"} + + +@dataclass(frozen=True) +class RedisConfig: + conn_type: str + db: int + host: str | None = None + port: int | None = None + unix_socket_path: str | None = None + clustered: bool = False + password: str | None = None + + @classmethod + def from_mapping(cls, config): + cls._validate_required(config) + cls._validate_connection(config) + cls._validate_optional(config) + + return cls( + conn_type=config["conn_type"], + db=config["db"], + host=config.get("host"), + port=config.get("port"), + unix_socket_path=config.get("unix_socket_path"), + clustered=config.get("clustered", False), + password=config.get("password"), + ) + + @classmethod + def _validate_required(cls, config): + conn_type = cls._require_value(config, "conn_type") + if conn_type not in REDIS_CONN_TYPES: + raise FQException( + "Invalid config: redis.conn_type must be 'tcp_sock' or 'unix_sock'" + ) + + db = cls._require_value(config, "db") + if not cls._is_int_not_bool(db): + raise FQException("Invalid config: redis.db must be an integer") + + @classmethod + def _validate_connection(cls, config): + cls._validate_clustered(config) + + if config["conn_type"] == "unix_sock": + cls._validate_unix_socket(config) + return + + cls._validate_tcp_socket(config) + + @classmethod + def _validate_clustered(cls, config): + if "clustered" in config and not isinstance(config["clustered"], bool): + raise FQException("Invalid config: redis.clustered must be a boolean") + + @classmethod + def _validate_unix_socket(cls, config): + unix_socket_path = cls._require_value(config, "unix_socket_path") + if not cls._is_non_empty_string(unix_socket_path): + raise FQException( + "Invalid config: redis.unix_socket_path must be a non-empty string" + ) + + @classmethod + def _validate_tcp_socket(cls, config): + host = cls._require_value(config, "host") + if not cls._is_non_empty_string(host): + raise FQException("Invalid config: redis.host must be a non-empty string") + + port = cls._require_value(config, "port") + if not cls._is_int_not_bool(port): + raise FQException("Invalid config: redis.port must be an integer") + + if port < 1 or port > 65535: + raise FQException( + "Invalid config: redis.port must be an integer between 1 and 65535" + ) + + @classmethod + def _validate_optional(cls, config): + if "password" in config and config["password"] is not None: + if not isinstance(config["password"], str): + raise FQException("Invalid config: redis.password must be a string") + + @staticmethod + def _require_value(config, option_name): + if option_name not in config: + raise FQException("Missing config: redis.%s" % option_name) + + return config[option_name] + + @staticmethod + def _is_non_empty_string(value): + return isinstance(value, str) and bool(value) + + @staticmethod + def _is_int_not_bool(value): + return isinstance(value, int) and not isinstance(value, bool) + + +@dataclass(frozen=True) +class QueueConfig: + key_prefix: str + job_expire_interval: int + job_requeue_interval: int + default_job_requeue_limit: int + + @classmethod + def from_mapping(cls, config): + cls._validate_required(config) + + return cls( + key_prefix=config["key_prefix"], + job_expire_interval=config["job_expire_interval"], + job_requeue_interval=config["job_requeue_interval"], + default_job_requeue_limit=config["default_job_requeue_limit"], + ) + + @classmethod + def _validate_required(cls, config): + key_prefix = cls._require_value(config, "key_prefix") + if not cls._is_non_empty_string(key_prefix): + raise FQException( + "Invalid config: queue.key_prefix must be a non-empty string" + ) + + for option_name in ("job_expire_interval", "job_requeue_interval"): + value = cls._require_value(config, option_name) + if not is_valid_interval(value): + raise FQException( + "Invalid config: queue.%s must be a positive integer" + % option_name + ) + + default_requeue_limit = cls._require_value(config, "default_job_requeue_limit") + if not is_valid_requeue_limit(default_requeue_limit): + raise FQException( + "Invalid config: " + "queue.default_job_requeue_limit must be an integer >= -1" + ) + + @staticmethod + def _require_value(config, option_name): + if option_name not in config: + raise FQException("Missing config: queue.%s" % option_name) + + return config[option_name] + + @staticmethod + def _is_non_empty_string(value): + return isinstance(value, str) and bool(value) + + +@dataclass(frozen=True) +class FQConfig: + redis: RedisConfig + queue: QueueConfig + + @classmethod + def from_mapping(cls, config): + normalized = cls._normalize_sections(config) + cls._require_sections(normalized) + + return cls( + redis=RedisConfig.from_mapping(normalized["redis"]), + queue=QueueConfig.from_mapping(normalized["queue"]), + ) + + @staticmethod + def _normalize_sections(config): + if not isinstance(config, Mapping): + raise FQException("Config must be a mapping with redis and queue sections") + + normalized = {} + for section_name, section_values in config.items(): + if not isinstance(section_values, Mapping): + raise FQException( + "Config section '%s' must be a mapping" % section_name + ) + + normalized[str(section_name)] = { + str(option): value for option, value in section_values.items() + } + + return normalized + + @staticmethod + def _require_sections(config): + if "redis" not in config or "queue" not in config: + raise FQException("Config missing required sections: redis, queue") diff --git a/src/fq/keys.py b/src/fq/keys.py new file mode 100644 index 0000000..ddf3279 --- /dev/null +++ b/src/fq/keys.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class RedisKeys: + key_prefix: str + + @property + def active_queue_types(self): + return "%s:active:queue_type" % self.key_prefix + + @property + def ready_queue_types(self): + return "%s:ready:queue_type" % self.key_prefix + + @property + def interval_hash(self): + return "%s:interval" % self.key_prefix + + @property + def payload_hash(self): + return "%s:payload" % self.key_prefix + + @property + def deep_status(self): + return "fq:deep_status:%s" % self.key_prefix + + def ready_queue_set(self, queue_type): + return "%s:%s" % (self.key_prefix, queue_type) + + def active_queue_set(self, queue_type): + return "%s:%s:active" % (self.key_prefix, queue_type) + + def job_queue(self, queue_type, queue_id): + return "%s:%s:%s" % (self.key_prefix, queue_type, queue_id) + + def interval_member(self, queue_type, queue_id): + return "%s:%s" % (queue_type, queue_id) + + def payload_member(self, queue_type, queue_id, job_id): + return "%s:%s:%s" % (queue_type, queue_id, job_id) diff --git a/src/fq/lua.py b/src/fq/lua.py new file mode 100644 index 0000000..cd9affa --- /dev/null +++ b/src/fq/lua.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + +from dataclasses import dataclass, fields +from pathlib import Path +from typing import Any + + +@dataclass(frozen=True) +class LuaScripts: + enqueue: Any + dequeue: Any + finish: Any + interval: Any + requeue: Any + metrics: Any + + @classmethod + def register(cls, redis_client): + registered_scripts = { + script_field.name: redis_client.register_script( + cls._read_script(script_field.name) + ) + for script_field in fields(cls) + } + return cls(**registered_scripts) + + @staticmethod + def _read_script(script_name): + script_path = ( + Path(__file__).with_name("scripts") / "lua" / ("%s.lua" % script_name) + ) + return script_path.read_text(encoding="utf-8") diff --git a/src/fq/queue.py b/src/fq/queue.py index cde545c..41832c0 100644 --- a/src/fq/queue.py +++ b/src/fq/queue.py @@ -3,239 +3,27 @@ # Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. import asyncio -import os -from collections.abc import Mapping - -from redis.asyncio import Redis -from redis.asyncio.cluster import RedisCluster - -from fq.utils import ( - is_valid_identifier, - is_valid_interval, - is_valid_requeue_limit, - generate_epoch, - serialize_payload, - deserialize_payload, - convert_to_str, -) -from fq.exceptions import FQException, BadArgumentException - - -class FQ(object): - """The FQ object is the core of this queue. - FQ does the following. - - 1. Accepts structured configuration. - 2. Initializes the queue. - 3. Exposes functions to interact with the queue. - """ - - def __init__(self, config): - """Construct a FQ object by doing the following. - 1. Store the queue configuration. - 2. Validate the config shape. - """ - self._r = None # redis client placeholder - if not isinstance(config, Mapping): - raise FQException("Config must be a mapping with redis and fq sections") - - normalized = {} - for section_name, section_values in config.items(): - if not isinstance(section_values, Mapping): - raise FQException( - "Config section '%s' must be a mapping" % section_name - ) - - normalized[str(section_name)] = { - str(option): value for option, value in section_values.items() - } - - if "redis" not in normalized or "fq" not in normalized: - raise FQException("Config missing required sections: redis, fq") - - redis_config = normalized["redis"] - fq_config = normalized["fq"] - - if "key_prefix" not in redis_config: - raise FQException("Missing config: redis.key_prefix") - if not isinstance(redis_config["key_prefix"], str) or not redis_config[ - "key_prefix" - ]: - raise FQException( - "Invalid config: redis.key_prefix must be a non-empty string" - ) - - if "conn_type" not in redis_config: - raise FQException("Missing config: redis.conn_type") - if redis_config["conn_type"] not in {"tcp_sock", "unix_sock"}: - raise FQException( - "Invalid config: redis.conn_type must be 'tcp_sock' or 'unix_sock'" - ) - - if "db" not in redis_config: - raise FQException("Missing config: redis.db") - if isinstance(redis_config["db"], bool) or not isinstance( - redis_config["db"], int - ): - raise FQException("Invalid config: redis.db must be an integer") - - if "job_expire_interval" not in fq_config: - raise FQException("Missing config: fq.job_expire_interval") - if not is_valid_interval(fq_config["job_expire_interval"]): - raise FQException( - "Invalid config: fq.job_expire_interval must be a positive integer" - ) - - if "job_requeue_interval" not in fq_config: - raise FQException("Missing config: fq.job_requeue_interval") - if not is_valid_interval(fq_config["job_requeue_interval"]): - raise FQException( - "Invalid config: fq.job_requeue_interval must be a positive integer" - ) - - if "default_job_requeue_limit" not in fq_config: - raise FQException("Missing config: fq.default_job_requeue_limit") - if not is_valid_requeue_limit(fq_config["default_job_requeue_limit"]): - raise FQException( - "Invalid config: fq.default_job_requeue_limit must be an integer >= -1" - ) - - if redis_config["conn_type"] == "unix_sock": - if "unix_socket_path" not in redis_config: - raise FQException("Missing config: redis.unix_socket_path") - if not isinstance(redis_config["unix_socket_path"], str) or not redis_config[ - "unix_socket_path" - ]: - raise FQException( - "Invalid config: redis.unix_socket_path must be a non-empty string" - ) - if redis_config["conn_type"] == "tcp_sock": - if "host" not in redis_config: - raise FQException("Missing config: redis.host") - if not isinstance(redis_config["host"], str) or not redis_config["host"]: - raise FQException( - "Invalid config: redis.host must be a non-empty string" - ) +from fq.base import BaseFQ +from fq.lua import LuaScripts +from fq.redis import create_async_redis_client, validate_async_redis_connection - if "port" not in redis_config: - raise FQException("Missing config: redis.port") - if isinstance(redis_config["port"], bool) or not isinstance( - redis_config["port"], int - ): - raise FQException("Invalid config: redis.port must be an integer") - if "clustered" in redis_config and not isinstance( - redis_config["clustered"], bool - ): - raise FQException("Invalid config: redis.clustered must be a boolean") - - if "password" in redis_config and redis_config["password"] is not None: - if not isinstance(redis_config["password"], str): - raise FQException("Invalid config: redis.password must be a string") - - self.config = normalized +class FQ(BaseFQ): + """Async Flowdacity Queue API.""" async def initialize(self): - """Async initializer to set up redis and lua scripts.""" - fq_config = self.config["fq"] - redis_config = self.config["redis"] - - self._key_prefix = redis_config["key_prefix"] - self._job_expire_interval = int(fq_config["job_expire_interval"]) - self._default_job_requeue_limit = int(fq_config["default_job_requeue_limit"]) + """Set up the async Redis client and register Lua scripts.""" + self._r = create_async_redis_client(self.config.redis) + await validate_async_redis_connection(self._r) + self._register_lua_scripts() - redis_connection_type = redis_config["conn_type"] - db = redis_config["db"] - - if redis_connection_type == "unix_sock": - self._r = Redis( - db=db, - unix_socket_path=redis_config["unix_socket_path"], - ) - elif redis_connection_type == "tcp_sock": - isclustered = False - if "clustered" in redis_config: - isclustered = redis_config["clustered"] - - if isclustered: - startup_nodes = [ - { - "host": redis_config["host"], - "port": int(redis_config["port"]), - } - ] - self._r = RedisCluster( - startup_nodes=startup_nodes, - decode_responses=False, - socket_timeout=5, - ) - else: - self._r = Redis( - db=db, - host=redis_config["host"], - port=int(redis_config["port"]), - password=redis_config.get("password"), - ) - else: - raise FQException("Unknown redis conn_type: %s" % redis_connection_type) - - await self._validate_redis_connection() - self._load_lua_scripts() - - async def _validate_redis_connection(self): - """Ping redis once to surface bad connection details early.""" - if self._r is None: - raise FQException("Redis client is not initialized") - - ping = getattr(self._r, "ping", None) - if not callable(ping): - return - - try: - result = await ping() - except Exception as exc: - raise FQException("Failed to connect to Redis: %s" % exc) from exc - - if result is False: - raise FQException("Failed to connect to Redis: ping returned False") - - def redis_client(self): - return self._r - - def _load_lua_scripts(self): - """Loads all lua scripts required by FQ.""" - # load lua scripts - lua_script_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "scripts/lua" - ) - with open(os.path.join(lua_script_path, "enqueue.lua"), "r") as enqueue_file: - self._lua_enqueue_script = enqueue_file.read() - self._lua_enqueue = self._r.register_script(self._lua_enqueue_script) - - with open(os.path.join(lua_script_path, "dequeue.lua"), "r") as dequeue_file: - self._lua_dequeue_script = dequeue_file.read() - self._lua_dequeue = self._r.register_script(self._lua_dequeue_script) - - with open(os.path.join(lua_script_path, "finish.lua"), "r") as finish_file: - self._lua_finish_script = finish_file.read() - self._lua_finish = self._r.register_script(self._lua_finish_script) - - with open(os.path.join(lua_script_path, "interval.lua"), "r") as interval_file: - self._lua_interval_script = interval_file.read() - self._lua_interval = self._r.register_script(self._lua_interval_script) - - with open(os.path.join(lua_script_path, "requeue.lua"), "r") as requeue_file: - self._lua_requeue_script = requeue_file.read() - self._lua_requeue = self._r.register_script(self._lua_requeue_script) - - with open(os.path.join(lua_script_path, "metrics.lua"), "r") as metrics_file: - self._lua_metrics_script = metrics_file.read() - self._lua_metrics = self._r.register_script(self._lua_metrics_script) + def _register_lua_scripts(self): + self._scripts = LuaScripts.register(self._r) def reload_lua_scripts(self): - """Lets user reload the lua scripts in run time.""" - self._load_lua_scripts() + """Lets user reload the Lua scripts at run time.""" + self._register_lua_scripts() async def enqueue( self, @@ -246,383 +34,152 @@ async def enqueue( queue_type="default", requeue_limit=None, ): - """Enqueues the job into the specified queue_id - of a particular queue_type - """ - # validate all the input - if not is_valid_interval(interval): - raise BadArgumentException("`interval` has an invalid value.") - - if not is_valid_identifier(job_id): - raise BadArgumentException("`job_id` has an invalid value.") - - if not is_valid_identifier(queue_id): - raise BadArgumentException("`queue_id` has an invalid value.") - - if not is_valid_identifier(queue_type): - raise BadArgumentException("`queue_type` has an invalid value.") - - if requeue_limit is None: - requeue_limit = self._default_job_requeue_limit - - if not is_valid_requeue_limit(requeue_limit): - raise BadArgumentException("`requeue_limit` has an invalid value.") - - try: - serialized_payload = serialize_payload(payload) - except TypeError as e: - raise BadArgumentException("can not serialize.") - - timestamp = str(generate_epoch()) - keys = [self._key_prefix, queue_type] - args = [ - timestamp, - queue_id, - job_id, - serialized_payload, + """Enqueue a job into the specified queue_id and queue_type.""" + keys, args = self._build_enqueue_call( + payload, interval, + job_id, + queue_id, + queue_type, requeue_limit, - ] - await self._lua_enqueue(keys=keys, args=args) + ) + await self._scripts.enqueue(keys=keys, args=args) return {"status": "queued"} async def dequeue(self, queue_type="default"): - """Dequeues a job from any of the ready queues - based on the queue_type. If no job is ready, - returns a failure status. - """ - if not is_valid_identifier(queue_type): - raise BadArgumentException("`queue_type` has an invalid value.") - - timestamp = str(generate_epoch()) - - keys = [self._key_prefix, queue_type] - args = [timestamp, self._job_expire_interval] - - dequeue_response = await self._lua_dequeue(keys=keys, args=args) - - if len(dequeue_response) < 4: - return {"status": "failure"} - - queue_id, job_id, payload, requeues_remaining = dequeue_response - - if payload is None: - return {"status": "failure"} - - payload = deserialize_payload(payload) - - return { - "status": "success", - "queue_id": queue_id.decode("utf-8"), - "job_id": job_id.decode("utf-8"), - "payload": payload, - "requeues_remaining": int(requeues_remaining), - } + """Dequeue a ready job for queue_type, or return failure.""" + keys, args = self._build_dequeue_call(queue_type) + dequeue_response = await self._scripts.dequeue(keys=keys, args=args) + return self._dequeue_response(dequeue_response) async def finish(self, job_id, queue_id, queue_type="default"): - """Marks any dequeued job as *completed successfully*. - Any job which gets a finish will be treated as complete - and will be removed from the FQ. - """ - if not is_valid_identifier(job_id): - raise BadArgumentException("`job_id` has an invalid value.") - - if not is_valid_identifier(queue_id): - raise BadArgumentException("`queue_id` has an invalid value.") - - if not is_valid_identifier(queue_type): - raise BadArgumentException("`queue_type` has an invalid value.") - - keys = [self._key_prefix, queue_type] - - args = [queue_id, job_id] - - finish_response = await self._lua_finish(keys=keys, args=args) - if finish_response == 0: - # the finish failed. - return {"status": "failure"} - - return {"status": "success"} + """Mark a dequeued job as completed successfully.""" + keys, args = self._build_finish_call(job_id, queue_id, queue_type) + finish_response = await self._scripts.finish(keys=keys, args=args) + return self._finish_response(finish_response) async def interval(self, interval, queue_id, queue_type="default"): - """Updates the interval for a specific queue_id - of a particular queue type. - """ - # validate all the input - if not is_valid_interval(interval): - raise BadArgumentException("`interval` has an invalid value.") - - if not is_valid_identifier(queue_id): - raise BadArgumentException("`queue_id` has an invalid value.") - - if not is_valid_identifier(queue_type): - raise BadArgumentException("`queue_type` has an invalid value.") - - # generate the interval key - interval_hmap_key = "%s:interval" % self._key_prefix - interval_queue_key = "%s:%s" % (queue_type, queue_id) - keys = [interval_hmap_key, interval_queue_key] - - args = [interval] - interval_response = await self._lua_interval(keys=keys, args=args) - if interval_response == 0: - # the queue with the id and type does not exist. - return {"status": "failure"} - else: - return {"status": "success"} + """Update the interval for a queue_id and queue_type.""" + keys, args = self._build_interval_call(interval, queue_id, queue_type) + interval_response = await self._scripts.interval(keys=keys, args=args) + return self._interval_response(interval_response) async def requeue(self): - """Re-queues any expired job (one which does not get an expire - before the job_expiry_interval) back into their respective queue. - This function has to be run at specified intervals to ensure the - expired jobs are re-queued back. - """ - timestamp = str(generate_epoch()) - # get all queue_types and requeue one by one. - # not recommended to do this entire process - # in lua as it might take long and block other - # enqueues and dequeues. - active_queue_type_list = await self._r.smembers( - "%s:active:queue_type" % self._key_prefix - ) + """Re-queue expired active jobs back into their ready queues.""" + timestamp = self._current_timestamp() + active_queue_type_list = await self._r.smembers(self._keys.active_queue_types) for queue_type in active_queue_type_list: - # requeue all expired jobs in all queue types. - - queue_type = queue_type.decode("utf-8") - - keys = [self._key_prefix, queue_type] - - args = [timestamp] - job_discard_list = await self._lua_requeue(keys=keys, args=args) - # discard the jobs if any + queue_type = self._decode_redis_value(queue_type) + keys, args = self._build_requeue_call(queue_type, timestamp) + job_discard_list = await self._scripts.requeue(keys=keys, args=args) for job in job_discard_list: - queue_id, job_id = job.decode("utf-8").split(":") - # explicitly finishing a job - # is nothing but discard. + queue_id, job_id = self._decode_requeue_job(job) await self.finish( - job_id=job_id, queue_id=queue_id, queue_type=queue_type + job_id=job_id, + queue_id=queue_id, + queue_type=queue_type, ) async def metrics(self, queue_type=None, queue_id=None): - """Provides a way to get statistics about various parameters like, - * global enqueue / dequeue rates per min. - * per queue enqueue / dequeue rates per min. - * queue length of each queue. - * list of queue ids for each queue type. - """ - if queue_id is not None and not is_valid_identifier(queue_id): - raise BadArgumentException("`queue_id` has an invalid value.") - - if queue_type is not None and not is_valid_identifier(queue_type): - raise BadArgumentException("`queue_type` has an invalid value.") + """Return global, queue-type, or queue-specific metrics.""" + self._validate_metrics_call(queue_type, queue_id) - response = {"status": "failure"} if not queue_type and not queue_id: - # return global stats. - # list of active queue types (ready + active) - active_queue_types = await self._r.smembers( - "%s:active:queue_type" % self._key_prefix - ) - ready_queue_types = await self._r.smembers( - "%s:ready:queue_type" % self._key_prefix + active_queue_types = await self._r.smembers(self._keys.active_queue_types) + ready_queue_types = await self._r.smembers(self._keys.ready_queue_types) + + keys, args = self._build_global_metrics_call() + enqueue_details, dequeue_details = await self._scripts.metrics( + keys=keys, + args=args, ) - all_queue_types = active_queue_types | ready_queue_types - queue_types = convert_to_str(all_queue_types) - # global rates for past 10 minutes - timestamp = str(generate_epoch()) - keys = [self._key_prefix] - args = [timestamp] - enqueue_details, dequeue_details = await self._lua_metrics( - keys=keys, args=args + return self._global_metrics_response( + active_queue_types, + ready_queue_types, + enqueue_details, + dequeue_details, ) - enqueue_counts = {} - dequeue_counts = {} - # the length of enqueue & dequeue details are always same. - for i in range(0, len(enqueue_details), 2): - enqueue_counts[str(enqueue_details[i])] = int( - enqueue_details[i + 1] or 0 - ) - dequeue_counts[str(dequeue_details[i])] = int( - dequeue_details[i + 1] or 0 - ) - response.update( - { - "status": "success", - "queue_types": queue_types, - "enqueue_counts": enqueue_counts, - "dequeue_counts": dequeue_counts, - } + if queue_type and not queue_id: + ready_queue_key, active_queue_key = self._queue_type_metrics_keys( + queue_type ) - return response - elif queue_type and not queue_id: - # return list of queue_ids. - # get data from two sorted sets in a transaction pipe = self._r.pipeline() - pipe.zrange("%s:%s" % (self._key_prefix, queue_type), 0, -1) - pipe.zrange("%s:%s:active" % (self._key_prefix, queue_type), 0, -1) + pipe.zrange(ready_queue_key, 0, -1) + pipe.zrange(active_queue_key, 0, -1) ready_queues, active_queues = await pipe.execute() - # extract the queue_ids from the queue_id:job_id string - active_queues = [i.decode("utf-8").split(":")[0] for i in active_queues] - all_queue_set = set(ready_queues) | set(active_queues) - queue_list = convert_to_str(all_queue_set) - response.update({"status": "success", "queue_ids": queue_list}) - return response - elif queue_type and queue_id: - # return specific details. - active_queue_types = await self._r.smembers( - "%s:active:queue_type" % self._key_prefix - ) - ready_queue_types = await self._r.smembers( - "%s:ready:queue_type" % self._key_prefix - ) - all_queue_types = active_queue_types | ready_queue_types - # queue specific rates for past 10 minutes - timestamp = str(generate_epoch()) - keys = ["%s:%s:%s" % (self._key_prefix, queue_type, queue_id)] - args = [timestamp] - enqueue_details, dequeue_details = await self._lua_metrics( - keys=keys, args=args - ) - - enqueue_counts = {} - dequeue_counts = {} - # the length of enqueue & dequeue details are always same. - for i in range(0, len(enqueue_details), 2): - enqueue_counts[str(enqueue_details[i])] = int( - enqueue_details[i + 1] or 0 - ) - dequeue_counts[str(dequeue_details[i])] = int( - dequeue_details[i + 1] or 0 - ) + return self._queue_type_metrics_response(ready_queues, active_queues) - # get the queue length for the job queue - queue_length = await self._r.llen( - "%s:%s:%s" % (self._key_prefix, queue_type, queue_id) + if queue_type and queue_id: + keys, args = self._build_queue_metrics_call(queue_type, queue_id) + enqueue_details, dequeue_details = await self._scripts.metrics( + keys=keys, + args=args, ) - - response.update( - { - "status": "success", - "queue_length": int(queue_length), - "enqueue_counts": enqueue_counts, - "dequeue_counts": dequeue_counts, - } + queue_length = await self._r.llen( + self._queue_length_key(queue_type, queue_id) ) - return response - elif not queue_type and queue_id: - raise BadArgumentException( - "`queue_id` should be accompanied by `queue_type`." + return self._queue_metrics_response( + queue_length, + enqueue_details, + dequeue_details, ) - return response + return {"status": "failure"} async def deep_status(self): """ - To check the availability of redis. If redis is down get will throw exception + Check Redis availability. If Redis is down, set() will raise. :return: value or None """ - return await self._r.set( - "fq:deep_status:{}".format(self._key_prefix), "sharq_deep_status" - ) + return await self._r.set(self._keys.deep_status, "sharq_deep_status") async def clear_queue(self, queue_type=None, queue_id=None, purge_all=False): - """clear the all entries in queue with particular queue_id - and queue_type. It takes an optional argument, - purge_all : if True, then it will remove the related resources - from the redis. - """ - if queue_id is None or not is_valid_identifier(queue_id): - raise BadArgumentException("`queue_id` has an invalid value.") - - if queue_type is None or not is_valid_identifier(queue_type): - raise BadArgumentException("`queue_type` has an invalid value.") + """Clear entries in a queue and optionally purge related resources.""" + plan = self._clear_queue_plan(queue_type, queue_id) - response = {"status": "Failure", "message": "No queued calls found"} - # remove from the primary sorted set - primary_set = "{}:{}".format(self._key_prefix, queue_type) - queued_status = await self._r.zrem(primary_set, queue_id) + response = self._clear_queue_empty_response() + queued_status = await self._r.zrem(plan.primary_set, queue_id) if queued_status: - response.update( - { - "status": "Success", - "message": "Successfully removed all queued calls", - } - ) - # do a full cleanup of reources - # although this is not necessary as we don't remove resources - # while dequeue operation - job_queue_list = "{}:{}:{}".format(self._key_prefix, queue_type, queue_id) + response = self._clear_queue_removed_response() + if queued_status and purge_all: - job_list = await self._r.lrange(job_queue_list, 0, -1) + job_list = await self._r.lrange(plan.job_queue, 0, -1) pipe = self._r.pipeline() - # clear the payload data for job_uuid for job_uuid in job_list: if job_uuid is None: continue - if isinstance(job_uuid, bytes): - job_uuid_str = job_uuid.decode("utf-8") - else: - job_uuid_str = job_uuid - payload_set = "{}:payload".format(self._key_prefix) - job_payload_key = "{}:{}:{}".format(queue_type, queue_id, job_uuid_str) - pipe.hdel(payload_set, job_payload_key) - - # clear job request interval - interval_set = "{}:interval".format(self._key_prefix) - job_interval_key = "{}:{}".format(queue_type, queue_id) - pipe.hdel(interval_set, job_interval_key) - # clear job_queue_list - pipe.delete(job_queue_list) + job_uuid = self._decode_redis_value(job_uuid) + pipe.hdel(plan.payload_hash, plan.payload_member(job_uuid)) + + pipe.hdel(plan.interval_hash, plan.interval_member) + pipe.delete(plan.job_queue) await pipe.execute() - response.update( - { - "status": "Success", - "message": "Successfully removed all queued calls and purged related resources", - } - ) - else: - # always delete the job queue list - await self._r.delete(job_queue_list) + return self._clear_queue_purged_response() + + await self._r.delete(plan.job_queue) return response async def get_queue_length(self, queue_type, queue_id): """ - Return the current length present in redis key of type list - Redis key structure : key_prefix : queue_type : queue_id + Return the current Redis list length for key_prefix:queue_type:queue_id. """ - - # validate all the input - if not is_valid_identifier(queue_type): - raise BadArgumentException("`queue_type` has an invalid value.") - - if not is_valid_identifier(queue_id): - raise BadArgumentException("`queue_id` has an invalid value.") - - redis_key = self._key_prefix + ":" + queue_type + ":" + queue_id - current_queue_length = await self._r.llen(redis_key) - return current_queue_length + redis_key = self._queue_length_key(queue_type, queue_id) + return await self._r.llen(redis_key) async def close(self): - """ - Cleanly close the underlying Redis client / connection pool. - - This is intended to be called by tests or by application shutdown - hooks to avoid ResourceWarning: unclosed . - """ + """Cleanly close the underlying Redis client or connection pool.""" if self._r is None: return conn = self._r - # Prefer the asyncio-style aclose() if available (redis-py >= 4.2+) aclose = getattr(conn, "aclose", None) if callable(aclose): await aclose() self._r = None return - # Older / alternate API: close() [+ wait_closed()] close = getattr(conn, "close", None) if callable(close): maybe_coro = close() @@ -635,7 +192,6 @@ async def close(self): if asyncio.iscoroutine(maybe_coro): await maybe_coro - # As a final fallback, disconnect the connection pool pool = getattr(conn, "connection_pool", None) if pool is not None: disconnect = getattr(pool, "disconnect", None) diff --git a/src/fq/redis.py b/src/fq/redis.py new file mode 100644 index 0000000..404bb3c --- /dev/null +++ b/src/fq/redis.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + +from redis import Redis as SyncRedis +from redis import RedisCluster as SyncRedisCluster +from redis.asyncio import Redis as AsyncRedis +from redis.asyncio.cluster import ClusterNode as AsyncClusterNode +from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster + +from fq.exceptions import FQException + + +def create_async_redis_client(redis_config): + if redis_config.conn_type == "unix_sock": + return AsyncRedis( + db=redis_config.db, + unix_socket_path=redis_config.unix_socket_path, + password=redis_config.password, + ) + + if redis_config.conn_type == "tcp_sock": + if redis_config.clustered: + startup_nodes = [ + AsyncClusterNode(redis_config.host, int(redis_config.port)), + ] + return AsyncRedisCluster( + startup_nodes=startup_nodes, + decode_responses=False, + password=redis_config.password, + socket_timeout=5, + ) + + return AsyncRedis( + db=redis_config.db, + host=redis_config.host, + port=int(redis_config.port), + password=redis_config.password, + ) + + raise FQException("Unknown redis conn_type: %s" % redis_config.conn_type) + + +def create_sync_redis_client(redis_config): + if redis_config.conn_type == "unix_sock": + return SyncRedis( + db=redis_config.db, + unix_socket_path=redis_config.unix_socket_path, + password=redis_config.password, + ) + + if redis_config.conn_type == "tcp_sock": + if redis_config.clustered: + return SyncRedisCluster( + host=redis_config.host, + port=int(redis_config.port), + decode_responses=False, + password=redis_config.password, + socket_timeout=5, + ) + + return SyncRedis( + db=redis_config.db, + host=redis_config.host, + port=int(redis_config.port), + password=redis_config.password, + ) + + raise FQException("Unknown redis conn_type: %s" % redis_config.conn_type) + + +async def validate_async_redis_connection(redis_client): + if redis_client is None: + raise FQException("Redis client is not initialized") + + ping = getattr(redis_client, "ping", None) + if not callable(ping): + return + + try: + result = await ping() + except Exception as exc: + raise FQException("Failed to connect to Redis: %s" % exc) from exc + + if result is False: + raise FQException("Failed to connect to Redis: ping returned False") + + +def validate_sync_redis_connection(redis_client): + if redis_client is None: + raise FQException("Redis client is not initialized") + + ping = getattr(redis_client, "ping", None) + if not callable(ping): + return + + try: + result = ping() + except Exception as exc: + raise FQException("Failed to connect to Redis: %s" % exc) from exc + + if result is False: + raise FQException("Failed to connect to Redis: ping returned False") diff --git a/src/fq/responses.py b/src/fq/responses.py new file mode 100644 index 0000000..6eb7091 --- /dev/null +++ b/src/fq/responses.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + +from fq.utils import convert_to_str, deserialize_payload + + +def decode_redis_value(value): + if isinstance(value, bytes): + return value.decode("utf-8") + return value + + +def format_dequeue_response(dequeue_response): + if len(dequeue_response) < 4: + return {"status": "failure"} + + queue_id, job_id, payload, requeues_remaining = dequeue_response + + if payload is None: + return {"status": "failure"} + + return { + "status": "success", + "queue_id": decode_redis_value(queue_id), + "job_id": decode_redis_value(job_id), + "payload": deserialize_payload(payload), + "requeues_remaining": int(requeues_remaining), + } + + +def format_metrics_counts(enqueue_details, dequeue_details): + enqueue_counts = {} + dequeue_counts = {} + for i in range(0, len(enqueue_details), 2): + enqueue_counts[str(decode_redis_value(enqueue_details[i]))] = int( + enqueue_details[i + 1] or 0 + ) + dequeue_counts[str(decode_redis_value(dequeue_details[i]))] = int( + dequeue_details[i + 1] or 0 + ) + return enqueue_counts, dequeue_counts + + +def format_queue_types(active_queue_types, ready_queue_types): + return convert_to_str(set(active_queue_types) | set(ready_queue_types)) + + +def format_queue_ids(ready_queues, active_queues): + ready_queue_ids = {decode_redis_value(queue) for queue in ready_queues} + active_queue_ids = { + decode_redis_value(queue).split(":")[0] for queue in active_queues + } + return convert_to_str(ready_queue_ids | active_queue_ids) diff --git a/src/fq/sync/__init__.py b/src/fq/sync/__init__.py new file mode 100644 index 0000000..990d355 --- /dev/null +++ b/src/fq/sync/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. +from .queue import FQ + +__all__ = ["FQ"] diff --git a/src/fq/sync/queue.py b/src/fq/sync/queue.py new file mode 100644 index 0000000..e9652ea --- /dev/null +++ b/src/fq/sync/queue.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + +from fq.base import BaseFQ +from fq.lua import LuaScripts +from fq.redis import create_sync_redis_client, validate_sync_redis_connection + + +class FQ(BaseFQ): + """Synchronous Flowdacity Queue API.""" + + def initialize(self): + """Set up the synchronous Redis client and register Lua scripts.""" + self._r = create_sync_redis_client(self.config.redis) + validate_sync_redis_connection(self._r) + self._register_lua_scripts() + + def _register_lua_scripts(self): + self._scripts = LuaScripts.register(self._r) + + def reload_lua_scripts(self): + """Lets user reload the Lua scripts at run time.""" + self._register_lua_scripts() + + def enqueue( + self, + payload, + interval, + job_id, + queue_id, + queue_type="default", + requeue_limit=None, + ): + """Enqueue a job into the specified queue_id and queue_type.""" + keys, args = self._build_enqueue_call( + payload, + interval, + job_id, + queue_id, + queue_type, + requeue_limit, + ) + self._scripts.enqueue(keys=keys, args=args) + return {"status": "queued"} + + def dequeue(self, queue_type="default"): + """Dequeue a ready job for queue_type, or return failure.""" + keys, args = self._build_dequeue_call(queue_type) + dequeue_response = self._scripts.dequeue(keys=keys, args=args) + return self._dequeue_response(dequeue_response) + + def finish(self, job_id, queue_id, queue_type="default"): + """Mark a dequeued job as completed successfully.""" + keys, args = self._build_finish_call(job_id, queue_id, queue_type) + finish_response = self._scripts.finish(keys=keys, args=args) + return self._finish_response(finish_response) + + def interval(self, interval, queue_id, queue_type="default"): + """Update the interval for a queue_id and queue_type.""" + keys, args = self._build_interval_call(interval, queue_id, queue_type) + interval_response = self._scripts.interval(keys=keys, args=args) + return self._interval_response(interval_response) + + def requeue(self): + """Re-queue expired active jobs back into their ready queues.""" + timestamp = self._current_timestamp() + active_queue_type_list = self._r.smembers(self._keys.active_queue_types) + for queue_type in active_queue_type_list: + queue_type = self._decode_redis_value(queue_type) + keys, args = self._build_requeue_call(queue_type, timestamp) + job_discard_list = self._scripts.requeue(keys=keys, args=args) + for job in job_discard_list: + queue_id, job_id = self._decode_requeue_job(job) + self.finish(job_id=job_id, queue_id=queue_id, queue_type=queue_type) + + def metrics(self, queue_type=None, queue_id=None): + """Return global, queue-type, or queue-specific metrics.""" + self._validate_metrics_call(queue_type, queue_id) + + if not queue_type and not queue_id: + active_queue_types = self._r.smembers(self._keys.active_queue_types) + ready_queue_types = self._r.smembers(self._keys.ready_queue_types) + + keys, args = self._build_global_metrics_call() + enqueue_details, dequeue_details = self._scripts.metrics( + keys=keys, + args=args, + ) + return self._global_metrics_response( + active_queue_types, + ready_queue_types, + enqueue_details, + dequeue_details, + ) + + if queue_type and not queue_id: + ready_queue_key, active_queue_key = self._queue_type_metrics_keys( + queue_type + ) + pipe = self._r.pipeline() + pipe.zrange(ready_queue_key, 0, -1) + pipe.zrange(active_queue_key, 0, -1) + ready_queues, active_queues = pipe.execute() + return self._queue_type_metrics_response(ready_queues, active_queues) + + if queue_type and queue_id: + keys, args = self._build_queue_metrics_call(queue_type, queue_id) + enqueue_details, dequeue_details = self._scripts.metrics( + keys=keys, + args=args, + ) + queue_length = self._r.llen(self._queue_length_key(queue_type, queue_id)) + return self._queue_metrics_response( + queue_length, + enqueue_details, + dequeue_details, + ) + + return {"status": "failure"} + + def deep_status(self): + """ + Check Redis availability. If Redis is down, set() will raise. + :return: value or None + """ + return self._r.set(self._keys.deep_status, "sharq_deep_status") + + def clear_queue(self, queue_type=None, queue_id=None, purge_all=False): + """Clear entries in a queue and optionally purge related resources.""" + plan = self._clear_queue_plan(queue_type, queue_id) + + response = self._clear_queue_empty_response() + queued_status = self._r.zrem(plan.primary_set, queue_id) + if queued_status: + response = self._clear_queue_removed_response() + + if queued_status and purge_all: + job_list = self._r.lrange(plan.job_queue, 0, -1) + pipe = self._r.pipeline() + for job_uuid in job_list: + if job_uuid is None: + continue + job_uuid = self._decode_redis_value(job_uuid) + pipe.hdel(plan.payload_hash, plan.payload_member(job_uuid)) + + pipe.hdel(plan.interval_hash, plan.interval_member) + pipe.delete(plan.job_queue) + pipe.execute() + return self._clear_queue_purged_response() + + self._r.delete(plan.job_queue) + return response + + def get_queue_length(self, queue_type, queue_id): + """ + Return the current Redis list length for key_prefix:queue_type:queue_id. + """ + redis_key = self._queue_length_key(queue_type, queue_id) + return self._r.llen(redis_key) + + def close(self): + """Close the underlying synchronous Redis client.""" + if self._r is None: + return + + conn = self._r + close = getattr(conn, "close", None) + if callable(close): + close() + self._r = None + return + + pool = getattr(conn, "connection_pool", None) + if pool is not None: + disconnect = getattr(pool, "disconnect", None) + if callable(disconnect): + disconnect() + + self._r = None diff --git a/src/fq/utils.py b/src/fq/utils.py index 66fb642..29c6168 100644 --- a/src/fq/utils.py +++ b/src/fq/utils.py @@ -74,9 +74,8 @@ def convert_to_str(queue_set): """Takes set and decodes bytes to string""" queue_list = [] for queue in list(queue_set): - try: + if isinstance(queue, (bytes, bytearray)): queue_list.append(queue.decode("utf-8")) - except Exception as e: + else: queue_list.append(queue) - pass return queue_list diff --git a/src/fq/validators.py b/src/fq/validators.py new file mode 100644 index 0000000..5b869dd --- /dev/null +++ b/src/fq/validators.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + +from dataclasses import dataclass + +from fq.exceptions import BadArgumentException +from fq.utils import ( + is_valid_identifier, + is_valid_interval, + is_valid_requeue_limit, + serialize_payload, +) + + +INVALID_INTERVAL = "`interval` has an invalid value." +INVALID_JOB_ID = "`job_id` has an invalid value." +INVALID_QUEUE_ID = "`queue_id` has an invalid value." +INVALID_QUEUE_TYPE = "`queue_type` has an invalid value." +INVALID_REQUEUE_LIMIT = "`requeue_limit` has an invalid value." + + +@dataclass(frozen=True) +class EnqueueArguments: + serialized_payload: bytes + requeue_limit: int + + +def validate_enqueue_arguments( + payload, + interval, + job_id, + queue_id, + queue_type, + requeue_limit, + default_requeue_limit, +): + if not is_valid_interval(interval): + raise BadArgumentException(INVALID_INTERVAL) + + _validate_identifier(job_id, INVALID_JOB_ID) + _validate_identifier(queue_id, INVALID_QUEUE_ID) + _validate_identifier(queue_type, INVALID_QUEUE_TYPE) + + if requeue_limit is None: + requeue_limit = default_requeue_limit + + if not is_valid_requeue_limit(requeue_limit): + raise BadArgumentException(INVALID_REQUEUE_LIMIT) + + try: + serialized_payload = serialize_payload(payload) + except TypeError as exc: + raise BadArgumentException("can not serialize.") from exc + + return EnqueueArguments( + serialized_payload=serialized_payload, + requeue_limit=requeue_limit, + ) + + +def validate_dequeue_arguments(queue_type): + _validate_identifier(queue_type, INVALID_QUEUE_TYPE) + + +def validate_finish_arguments(job_id, queue_id, queue_type): + _validate_identifier(job_id, INVALID_JOB_ID) + _validate_identifier(queue_id, INVALID_QUEUE_ID) + _validate_identifier(queue_type, INVALID_QUEUE_TYPE) + + +def validate_interval_arguments(interval, queue_id, queue_type): + if not is_valid_interval(interval): + raise BadArgumentException(INVALID_INTERVAL) + + _validate_identifier(queue_id, INVALID_QUEUE_ID) + _validate_identifier(queue_type, INVALID_QUEUE_TYPE) + + +def validate_metrics_arguments(queue_type, queue_id): + if queue_id is not None and not is_valid_identifier(queue_id): + raise BadArgumentException(INVALID_QUEUE_ID) + + if queue_type is not None and not is_valid_identifier(queue_type): + raise BadArgumentException(INVALID_QUEUE_TYPE) + + +def validate_clear_queue_arguments(queue_type, queue_id): + if queue_id is None or not is_valid_identifier(queue_id): + raise BadArgumentException(INVALID_QUEUE_ID) + + if queue_type is None or not is_valid_identifier(queue_type): + raise BadArgumentException(INVALID_QUEUE_TYPE) + + +def validate_get_queue_length_arguments(queue_type, queue_id): + _validate_identifier(queue_type, INVALID_QUEUE_TYPE) + _validate_identifier(queue_id, INVALID_QUEUE_ID) + + +def _validate_identifier(identifier, message): + if not is_valid_identifier(identifier): + raise BadArgumentException(message) diff --git a/tests/config.py b/tests/config.py index 617030c..9d05648 100644 --- a/tests/config.py +++ b/tests/config.py @@ -1,19 +1,24 @@ # -*- coding: utf-8 -*- from copy import deepcopy +from os.path import join +from tempfile import gettempdir + + +TEST_UNIX_SOCKET_PATH = join(gettempdir(), "redis.sock") TEST_CONFIG = { - "fq": { + "queue": { + "key_prefix": "test_fq", "job_expire_interval": 5000, "job_requeue_interval": 5000, "default_job_requeue_limit": -1, }, "redis": { "db": 0, - "key_prefix": "test_fq", "conn_type": "tcp_sock", - "unix_socket_path": "/tmp/redis.sock", + "unix_socket_path": TEST_UNIX_SOCKET_PATH, "port": 6379, "host": "127.0.0.1", "clustered": False, diff --git a/tests/test_edge_cases.py b/tests/test_edge_cases.py index b84a51d..1400a31 100644 --- a/tests/test_edge_cases.py +++ b/tests/test_edge_cases.py @@ -3,18 +3,30 @@ import unittest +from contextlib import suppress +from types import SimpleNamespace from unittest.mock import patch from fq import FQ -from fq.utils import is_valid_identifier +from fq.config import FQConfig from fq.exceptions import BadArgumentException, FQException +from fq.redis import create_async_redis_client, create_sync_redis_client +from fq.responses import format_queue_ids +from fq.utils import is_valid_identifier from tests.config import build_test_config class FakeCluster: - def __init__(self, startup_nodes=None, decode_responses=False, socket_timeout=None): + def __init__( + self, + startup_nodes=None, + decode_responses=False, + password=None, + socket_timeout=None, + ): self.startup_nodes = startup_nodes or [] self.decode_responses = decode_responses + self.password = password self.socket_timeout = socket_timeout def register_script(self, _): @@ -55,7 +67,8 @@ async def set(self, key, value): class FakeRedisConnectionFailure: def __init__(self, *args, **kwargs): - pass + self.init_args = args + self.init_kwargs = kwargs async def ping(self): raise ConnectionError("boom") @@ -110,31 +123,51 @@ async def delete(self, key): self.deleted_keys.append(key) +class RecordingRedisClient: + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + class TestEdgeCases(unittest.IsolatedAsyncioTestCase): + # qlty-ignore(radarlint-python:python:S5899): unittest lifecycle hook. async def asyncSetUp(self): self.config = build_test_config() self.fq_instance = None + # qlty-ignore(radarlint-python:python:S5899): unittest lifecycle hook. async def asyncTearDown(self): """Clean up Redis state and close connections after each test.""" # If a test initialized FQ with real Redis, clean up if self.fq_instance is not None: - try: + with suppress(Exception): if self.fq_instance._r is not None: await self.fq_instance._r.flushdb() await self.fq_instance.close() - except Exception: - # Ignore errors during cleanup - tests may have mocked or closed connections - # This prevents tearDown failures from masking test failures - pass self.fq_instance = None def test_invalid_config_type_raises(self): with self.assertRaisesRegex(FQException, "Config must be a mapping"): - FQ("/tmp/does-not-exist.conf") + FQ("does-not-exist.conf") + + def test_missing_required_config_section_raises(self): + config = build_test_config() + del config["queue"] + with self.assertRaisesRegex( + FQException, "Config missing required sections: redis, queue" + ): + FQ(config) + + def test_fq_config_section_is_not_supported(self): + config = build_test_config() + config["fq"] = config.pop("queue") + with self.assertRaisesRegex( + FQException, "Config missing required sections: redis, queue" + ): + FQ(config) async def test_initialize_fails_fast_on_bad_redis(self): - with patch("fq.queue.Redis", FakeRedisConnectionFailure): + with patch("fq.redis.AsyncRedis", FakeRedisConnectionFailure): fq = FQ(self.config) with self.assertRaisesRegex(FQException, "Failed to connect to Redis"): await fq.initialize() @@ -142,12 +175,20 @@ async def test_initialize_fails_fast_on_bad_redis(self): async def test_cluster_initialization(self): """Covers clustered Redis path (queue.py lines 69-75, 104-106).""" config = build_test_config( - redis={"key_prefix": "test_fq_cluster", "clustered": True} + queue={"key_prefix": "test_fq_cluster"}, + redis={ + "clustered": True, + "password": "cluster-password", + } ) - with patch("fq.queue.RedisCluster", FakeCluster): + with patch("fq.redis.AsyncRedisCluster", FakeCluster): fq = FQ(config) await fq.initialize() self.assertIsInstance(fq.redis_client(), FakeCluster) + self.assertEqual(fq.redis_client().password, "cluster-password") + startup_node = fq.redis_client().startup_nodes[0] + self.assertEqual(startup_node.host, "127.0.0.1") + self.assertEqual(startup_node.port, 6379) await fq.close() def test_clustered_config_must_be_boolean(self): @@ -157,27 +198,49 @@ def test_clustered_config_must_be_boolean(self): ): FQ(config) + def test_unix_socket_clustered_config_must_be_boolean(self): + config = build_test_config( + redis={ + "conn_type": "unix_sock", + "clustered": "true", + } + ) + with self.assertRaisesRegex( + FQException, "Invalid config: redis.clustered must be a boolean" + ): + FQ(config) + def test_missing_required_config_key_raises_with_path(self): config = build_test_config() - del config["redis"]["key_prefix"] - with self.assertRaisesRegex(FQException, "Missing config: redis.key_prefix"): + del config["queue"]["key_prefix"] + with self.assertRaisesRegex(FQException, "Missing config: queue.key_prefix"): FQ(config) def test_invalid_config_value_raises_with_path(self): - config = build_test_config(fq={"job_expire_interval": "5000"}) + config = build_test_config(queue={"job_expire_interval": "5000"}) with self.assertRaisesRegex( FQException, - "Invalid config: fq.job_expire_interval must be a positive integer", + "Invalid config: queue.job_expire_interval must be a positive integer", ): FQ(config) + def test_invalid_redis_port_range_raises(self): + for port in (0, -1, 65536): + with self.subTest(port=port): + config = build_test_config(redis={"port": port}) + with self.assertRaisesRegex( + FQException, + "Invalid config: redis.port must be an integer between 1 and 65535", + ): + FQ(config) + async def test_dequeue_payload_none(self): """Covers dequeue branch where payload is None (queue.py line 212).""" fq = FQ(self.config) self.fq_instance = fq await fq.initialize() fake_dequeue = FakeLuaDequeue() - fq._lua_dequeue = fake_dequeue + fq._scripts = SimpleNamespace(dequeue=fake_dequeue) result = await fq.dequeue() self.assertEqual(result["status"], "failure") self.assertTrue(fake_dequeue.called) @@ -201,12 +264,14 @@ async def test_close_fallback_paths(self): async def test_deep_status_calls_set(self): """Covers deep_status (queue.py line 420).""" fq = FQ(self.config) - fq._key_prefix = fq.config["redis"]["key_prefix"] fq._r = FakeRedisForDeepStatus() await fq.deep_status() self.assertEqual( fq._r.key_set, - ("fq:deep_status:{}".format(fq._key_prefix), "sharq_deep_status"), + ( + "fq:deep_status:{}".format(fq.config.queue.key_prefix), + "sharq_deep_status", + ), ) def test_is_valid_identifier_non_string(self): @@ -215,10 +280,54 @@ def test_is_valid_identifier_non_string(self): self.assertFalse(is_valid_identifier(None)) self.assertFalse(is_valid_identifier(["a"])) + def test_format_queue_ids_deduplicates_ready_and_active_queues(self): + queue_ids = format_queue_ids( + ready_queues=[b"johndoe", b"ready-only"], + active_queues=[b"johndoe:job-1", "active-only:job-2"], + ) + + self.assertEqual(set(queue_ids), {"johndoe", "ready-only", "active-only"}) + self.assertEqual(len(queue_ids), 3) + + def test_redis_factories_pass_password_to_unix_socket_clients(self): + config = FQConfig.from_mapping( + build_test_config( + redis={ + "conn_type": "unix_sock", + "password": "socket-password", + } + ) + ) + + with patch("fq.redis.AsyncRedis", RecordingRedisClient): + async_client = create_async_redis_client(config.redis) + with patch("fq.redis.SyncRedis", RecordingRedisClient): + sync_client = create_sync_redis_client(config.redis) + + self.assertEqual(async_client.kwargs["password"], "socket-password") + self.assertEqual(sync_client.kwargs["password"], "socket-password") + + def test_redis_factories_pass_password_to_cluster_clients(self): + config = FQConfig.from_mapping( + build_test_config( + redis={ + "clustered": True, + "password": "cluster-password", + } + ) + ) + + with patch("fq.redis.AsyncRedisCluster", RecordingRedisClient): + async_client = create_async_redis_client(config.redis) + with patch("fq.redis.SyncRedisCluster", RecordingRedisClient): + sync_client = create_sync_redis_client(config.redis) + + self.assertEqual(async_client.kwargs["password"], "cluster-password") + self.assertEqual(sync_client.kwargs["password"], "cluster-password") + async def test_clear_queue_purge_all_with_mixed_job_ids(self): """Covers purge_all loop branches (queue.py lines 463-468, 474-479).""" fq = FQ(self.config) - fq._key_prefix = fq.config["redis"]["key_prefix"] fq._r = FakeRedisForClear() response = await fq.clear_queue("qt", "qid", purge_all=True) self.assertEqual(response["status"], "Success") diff --git a/tests/test_func.py b/tests/test_func.py index a2035d6..a2411ce 100644 --- a/tests/test_func.py +++ b/tests/test_func.py @@ -1,11 +1,12 @@ # -*- coding: utf-8 -*- # Copyright (c) 2014 Plivo Team. See LICENSE.txt for details. import uuid -import time import math import asyncio import unittest import msgpack +from os.path import join +from tempfile import gettempdir from unittest.mock import AsyncMock, MagicMock from fq import FQ from fq.exceptions import FQException @@ -13,6 +14,8 @@ from tests.config import build_test_config +NONEXISTENT_UNIX_SOCKET_PATH = join(gettempdir(), "redis_nonexistent.sock") + class FQTestCase(unittest.IsolatedAsyncioTestCase): """ @@ -21,6 +24,7 @@ class FQTestCase(unittest.IsolatedAsyncioTestCase): by FQ. """ + # qlty-ignore(radarlint-python:python:S5899): unittest lifecycle hook. async def asyncSetUp(self): self.queue = FQ(build_test_config()) # flush all the keys in the test db before starting test @@ -606,7 +610,6 @@ async def test_enqueue_second_job_queue_type_ready_set(self): ) # job 2 job_id = self._get_job_id() - start_time = str(generate_epoch()) await self.queue.enqueue( payload=self._test_payload_2, interval=20000, @@ -634,7 +637,6 @@ async def test_enqueue_second_job_queue_type_active_set(self): ) # job 2 job_id = self._get_job_id() - start_time = str(generate_epoch()) await self.queue.enqueue( payload=self._test_payload_2, interval=20000, @@ -712,7 +714,7 @@ async def test_dequeue_time_keeper_existence(self): queue_id=self._test_queue_id, queue_type=self._test_queue_type, ) - response = await self.queue.dequeue(queue_type=self._test_queue_type) + await self.queue.dequeue(queue_type=self._test_queue_type) time_keeper_key_name = "%s:%s:%s:time" % ( self.queue._key_prefix, @@ -1738,7 +1740,7 @@ async def test_initialize_public_method(self): # Verify initialization succeeded self.assertIsNotNone(fq._r) - self.assertIsNotNone(fq._lua_enqueue) + self.assertIsNotNone(fq._scripts.enqueue) # Cleanup await fq.close() @@ -1809,10 +1811,10 @@ async def test_close_with_none_client(self): async def test_initialize_unix_socket_connection(self): """Test initialization with Unix socket connection - tests line 59.""" config = build_test_config( + queue={"key_prefix": "test_fq_unix"}, redis={ - "key_prefix": "test_fq_unix", "conn_type": "unix_sock", - "unix_socket_path": "/tmp/redis_nonexistent.sock", + "unix_socket_path": NONEXISTENT_UNIX_SOCKET_PATH, } ) @@ -1829,14 +1831,17 @@ def mock_redis_constructor(**kwargs): return mock_redis_instance # Patch Redis to intercept the initialization - with unittest.mock.patch("fq.queue.Redis", side_effect=mock_redis_constructor): + with unittest.mock.patch( + "fq.redis.AsyncRedis", + side_effect=mock_redis_constructor, + ): fq = FQ(config) await fq.initialize() # Verify that Redis was initialized with unix_socket_path self.assertIn("unix_socket_path", redis_init_kwargs) self.assertEqual( - redis_init_kwargs["unix_socket_path"], "/tmp/redis_nonexistent.sock" + redis_init_kwargs["unix_socket_path"], NONEXISTENT_UNIX_SOCKET_PATH ) self.assertEqual(int(redis_init_kwargs["db"]), 0) @@ -1916,6 +1921,7 @@ async def test_convert_to_str_with_mixed_types(self): self.assertIn("key2", result) self.assertIn("key3", result) + # qlty-ignore(radarlint-python:python:S5899): unittest lifecycle hook. async def asyncTearDown(self): await self.queue._r.flushdb() await self.queue.close() diff --git a/tests/test_queue.py b/tests/test_queue.py index e94a2f0..e672c68 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -11,6 +11,7 @@ class FQTest(unittest.IsolatedAsyncioTestCase): """The FQTest contains test cases which validate the FQ interface.""" + # qlty-ignore(radarlint-python:python:S5899): unittest lifecycle hook. async def asyncSetUp(self): self.queue = FQ(build_test_config()) await self.queue.initialize() @@ -58,6 +59,7 @@ async def asyncSetUp(self): # flush redis before start await self.queue._r.flushdb() + # qlty-ignore(radarlint-python:python:S5899): unittest lifecycle hook. async def asyncTearDown(self): # flush redis at the end and close connection await self.queue._r.flushdb() @@ -322,7 +324,9 @@ async def test_enqueue_requeue_limit_invalid(self): ) async def test_enqueue_cannot_serialize_payload(self): - with self.assertRaisesRegex(BadArgumentException, r"can not serialize."): + with self.assertRaisesRegex( + BadArgumentException, r"can not serialize." + ) as ctx: await self.queue.enqueue( payload=self.invalid_payload, interval=self.valid_interval, @@ -330,6 +334,7 @@ async def test_enqueue_cannot_serialize_payload(self): queue_id=self.valid_queue_id, queue_type=self.valid_queue_type, ) + self.assertIsInstance(ctx.exception.__cause__, TypeError) async def test_enqueue_all_ok(self): # with a queue_type diff --git a/tests/test_sync_queue.py b/tests/test_sync_queue.py new file mode 100644 index 0000000..2ae557e --- /dev/null +++ b/tests/test_sync_queue.py @@ -0,0 +1,380 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + +import asyncio +import time +import unittest +import uuid + +from fq import FQ as AsyncFQ +from fq.exceptions import BadArgumentException +from fq.sync import FQ +from tests.config import build_test_config + + +class SyncFQTest(unittest.TestCase): + def setUp(self): + self.queue = FQ(build_test_config(queue={"key_prefix": "test_fq_sync"})) + self.queue.initialize() + self.queue._r.flushdb() + self.queue_type = "sms" + self.queue_id = "johndoe" + self.payload = {"to": "1000000000", "message": "Hello, sync FQ"} + + def tearDown(self): + if self.queue is not None and self.queue._r is not None: + self.queue._r.flushdb() + self.queue.close() + + def _job_id(self): + return str(uuid.uuid4()) + + def test_import_namespace(self): + from fq import FQ as ImportedAsyncFQ + from fq.sync import FQ as ImportedSyncFQ + + self.assertIs(ImportedAsyncFQ, AsyncFQ) + self.assertIs(ImportedSyncFQ, FQ) + self.assertIsNot(ImportedAsyncFQ, ImportedSyncFQ) + + def test_initialize_close_and_reload_scripts(self): + self.assertIs(self.queue.redis_client(), self.queue._r) + self.assertIsNotNone(self.queue._scripts.enqueue) + + self.queue.reload_lua_scripts() + self.assertIsNotNone(self.queue._scripts.enqueue) + self.assertTrue(self.queue.deep_status()) + + self.queue.close() + self.assertIsNone(self.queue._r) + self.queue.close() + + def test_enqueue_dequeue_finish(self): + job_id = self._job_id() + response = self.queue.enqueue( + payload=self.payload, + interval=1000, + job_id=job_id, + queue_id=self.queue_id, + queue_type=self.queue_type, + ) + self.assertEqual(response, {"status": "queued"}) + self.assertEqual(self.queue.get_queue_length(self.queue_type, self.queue_id), 1) + + job = self.queue.dequeue(queue_type=self.queue_type) + self.assertEqual( + job, + { + "status": "success", + "queue_id": self.queue_id, + "job_id": job_id, + "payload": self.payload, + "requeues_remaining": -1, + }, + ) + + self.assertEqual( + self.queue.finish( + queue_type=self.queue_type, + queue_id=job["queue_id"], + job_id=job["job_id"], + ), + {"status": "success"}, + ) + self.assertEqual( + self.queue.finish( + queue_type=self.queue_type, + queue_id=job["queue_id"], + job_id=job["job_id"], + ), + {"status": "failure"}, + ) + + def test_requeue_behavior(self): + self.queue.close() + self.queue = FQ( + build_test_config( + queue={ + "job_expire_interval": 20, + "key_prefix": "test_fq_sync_requeue", + }, + ) + ) + self.queue.initialize() + self.queue._r.flushdb() + + job_id = self._job_id() + self.queue.enqueue( + payload=self.payload, + interval=1, + job_id=job_id, + queue_id=self.queue_id, + queue_type=self.queue_type, + requeue_limit=1, + ) + first_job = self.queue.dequeue(queue_type=self.queue_type) + self.assertEqual(first_job["status"], "success") + + time.sleep(0.08) + self.queue.requeue() + self.assertEqual(self.queue.get_queue_length(self.queue_type, self.queue_id), 1) + + requeued_job = self.queue.dequeue(queue_type=self.queue_type) + self.assertEqual(requeued_job["status"], "success") + self.assertEqual(requeued_job["job_id"], job_id) + self.assertEqual(requeued_job["requeues_remaining"], 0) + + def test_interval_update(self): + job_id = self._job_id() + self.queue.enqueue( + payload=self.payload, + interval=1000, + job_id=job_id, + queue_id=self.queue_id, + queue_type=self.queue_type, + ) + + response = self.queue.interval( + interval=250, + queue_id=self.queue_id, + queue_type=self.queue_type, + ) + self.assertEqual(response, {"status": "success"}) + self.assertEqual( + self.queue._r.hget( + "%s:interval" % self.queue._key_prefix, + "%s:%s" % (self.queue_type, self.queue_id), + ), + b"250", + ) + + def test_metrics(self): + response = self.queue.metrics() + self.assertEqual(response["status"], "success") + self.assertEqual(response["queue_types"], []) + self.assertEqual(sum(response["enqueue_counts"].values()), 0) + self.assertEqual(sum(response["dequeue_counts"].values()), 0) + + job_id = self._job_id() + self.queue.enqueue( + payload=self.payload, + interval=1, + job_id=job_id, + queue_id=self.queue_id, + queue_type=self.queue_type, + ) + + queue_type_metrics = self.queue.metrics(queue_type=self.queue_type) + self.assertEqual(queue_type_metrics["status"], "success") + self.assertEqual(queue_type_metrics["queue_ids"], [self.queue_id]) + + queue_metrics = self.queue.metrics( + queue_type=self.queue_type, + queue_id=self.queue_id, + ) + self.assertEqual(queue_metrics["status"], "success") + self.assertEqual(queue_metrics["queue_length"], 1) + self.assertEqual(sum(queue_metrics["enqueue_counts"].values()), 1) + + self.queue.dequeue(queue_type=self.queue_type) + global_metrics = self.queue.metrics() + self.assertEqual(global_metrics["queue_types"], [self.queue_type]) + self.assertEqual(sum(global_metrics["dequeue_counts"].values()), 1) + + def test_clear_queue(self): + job_id = self._job_id() + self.queue.enqueue( + payload=self.payload, + interval=1000, + job_id=job_id, + queue_id=self.queue_id, + queue_type=self.queue_type, + ) + + response = self.queue.clear_queue( + queue_type=self.queue_type, + queue_id=self.queue_id, + purge_all=True, + ) + self.assertEqual( + response, + { + "status": "Success", + "message": "Successfully removed all queued calls and purged related resources", + }, + ) + self.assertEqual(self.queue.get_queue_length(self.queue_type, self.queue_id), 0) + self.assertIsNone( + self.queue._r.hget( + "%s:interval" % self.queue._key_prefix, + "%s:%s" % (self.queue_type, self.queue_id), + ) + ) + + def test_validation_errors_match_async_api(self): + def collect_sync_errors(): + checks = [ + lambda: self.queue.enqueue( + payload=self.payload, + interval=0, + job_id=self._job_id(), + queue_id=self.queue_id, + queue_type=self.queue_type, + ), + lambda: self.queue.dequeue(queue_type="bad type"), + lambda: self.queue.finish( + job_id="bad id", + queue_id=self.queue_id, + queue_type=self.queue_type, + ), + lambda: self.queue.interval( + interval=0, + queue_id=self.queue_id, + queue_type=self.queue_type, + ), + lambda: self.queue.metrics(queue_id=self.queue_id), + lambda: self.queue.clear_queue( + queue_type=self.queue_type, + queue_id="bad id", + ), + lambda: self.queue.get_queue_length("bad type", self.queue_id), + ] + errors = [] + for check in checks: + with self.assertRaises(BadArgumentException) as ctx: + check() + errors.append(str(ctx.exception)) + return errors + + async def collect_async_errors(): + queue = AsyncFQ(build_test_config(queue={"key_prefix": "test_fq_async"})) + await queue.initialize() + await queue._r.flushdb() + checks = [ + lambda: queue.enqueue( + payload=self.payload, + interval=0, + job_id=self._job_id(), + queue_id=self.queue_id, + queue_type=self.queue_type, + ), + lambda: queue.dequeue(queue_type="bad type"), + lambda: queue.finish( + job_id="bad id", + queue_id=self.queue_id, + queue_type=self.queue_type, + ), + lambda: queue.interval( + interval=0, + queue_id=self.queue_id, + queue_type=self.queue_type, + ), + lambda: queue.metrics(queue_id=self.queue_id), + lambda: queue.clear_queue( + queue_type=self.queue_type, + queue_id="bad id", + ), + lambda: queue.get_queue_length("bad type", self.queue_id), + ] + errors = [] + try: + for check in checks: + with self.assertRaises(BadArgumentException) as ctx: + await check() + errors.append(str(ctx.exception)) + return errors + finally: + await queue._r.flushdb() + await queue.close() + + self.assertEqual(collect_sync_errors(), asyncio.run(collect_async_errors())) + + def test_sync_async_interoperability(self): + async def scenario(): + config = build_test_config(queue={"key_prefix": "test_fq_sync_interop"}) + async_queue = AsyncFQ(config) + sync_queue = FQ(config) + + await async_queue.initialize() + await asyncio.to_thread(sync_queue.initialize) + await async_queue._r.flushdb() + + try: + sync_job_id = self._job_id() + await asyncio.to_thread( + sync_queue.enqueue, + payload={"source": "sync"}, + interval=1, + job_id=sync_job_id, + queue_id=self.queue_id, + queue_type=self.queue_type, + ) + sync_job = await async_queue.dequeue(queue_type=self.queue_type) + self.assertEqual(sync_job["status"], "success") + self.assertEqual( + set(sync_job), + { + "status", + "queue_id", + "job_id", + "payload", + "requeues_remaining", + }, + ) + self.assertEqual(sync_job["job_id"], sync_job_id) + self.assertEqual(sync_job["payload"], {"source": "sync"}) + self.assertEqual( + await async_queue.finish( + queue_type=self.queue_type, + queue_id=sync_job["queue_id"], + job_id=sync_job["job_id"], + ), + {"status": "success"}, + ) + + async_job_id = self._job_id() + await async_queue.enqueue( + payload={"source": "async"}, + interval=1, + job_id=async_job_id, + queue_id=self.queue_id, + queue_type=self.queue_type, + ) + await asyncio.sleep(0.01) + async_job = await asyncio.to_thread( + sync_queue.dequeue, + queue_type=self.queue_type, + ) + self.assertEqual(async_job["status"], "success") + self.assertEqual( + set(async_job), + { + "status", + "queue_id", + "job_id", + "payload", + "requeues_remaining", + }, + ) + self.assertEqual(async_job["job_id"], async_job_id) + self.assertEqual(async_job["payload"], {"source": "async"}) + self.assertEqual( + await asyncio.to_thread( + sync_queue.finish, + queue_type=self.queue_type, + queue_id=async_job["queue_id"], + job_id=async_job["job_id"], + ), + {"status": "success"}, + ) + finally: + if sync_queue._r is not None: + await asyncio.to_thread(sync_queue._r.flushdb) + await asyncio.to_thread(sync_queue.close) + await async_queue.close() + + asyncio.run(scenario()) + + +if __name__ == "__main__": + unittest.main()