Skip to content

Commit f765535

Browse files
fix worker for di
1 parent caeb0b2 commit f765535

12 files changed

Lines changed: 38 additions & 103 deletions

File tree

app/infrastructure/containers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ class ApplicationStartupContainer(containers.DeclarativeContainer):
146146
)
147147

148148

149+
149150
class WorkerStartupContainer(containers.DeclarativeContainer):
150151
redis_client = Singleton(lambda: aioredis.from_url(settings.REDIS_DSN))
151152

app/infrastructure/db/sqlalchemy.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,11 @@ def get_engine() -> AsyncEngine:
5757
session_factory = async_sessionmaker(bind=get_engine(), expire_on_commit=False)
5858

5959

60-
async def verify_db_connection(engine: AsyncEngine) -> None:
61-
connection = await engine.connect()
62-
await connection.close()
63-
64-
6560
async def close_db_connections() -> None:
6661
await get_engine().dispose()
6762

6863

69-
def provide_transaction_session(func: Callable) -> Callable:
64+
def provide_session(func: Callable) -> Callable:
7065
"""
7166
Provides a database session to an async function if one is not already passed.
7267
@@ -78,7 +73,14 @@ def provide_transaction_session(func: Callable) -> Callable:
7873
async def wrapper(*args: Any, **kwargs: Any) -> Any:
7974
if kwargs.get("session"):
8075
return await func(*args, **kwargs)
76+
8177
async with session_factory() as session:
82-
return await func(*args, **kwargs, session=session)
78+
try:
79+
return await func(*args, **kwargs, session=session)
80+
except Exception:
81+
await session.rollback()
82+
raise
83+
finally:
84+
await session.close()
8385

8486
return wrapper

app/infrastructure/worker/tasks/simple_task.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from typing import Any
23

34
from dependency_injector.wiring import inject, Provide
@@ -13,6 +14,7 @@ async def heartbeat_task(
1314
ctx: dict[str, Any],
1415
bot: Bot = Provide[WorkerStartupContainer.bot],
1516
):
16-
# logger.info("Heartbeat task started")
17-
18-
logger.info(f"Heartbeat task executed {[account.id for account in bot.bot_accounts]}")
17+
task_name = asyncio.current_task().get_name()
18+
logger.info(f"Task {task_name} Heartbeat task executed start bot id {id(bot)}")
19+
await asyncio.sleep(10)
20+
logger.info(f"Task {task_name} Heartbeat task executed end bot id {id(bot)}")

app/infrastructure/worker/worker.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
from app.infrastructure.worker.tasks.simple_task import heartbeat_task
1616
from app.logger import logger
1717

18-
# `saq` import its own settings and hides our module
19-
from app.settings import settings as app_settings
18+
from app.settings import settings
2019

2120
SaqCtx = Dict[str, Any]
2221

2322

23+
# queue = Queue(aioredis.from_url(settings.REDIS_DSN), name="bot_template")
24+
queue = Queue.from_url(settings.REDIS_DSN, name="bot_template_worker")
2425

25-
queue = Queue(aioredis.from_url(app_settings.REDIS_DSN), name="bot_refactor")
2626

2727
@inject
2828
async def _startup_with_injection(
@@ -45,11 +45,7 @@ async def _shutdown_with_injection(
4545
async def startup(ctx: SaqCtx) -> None:
4646
worker_startup_container = WorkerStartupContainer()
4747

48-
queue.add_cron_job(
49-
CronJob(function=heartbeat_task, cron="*/5 * * * * *", unique=True)
50-
)
5148
worker_startup_container.wire(modules=[__name__, "app.infrastructure.worker.tasks"])
52-
5349
await _startup_with_injection()
5450

5551
logger.info("Worker started")
@@ -60,21 +56,18 @@ async def shutdown(ctx: SaqCtx) -> None:
6056
logger.info("Worker stopped")
6157

6258

63-
settings = {
59+
saq_settings = {
6460
"queue": queue,
6561
"functions": [],
66-
# "cron_jobs": [
67-
# CronJob(
68-
# function=heartbeat_task,
69-
# cron="*/5 * * * * *",
70-
# unique=True,
71-
# # timeout=app_settings.PERIODIC_TASKS_DEFAULT_TIMEOUT,
72-
# # heartbeat=app_settings.PERIODIC_TASKS_DEFAULT_HEARTBEAT,
73-
# # retries=app_settings.PERIODIC_TASKS_DEFAULT_RETRIES,
74-
# # ttl=app_settings.PERIODIC_TASKS_DEFAULT_TTL,
75-
# ),
76-
# ],
77-
"concurrency": 8,
62+
"cron_jobs": [
63+
CronJob(
64+
function=heartbeat_task,
65+
cron="* * * * * */5",
66+
unique=False,
67+
timeout=15,
68+
),
69+
],
70+
"concurrency": settings.WORKER_CONCURRENCY,
7871
"startup": startup,
7972
"shutdown": shutdown,
8073
}

app/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def get_application() -> FastAPI:
5353

5454
application = FastAPI(title=strings.BOT_PROJECT_NAME, openapi_url=None)
5555

56-
# put bot to state for tests
56+
# put bot to state only for tests
5757
application.state.bot = main_container.bot()
5858

5959
application.add_event_handler(

app/presentation/bot/commands/sample_record.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,14 @@
55

66
from app.application.use_cases.interfaces import ISampleRecordUseCases
77
from app.infrastructure.containers import BotSampleRecordCommandContainer
8-
from app.infrastructure.db.sqlalchemy import provide_transaction_session
8+
from app.infrastructure.db.sqlalchemy import provide_session
99
from app.presentation.bot.commands.command_listing import SampleRecordCommands
1010
from app.presentation.bot.command_handlers.sample_record import CreateSampleRecordHandler
1111

1212
collector = HandlerCollector()
1313

14-
1514
@collector.command(**SampleRecordCommands.CREATE_RECORD.command_data())
16-
@provide_transaction_session
15+
@provide_session
1716
@inject
1817
async def create_sample_record(
1918
message: IncomingMessage,

app/presentation/bot/decorators/__init__.py

Whitespace-only changes.

app/presentation/bot/decorators/bot_exception_answer.py

Lines changed: 0 additions & 53 deletions
This file was deleted.

app/settings.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,11 @@ def _build_credentials_from_string(
7777
REDIS_DSN: str
7878
REDIS_CONNECTION_POOL_SIZE: int = 10
7979

80-
# healthcheck
81-
WORKER_TIMEOUT_SEC: float = 4
80+
# worker
81+
WORKER_CONCURRENCY: int = 2
82+
WORKERS_COUNT: int = 1
8283

8384
BOTX_CALLBACK_TIMEOUT_IN_SECONDS = 30
84-
8585
BOT_ASYNC_CLIENT_TIMEOUT_IN_SECONDS = 60
8686

8787

docker-compose.dev.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ services:
2121
- REDIS_DSN=redis://redis/0
2222
- DEBUG=true
2323
# '$$' prevents docker-compose from interpolating a value
24-
command: /bin/sh -c 'PYTHONPATH="$$PYTHONPATH:$$PWD" saq app.infrastructure.worker.worker.settings'
24+
command: /bin/sh -c 'PYTHONPATH="$$PYTHONPATH:$$PWD" saq app.infrastructure.worker.worker.saq_settings --web --workers 2'
25+
ports:
26+
- "8081:8080"
2527
env_file:
2628
- .env
2729
restart: always

0 commit comments

Comments
 (0)