-
-
Notifications
You must be signed in to change notification settings - Fork 117
Expand file tree
/
Copy pathtest_simple_retry.py
More file actions
81 lines (71 loc) · 2.22 KB
/
test_simple_retry.py
File metadata and controls
81 lines (71 loc) · 2.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import uuid
import pytest
from mock import AsyncMock
from taskiq.formatters.json_formatter import JSONFormatter
from taskiq.message import TaskiqMessage
from taskiq.middlewares.simple_retry_middleware import SimpleRetryMiddleware
from taskiq.result import TaskiqResult
@pytest.fixture
def broker() -> AsyncMock:
    """Provide a mocked broker for the retry-middleware tests.

    The mock carries the two attributes these tests configure explicitly:
    an ``id_generator`` callable returning a fresh hex UUID string and a
    ``JSONFormatter`` instance as ``formatter``. Every other attribute is an
    auto-created mock, so calls such as ``kick`` can be inspected afterwards.

    :return: the configured ``AsyncMock`` standing in for a broker.
    """

    def _fresh_task_id() -> str:
        # A new random identifier on every call.
        return uuid.uuid4().hex

    return AsyncMock(
        id_generator=_fresh_task_id,
        formatter=JSONFormatter(),
    )
@pytest.mark.anyio
async def test_successful_retry(broker: AsyncMock) -> None:
    """Check that a failed task labelled for retry is sent again.

    A message whose labels contain ``retry_on_error="True"`` is passed to
    ``SimpleRetryMiddleware.on_error``. The test then inspects the first
    positional argument given to ``broker.kick`` and expects it to keep the
    task name and to carry ``_retries == "1"``.

    :param broker: mocked broker supplied by the ``broker`` fixture.
    """
    middleware = SimpleRetryMiddleware()
    middleware.set_broker(broker)

    await middleware.on_error(
        TaskiqMessage(
            task_id="test_id",
            task_name="meme",
            queue="taskiq",
            labels={
                "retry_on_error": "True",
            },
            args=[],
            kwargs={},
        ),
        TaskiqResult(is_err=True, return_value=None, execution_time=0.0),
        Exception(),
    )

    # If the middleware never awaited ``kick``, ``await_args`` is ``None`` and
    # the lookup below would die with an opaque AttributeError. Assert the
    # await explicitly so the failure message states what actually went wrong.
    broker.kick.assert_awaited_once()

    resend: TaskiqMessage = broker.kick.await_args.args[0]
    assert resend.task_name == "meme"
    assert resend.labels["_retries"] == "1"
@pytest.mark.anyio
async def test_no_retry(broker: AsyncMock) -> None:
    """Check that a failed task without retry labels is not sent again.

    The message has an empty ``labels`` dict, so after
    ``SimpleRetryMiddleware.on_error`` runs, ``broker.kick`` must not have
    been called.

    :param broker: mocked broker supplied by the ``broker`` fixture.
    """
    retry_middleware = SimpleRetryMiddleware()
    retry_middleware.set_broker(broker)

    # No "retry_on_error" label is present on this message.
    failed_message = TaskiqMessage(
        task_id="test_id",
        task_name="meme",
        queue="taskiq",
        labels={},
        args=[],
        kwargs={},
    )
    failed_result = TaskiqResult(
        is_err=True,
        return_value=None,
        execution_time=0.0,
    )

    await retry_middleware.on_error(failed_message, failed_result, Exception())

    broker.kick.assert_not_called()
@pytest.mark.anyio
async def test_max_retries(broker: AsyncMock) -> None:
    """Check that no further retry is sent once the retry budget is used up.

    The middleware is created with ``default_retry_count=3`` and the failed
    message already carries ``_retries="2"`` together with
    ``retry_on_error="True"``. After ``on_error`` runs, ``broker.kick`` must
    not have been called.

    :param broker: mocked broker supplied by the ``broker`` fixture.
    """
    retry_middleware = SimpleRetryMiddleware(default_retry_count=3)
    retry_middleware.set_broker(broker)

    # Retry is enabled, but the message records two earlier retries.
    exhausted_labels = {
        "retry_on_error": "True",
        "_retries": "2",
    }
    failed_message = TaskiqMessage(
        task_id="test_id",
        task_name="meme",
        queue="taskiq",
        labels=exhausted_labels,
        args=[],
        kwargs={},
    )
    failed_result = TaskiqResult(
        is_err=True,
        return_value=None,
        execution_time=0.0,
    )

    await retry_middleware.on_error(failed_message, failed_result, Exception())

    broker.kick.assert_not_called()