diff --git a/worker/consumer/consumer.py b/worker/consumer/consumer.py index 0aecbb9..a23eb30 100644 --- a/worker/consumer/consumer.py +++ b/worker/consumer/consumer.py @@ -27,6 +27,7 @@ from __future__ import annotations +import time from typing import Dict import redis @@ -84,6 +85,13 @@ def __init__( self.storage = storage self.cfg = cfg + # Periodic recovery state. _last_recovery = 0 makes recovery run on the + # first consume() so leftovers from a prior crash are swept at startup. + # The interval matches the 2-minute staleness threshold in the recovery + # query. See DEV-35. + self._last_recovery = 0.0 + self._recovery_interval = 120.0 + # Ensure the consumer group exists. If it already exists Redis raises an # error; ignore that specific error. try: @@ -121,6 +129,11 @@ def consume(self, consumer_name: str) -> bool: True if a message was consumed (even if processing failed), False if no messages were available. """ + # Recover stuck jobs on a fixed cadence, independent of load. Doing this + # only on the idle path meant recovery never ran under sustained load — + # exactly when crashed-mid-job rows are most likely. See DEV-35. + self._maybe_recover() + # Read one message for this consumer (blocking short period) resp = self.redis.xreadgroup( groupname=self.cfg.consumer_group, @@ -131,8 +144,6 @@ def consume(self, consumer_name: str) -> bool: ) if not resp: - # No messages available; attempt recovery of stuck jobs and return. - self._recover_stuck_pending() return False # Response format: [(stream_name, [(msg_id, {field: value}), ...])] @@ -314,6 +325,17 @@ def _handle_asset_message(self, asset_id: str, msg_id: str) -> None: # Delegate to _handle_job using the job id we now have. self._handle_job(job_id, msg_id) + def _maybe_recover(self) -> None: + """Run stuck-job recovery if the recovery interval has elapsed. + + Time-gated so recovery fires on a fixed cadence regardless of whether + the consumer is busy or idle. + """ + now = time.time() + if now - self._last_recovery >= self._recovery_interval: + self._last_recovery = now + self._recover_stuck_pending() + def _recover_stuck_pending(self) -> None: """Requeue stale pending/in_progress jobs back onto the stream. diff --git a/worker/tests/test_consumer_recovery.py b/worker/tests/test_consumer_recovery.py new file mode 100644 index 0000000..d76cb40 --- /dev/null +++ b/worker/tests/test_consumer_recovery.py @@ -0,0 +1,56 @@ +import time +import unittest +from unittest.mock import MagicMock, patch + +from worker.consumer.consumer import Consumer + + +def _make_consumer(): + """Build a Consumer with redis/pg mocked out (no real connections).""" + cfg = MagicMock() + cfg.stream_name = "media:jobs" + cfg.consumer_group = "media-workers" + with patch("worker.consumer.consumer.redis.Redis.from_url") as from_url: + client = MagicMock() + from_url.return_value = client + consumer = Consumer( + pg_pool=MagicMock(), redis_url="redis://x", storage=MagicMock(), cfg=cfg + ) + return consumer, client + + +class TestPeriodicRecovery(unittest.TestCase): + """DEV-35: recovery must fire on a cadence even under continuous load.""" + + def test_recovery_fires_under_load_when_interval_elapsed(self): + consumer, client = _make_consumer() + # A message is available (the loaded / busy path). + client.xreadgroup.return_value = [("media:jobs", [("1-0", {"job_id": "42"})])] + + consumer._recover_stuck_pending = MagicMock() + consumer._handle_job = MagicMock() + consumer._recovery_interval = 0.0 # gate always open + consumer._last_recovery = 0.0 + + result = consumer.consume("worker-1") + + self.assertTrue(result) # work was performed + consumer._handle_job.assert_called_once_with("42", "1-0") + consumer._recover_stuck_pending.assert_called_once() # fired despite load + + def test_recovery_does_not_fire_before_interval_elapses(self): + consumer, client = _make_consumer() + client.xreadgroup.return_value = [("media:jobs", [("1-0", {"job_id": "42"})])] + + consumer._recover_stuck_pending = MagicMock() + consumer._handle_job = MagicMock() + consumer._recovery_interval = 9999.0 + consumer._last_recovery = time.time() # just recovered + + consumer.consume("worker-1") + + consumer._recover_stuck_pending.assert_not_called() # gate holds + + +if __name__ == "__main__": + unittest.main()