From 0c90cda62abd6e89fd8ae14ec17c1bd21899a237 Mon Sep 17 00:00:00 2001 From: Shantanu Mane Date: Tue, 16 Jun 2026 22:46:34 +0530 Subject: [PATCH] fix(worker): let consumer own asset state on failure - Stop the processor stamping assets.status=failed on every exception - Retryable failures no longer leave the asset stuck failed mid-retry - Consumer already marks failed only past the retry cap, ready on success - Add regression test asserting no failed-status write on processing error --- worker/processing/processor.py | 12 +++--- worker/tests/test_processor_dispatch.py | 54 +++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 7 deletions(-) create mode 100644 worker/tests/test_processor_dispatch.py diff --git a/worker/processing/processor.py b/worker/processing/processor.py index 2b78e86..50c38c1 100644 --- a/worker/processing/processor.py +++ b/worker/processing/processor.py @@ -204,14 +204,12 @@ def process_asset_dispatch( raise ValueError(f"Unknown asset type: {typ}") except Exception as e: + # Do not touch assets.status here. The consumer (_handle_job) owns the + # asset state transition: it marks the asset failed only after the retry + # cap is hit, and ready on success. Writing 'failed' on every exception + # — including RetryableException — left the asset stuck failed across + # retries even though the job was still pending. See DEV-34. logger.error("Failed to process asset %s: %s", asset_id, e, exc_info=True) - with pg_pool.get_pg_conn() as conn: - cur = conn.cursor() - cur.execute( - "UPDATE assets SET status = %s, error_reason = %s WHERE asset_id = %s", - (AssetStatus.FAILED.value, str(e), asset_id) - ) - conn.commit() raise finally: if local_raw_file and os.path.exists(local_raw_file): diff --git a/worker/tests/test_processor_dispatch.py b/worker/tests/test_processor_dispatch.py new file mode 100644 index 0000000..76cec9a --- /dev/null +++ b/worker/tests/test_processor_dispatch.py @@ -0,0 +1,54 @@ +import unittest +from unittest.mock import MagicMock, patch + +from worker.processing.processor import process_asset_dispatch, AssetStatus + + +class TestDispatchFailureDoesNotTouchAssetState(unittest.TestCase): + """DEV-34: the processor must NOT mark assets.status=failed on exception. + + The consumer (_handle_job) owns the asset state transition — it only marks + the asset failed after the retry cap. If the processor stamps 'failed' on + every exception, retryable failures leave the asset stuck failed mid-retry. + """ + + def _make_pg_pool(self, asset_row): + cursor = MagicMock() + cursor.fetchone.return_value = asset_row + conn = MagicMock() + conn.cursor.return_value = cursor + pg_pool = MagicMock() + pg_pool.get_pg_conn.return_value.__enter__.return_value = conn + return pg_pool, cursor + + @patch("worker.processing.processor.get_extension_for_mime", return_value="jpg") + @patch("worker.processing.processor.compute_file_hash", return_value="") + @patch("worker.processing.processor.process_image_file") + def test_processing_failure_leaves_asset_status_untouched( + self, mock_process_image, _mock_hash, _mock_ext + ): + mock_process_image.side_effect = RuntimeError("boom") + + # (asset_id, type, status, original_url, mime_type, content_hash) + asset_row = ("asset-1", "image", "uploaded", "gs://raw/asset-1", "image/jpeg", None) + pg_pool, cursor = self._make_pg_pool(asset_row) + storage = MagicMock() + cfg = MagicMock() + cfg.temp_dir = "/tmp" + + # The exception must propagate so the consumer can decide retry vs. fail. + with self.assertRaises(RuntimeError): + process_asset_dispatch("asset-1", pg_pool, storage, cfg) + + # No executed statement may set the asset to 'failed'. + for call in cursor.execute.call_args_list: + params = call.args[1] if len(call.args) > 1 else () + self.assertNotIn( + AssetStatus.FAILED.value, + tuple(params), + msg=f"processor wrote a failed-status update: {call.args}", + ) + + +if __name__ == "__main__": + unittest.main()