Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions worker/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,12 @@ def process_asset_dispatch(
raise ValueError(f"Unknown asset type: {typ}")

except Exception as e:
# Do not touch assets.status here. The consumer (_handle_job) owns the
# asset state transition: it marks the asset failed only after the retry
# cap is hit, and ready on success. Writing 'failed' on every exception
# — including RetryableException — left the asset stuck failed across
# retries even though the job was still pending. See DEV-34.
logger.error("Failed to process asset %s: %s", asset_id, e, exc_info=True)
with pg_pool.get_pg_conn() as conn:
cur = conn.cursor()
cur.execute(
"UPDATE assets SET status = %s, error_reason = %s WHERE asset_id = %s",
(AssetStatus.FAILED.value, str(e), asset_id)
)
conn.commit()
raise
finally:
if local_raw_file and os.path.exists(local_raw_file):
Expand Down
54 changes: 54 additions & 0 deletions worker/tests/test_processor_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import unittest
from unittest.mock import MagicMock, patch

from worker.processing.processor import process_asset_dispatch, AssetStatus


class TestDispatchFailureDoesNotTouchAssetState(unittest.TestCase):
    """Regression guard for DEV-34.

    When processing raises, ``process_asset_dispatch`` must let the exception
    escape without writing ``failed`` to the asset row. The consumer
    (``_handle_job``) owns that transition and only applies it once the retry
    cap is reached; a 'failed' write on every exception would leave the asset
    stuck failed while its job is still being retried.
    """

    def _make_pg_pool(self, asset_row):
        """Build a mocked connection pool whose cursor returns ``asset_row``.

        Returns a ``(pool, cursor)`` pair: ``pool.get_pg_conn()`` works as a
        context manager yielding a connection, and that connection's
        ``cursor()`` always hands back the same ``cursor`` mock so the test
        can inspect every statement executed on it.
        """
        fake_cursor = MagicMock()
        fake_cursor.fetchone.return_value = asset_row

        fake_conn = MagicMock()
        fake_conn.cursor.return_value = fake_cursor

        fake_pool = MagicMock()
        conn_ctx = fake_pool.get_pg_conn.return_value
        conn_ctx.__enter__.return_value = fake_conn
        return fake_pool, fake_cursor

    # Decorators apply bottom-up, so the mocks arrive in the order
    # process_image_file, compute_file_hash, get_extension_for_mime.
    @patch("worker.processing.processor.get_extension_for_mime", return_value="jpg")
    @patch("worker.processing.processor.compute_file_hash", return_value="")
    @patch("worker.processing.processor.process_image_file")
    def test_processing_failure_leaves_asset_status_untouched(
        self, mock_process_image, _mock_hash, _mock_ext
    ):
        """A processing error propagates and no 'failed' status is written."""
        mock_process_image.side_effect = RuntimeError("boom")

        # Row layout:
        # (asset_id, type, status, original_url, mime_type, content_hash)
        pg_pool, cursor = self._make_pg_pool(
            ("asset-1", "image", "uploaded", "gs://raw/asset-1", "image/jpeg", None)
        )
        storage = MagicMock()
        cfg = MagicMock()
        cfg.temp_dir = "/tmp"

        # The error has to reach the caller so the consumer can choose
        # between retrying and failing the asset.
        with self.assertRaises(RuntimeError):
            process_asset_dispatch("asset-1", pg_pool, storage, cfg)

        # Inspect the bound parameters of every executed statement: none of
        # them may carry the 'failed' status value.
        for recorded in cursor.execute.call_args_list:
            positional = recorded.args
            if len(positional) > 1:
                bound_params = tuple(positional[1])
            else:
                bound_params = ()
            self.assertNotIn(
                AssetStatus.FAILED.value,
                bound_params,
                msg=f"processor wrote a failed-status update: {recorded.args}",
            )


if __name__ == "__main__":
    # Executing this file directly hands control to unittest's runner.
    unittest.main()