-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathconftest.py
More file actions
93 lines (74 loc) · 2.04 KB
/
conftest.py
File metadata and controls
93 lines (74 loc) · 2.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import os
import random
import string
from typing import AsyncGenerator, TypeVar
import pytest
from taskiq_psqlpy.broker import PSQLPyBroker
from taskiq_psqlpy.result_backend import PSQLPyResultBackend
_ReturnType = TypeVar("_ReturnType")
@pytest.fixture(scope="session")
def anyio_backend() -> str:
"""
Anyio backend.
Backend for anyio pytest plugin.
:return: backend name.
"""
return "asyncio"
@pytest.fixture
def postgres_table() -> str:
"""
Name of a postgresql table for current test.
:return: random string.
"""
return "".join(
random.choice(
string.ascii_lowercase,
)
for _ in range(10)
)
@pytest.fixture
def postgresql_dsn() -> str:
"""
DSN to PostgreSQL.
:return: dsn to PostgreSQL.
"""
return (
os.environ.get("POSTGRESQL_URL")
or "postgresql://postgres:postgres@localhost:5432/taskiqpsqlpy"
)
@pytest.fixture()
async def psqlpy_result_backend(
postgresql_dsn: str,
postgres_table: str,
) -> AsyncGenerator[PSQLPyResultBackend[_ReturnType], None]:
backend: PSQLPyResultBackend[_ReturnType] = PSQLPyResultBackend(
dsn=postgresql_dsn,
table_name=postgres_table,
)
await backend.startup()
yield backend
async with backend._database_pool.acquire() as conn:
_ = await conn.execute(
querystring=f"DROP TABLE {postgres_table}",
)
await backend.shutdown()
@pytest.fixture()
async def psqlpy_broker(
postgresql_dsn: str,
postgres_table: str,
) -> AsyncGenerator[PSQLPyBroker, None]:
"""
Fixture to set up and tear down the broker.
Initializes the broker with test parameters.
"""
broker = PSQLPyBroker(
dsn=postgresql_dsn,
channel_name=f"{postgres_table}_channel",
table_name=postgres_table,
)
await broker.startup()
yield broker
assert broker.write_pool
async with broker.write_pool.acquire() as conn:
_ = await conn.execute(f"DROP TABLE {postgres_table}")
await broker.shutdown()