|
2 | 2 |
|
3 | 3 | from __future__ import annotations |
4 | 4 |
|
| 5 | +from unittest.mock import patch |
| 6 | + |
| 7 | +import pytest |
5 | 8 | from pytest import LogCaptureFixture |
6 | 9 |
|
7 | 10 | from palace.manager.celery.tasks import lexile |
@@ -57,6 +60,41 @@ def test_run_lexile_db_update_queues_worker_when_configured( |
57 | 60 | lexile.run_lexile_db_update.delay().wait() |
58 | 61 | assert "Lexile DB update task queued" in caplog.text |
59 | 62 |
|
| 63 | + def test_run_lexile_db_update_skipped_when_lock_already_held( |
| 64 | + self, |
| 65 | + db: DatabaseTransactionFixture, |
| 66 | + celery_fixture: CeleryFixture, |
| 67 | + redis_fixture: RedisFixture, |
| 68 | + caplog: LogCaptureFixture, |
| 69 | + ) -> None: |
| 70 | + """Orchestrator skips when lock is already held (worker in progress).""" |
| 71 | + db.integration_configuration( |
| 72 | + protocol=LexileDBService, |
| 73 | + goal=Goals.METADATA_GOAL, |
| 74 | + settings=LexileDBSettings( |
| 75 | + username="user", |
| 76 | + password="pass", |
| 77 | + base_url="https://api.example.com", |
| 78 | + ), |
| 79 | + ) |
| 80 | + caplog.set_level(LogLevel.info) |
| 81 | + with patch.object(lexile.RedisLock, "locked", return_value=True): |
| 82 | + lexile.run_lexile_db_update.delay().wait() |
| 83 | + assert "Lexile DB update already in progress, skipping." in caplog.text |
| 84 | + assert "Lexile DB update task queued" not in caplog.text |
| 85 | + |
| 86 | + def test_lexile_db_update_task_skipped_when_not_configured( |
| 87 | + self, |
| 88 | + db: DatabaseTransactionFixture, |
| 89 | + celery_fixture: CeleryFixture, |
| 90 | + redis_fixture: RedisFixture, |
| 91 | + caplog: LogCaptureFixture, |
| 92 | + ) -> None: |
| 93 | + """Worker skips when no Lexile DB integration exists.""" |
| 94 | + caplog.set_level(LogLevel.info) |
| 95 | + lexile.lexile_db_update_task.delay(force=False).wait() |
| 96 | + assert "Lexile DB update skipped" in caplog.text |
| 97 | + |
60 | 98 | def test_lexile_db_update_task_adds_classification( |
61 | 99 | self, |
62 | 100 | db: DatabaseTransactionFixture, |
@@ -179,3 +217,57 @@ def test_lexile_db_update_task_creates_timestamp( |
179 | 217 | assert stamp is not None |
180 | 218 | assert stamp.finish is not None |
181 | 219 | assert "Processed" in (stamp.achievements or "") |
| 220 | + |
| 221 | + def test_lexile_db_update_task_continues_to_next_batch( |
| 222 | + self, |
| 223 | + db: DatabaseTransactionFixture, |
| 224 | + celery_fixture: CeleryFixture, |
| 225 | + redis_fixture: RedisFixture, |
| 226 | + http_client: MockHttpClientFixture, |
| 227 | + ) -> None: |
| 228 | + """Worker calls task.replace() with correct args when full batch returned.""" |
| 229 | + db.integration_configuration( |
| 230 | + protocol=LexileDBService, |
| 231 | + goal=Goals.METADATA_GOAL, |
| 232 | + settings=LexileDBSettings( |
| 233 | + username="user", |
| 234 | + password="pass", |
| 235 | + base_url="https://api.example.com", |
| 236 | + ), |
| 237 | + ) |
| 238 | + # Create exactly BATCH_SIZE (10) ISBNs so the task replaces to continue. |
| 239 | + for i in range(lexile.BATCH_SIZE): |
| 240 | + db.identifier( |
| 241 | + identifier_type=Identifier.ISBN, |
| 242 | + foreign_id=f"9780123456{i:03d}", |
| 243 | + ) |
| 244 | + |
| 245 | + for i in range(lexile.BATCH_SIZE): |
| 246 | + http_client.queue_response( |
| 247 | + 200, |
| 248 | + content={ |
| 249 | + "meta": {"total_count": 1}, |
| 250 | + "objects": [{"lexile": 600 + i}], |
| 251 | + }, |
| 252 | + ) |
| 253 | + |
| 254 | + replace_calls: list[tuple] = [] |
| 255 | + |
| 256 | + def capture_replace(*args: object, **kwargs: object) -> None: |
| 257 | + replace_calls.append((args, kwargs)) |
| 258 | + raise RuntimeError("Replace captured (avoid actual replacement)") |
| 259 | + |
| 260 | + with patch.object( |
| 261 | + lexile.lexile_db_update_task, "replace", side_effect=capture_replace |
| 262 | + ): |
| 263 | + with pytest.raises(RuntimeError, match="Replace captured"): |
| 264 | + lexile.lexile_db_update_task.delay(force=False).wait() |
| 265 | + |
| 266 | + assert len(replace_calls) == 1 |
| 267 | + (args, kwargs) = replace_calls[0] |
| 268 | + assert len(args) == 1 |
| 269 | + sig = args[0] |
| 270 | + assert sig.kwargs.get("force") is False |
| 271 | + assert sig.kwargs.get("offset") == lexile.BATCH_SIZE |
| 272 | + assert "timestamp_id" in sig.kwargs |
| 273 | + assert sig.kwargs["timestamp_id"] is not None |
0 commit comments