Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions worker/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from __future__ import annotations

import time
from typing import Dict

import redis
Expand Down Expand Up @@ -84,6 +85,13 @@ def __init__(
self.storage = storage
self.cfg = cfg

# Periodic recovery state. _last_recovery = 0 makes recovery run on the
# first consume() so leftovers from a prior crash are swept at startup.
# The interval matches the 2-minute staleness threshold in the recovery
# query. See DEV-35.
self._last_recovery = 0.0
self._recovery_interval = 120.0

# Ensure the consumer group exists. If it already exists Redis raises an
# error; ignore that specific error.
try:
Expand Down Expand Up @@ -121,6 +129,11 @@ def consume(self, consumer_name: str) -> bool:
True if a message was consumed (even if processing failed), False if
no messages were available.
"""
# Recover stuck jobs on a fixed cadence, independent of load. Doing this
# only on the idle path meant recovery never ran under sustained load —
# exactly when crashed-mid-job rows are most likely. See DEV-35.
self._maybe_recover()

# Read one message for this consumer (blocking short period)
resp = self.redis.xreadgroup(
groupname=self.cfg.consumer_group,
Expand All @@ -131,8 +144,6 @@ def consume(self, consumer_name: str) -> bool:
)

if not resp:
# No messages available; attempt recovery of stuck jobs and return.
self._recover_stuck_pending()
return False

# Response format: [(stream_name, [(msg_id, {field: value}), ...])]
Expand Down Expand Up @@ -314,6 +325,17 @@ def _handle_asset_message(self, asset_id: str, msg_id: str) -> None:
# Delegate to _handle_job using the job id we now have.
self._handle_job(job_id, msg_id)

def _maybe_recover(self) -> None:
"""Run stuck-job recovery if the recovery interval has elapsed.

Time-gated so recovery fires on a fixed cadence regardless of whether
the consumer is busy or idle.
"""
now = time.time()
if now - self._last_recovery >= self._recovery_interval:
self._last_recovery = now
self._recover_stuck_pending()

def _recover_stuck_pending(self) -> None:
"""Requeue stale pending/in_progress jobs back onto the stream.

Expand Down
56 changes: 56 additions & 0 deletions worker/tests/test_consumer_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import time
import unittest
from unittest.mock import MagicMock, patch

from worker.consumer.consumer import Consumer


def _make_consumer():
"""Build a Consumer with redis/pg mocked out (no real connections)."""
cfg = MagicMock()
cfg.stream_name = "media:jobs"
cfg.consumer_group = "media-workers"
with patch("worker.consumer.consumer.redis.Redis.from_url") as from_url:
client = MagicMock()
from_url.return_value = client
consumer = Consumer(
pg_pool=MagicMock(), redis_url="redis://x", storage=MagicMock(), cfg=cfg
)
return consumer, client


class TestPeriodicRecovery(unittest.TestCase):
"""DEV-35: recovery must fire on a cadence even under continuous load."""

def test_recovery_fires_under_load_when_interval_elapsed(self):
consumer, client = _make_consumer()
# A message is available (the loaded / busy path).
client.xreadgroup.return_value = [("media:jobs", [("1-0", {"job_id": "42"})])]

consumer._recover_stuck_pending = MagicMock()
consumer._handle_job = MagicMock()
consumer._recovery_interval = 0.0 # gate always open
consumer._last_recovery = 0.0

result = consumer.consume("worker-1")

self.assertTrue(result) # work was performed
consumer._handle_job.assert_called_once_with("42", "1-0")
consumer._recover_stuck_pending.assert_called_once() # fired despite load

def test_recovery_does_not_fire_before_interval_elapses(self):
consumer, client = _make_consumer()
client.xreadgroup.return_value = [("media:jobs", [("1-0", {"job_id": "42"})])]

consumer._recover_stuck_pending = MagicMock()
consumer._handle_job = MagicMock()
consumer._recovery_interval = 9999.0
consumer._last_recovery = time.time() # just recovered

consumer.consume("worker-1")

consumer._recover_stuck_pending.assert_not_called() # gate holds


if __name__ == "__main__":
unittest.main()