Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 6 additions & 25 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
from app.domain.rate_limit import RateLimitConfig
from app.domain.saga.models import SagaConfig
from app.events.core import UnifiedProducer
from app.events.event_store import EventStore, create_event_store
from app.events.schema.schema_registry import SchemaRegistryManager
from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService
from app.services.auth_service import AuthService
Expand Down Expand Up @@ -182,11 +181,12 @@ def get_unified_producer(
self,
broker: KafkaBroker,
schema_registry: SchemaRegistryManager,
event_repository: EventRepository,
logger: logging.Logger,
settings: Settings,
event_metrics: EventMetrics,
) -> UnifiedProducer:
return UnifiedProducer(broker, schema_registry, logger, settings, event_metrics)
return UnifiedProducer(broker, schema_registry, event_repository, logger, settings, event_metrics)

@provide
def get_idempotency_repository(self, redis_client: redis.Redis) -> RedisIdempotencyRepository:
Expand Down Expand Up @@ -276,21 +276,8 @@ class EventProvider(Provider):
scope = Scope.APP

@provide
async def get_schema_registry(self, settings: Settings, logger: logging.Logger) -> SchemaRegistryManager:
registry = SchemaRegistryManager(settings, logger)
await registry.initialize_schemas()
return registry

@provide
def get_event_store(
self,
schema_registry: SchemaRegistryManager,
logger: logging.Logger,
event_metrics: EventMetrics,
) -> EventStore:
return create_event_store(
schema_registry=schema_registry, logger=logger, event_metrics=event_metrics, ttl_days=90
)
def get_schema_registry(self, settings: Settings, logger: logging.Logger) -> SchemaRegistryManager:
    """Provide the SchemaRegistryManager dependency (construction only — no eager schema setup here)."""
    manager = SchemaRegistryManager(settings, logger)
    return manager


class KubernetesProvider(Provider):
Expand Down Expand Up @@ -481,14 +468,12 @@ def get_event_service(self, event_repository: EventRepository) -> EventService:
@provide
def get_kafka_event_service(
self,
event_repository: EventRepository,
kafka_producer: UnifiedProducer,
settings: Settings,
logger: logging.Logger,
event_metrics: EventMetrics,
) -> KafkaEventService:
return KafkaEventService(
event_repository=event_repository,
kafka_producer=kafka_producer,
settings=settings,
logger=logger,
Expand Down Expand Up @@ -632,15 +617,15 @@ def get_execution_service(
self,
execution_repository: ExecutionRepository,
kafka_producer: UnifiedProducer,
event_store: EventStore,
event_repository: EventRepository,
settings: Settings,
logger: logging.Logger,
execution_metrics: ExecutionMetrics,
) -> ExecutionService:
return ExecutionService(
execution_repo=execution_repository,
producer=kafka_producer,
event_store=event_store,
event_repository=event_repository,
settings=settings,
logger=logger,
execution_metrics=execution_metrics,
Expand Down Expand Up @@ -874,14 +859,12 @@ def get_event_replay_service(
self,
replay_repository: ReplayRepository,
kafka_producer: UnifiedProducer,
event_store: EventStore,
replay_metrics: ReplayMetrics,
logger: logging.Logger,
) -> EventReplayService:
return EventReplayService(
repository=replay_repository,
producer=kafka_producer,
event_store=event_store,
replay_metrics=replay_metrics,
logger=logger,
)
Expand All @@ -901,7 +884,6 @@ async def get_event_replay_service(
self,
replay_repository: ReplayRepository,
kafka_producer: UnifiedProducer,
event_store: EventStore,
replay_metrics: ReplayMetrics,
logger: logging.Logger,
database: Database,
Expand All @@ -910,7 +892,6 @@ async def get_event_replay_service(
service = EventReplayService(
repository=replay_repository,
producer=kafka_producer,
event_store=event_store,
replay_metrics=replay_metrics,
logger=logger,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from app.domain.enums.replay import ReplayStatus
from app.domain.events import (
DomainEvent,
DomainEventAdapter,
EventBrowseResult,
EventDetail,
EventExportRow,
Expand All @@ -26,7 +27,6 @@
EventTypeCount,
HourlyEventCount,
UserEventCount,
domain_event_adapter,
)
from app.domain.replay.models import ReplayFilter, ReplaySessionState

Expand Down Expand Up @@ -59,7 +59,7 @@ async def browse_events(
total = await query.count()

docs = await query.sort([(sort_by, sort_order)]).skip(skip).limit(limit).to_list()
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]

return EventBrowseResult(events=events, total=total, skip=skip, limit=limit)

Expand All @@ -68,7 +68,7 @@ async def get_event_detail(self, event_id: str) -> EventDetail | None:
if not doc:
return None

event = domain_event_adapter.validate_python(doc, from_attributes=True)
event = DomainEventAdapter.validate_python(doc, from_attributes=True)

related_query = {"metadata.correlation_id": doc.metadata.correlation_id, "event_id": {"$ne": event_id}}
related_docs = await (
Expand Down
28 changes: 17 additions & 11 deletions backend/app/db/repositories/event_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from beanie.odm.operators.find import BaseFindOperator
from beanie.operators import GTE, LT, LTE, Eq, In, Not, Or, RegEx
from monggregate import Pipeline, S
from pymongo.errors import DuplicateKeyError

from app.core.tracing import EventAttributes
from app.core.tracing.utils import add_span_attributes
Expand All @@ -14,13 +15,13 @@
from app.domain.events import (
ArchivedEvent,
DomainEvent,
DomainEventAdapter,
EventAggregationResult,
EventListResult,
EventReplayInfo,
EventStatistics,
EventTypeCount,
ServiceEventCount,
domain_event_adapter,
)


Expand All @@ -41,6 +42,7 @@ def _build_time_filter(self, start_time: datetime | None, end_time: datetime | N
return {key: value for key, value in {"$gte": start_time, "$lte": end_time}.items() if value is not None}

async def store_event(self, event: DomainEvent) -> str:
"""Idempotent event store — silently ignores duplicates by event_id."""
data = event.model_dump(exclude_none=True)
data.setdefault("stored_at", datetime.now(timezone.utc))
doc = EventDocument(**data)
Expand All @@ -51,7 +53,11 @@ async def store_event(self, event: DomainEvent) -> str:
str(EventAttributes.EXECUTION_ID): event.aggregate_id or "",
}
)
await doc.insert()
try:
await doc.insert()
except DuplicateKeyError:
self.logger.debug(f"Event {event.event_id} already stored, skipping")
return event.event_id
self.logger.debug(f"Stored event {event.event_id} of type {event.event_type}")
return event.event_id

Expand All @@ -73,7 +79,7 @@ async def get_event(self, event_id: str) -> DomainEvent | None:
doc = await EventDocument.find_one(EventDocument.event_id == event_id)
if not doc:
return None
return domain_event_adapter.validate_python(doc, from_attributes=True)
return DomainEventAdapter.validate_python(doc, from_attributes=True)

async def get_events_by_type(
self,
Expand All @@ -94,7 +100,7 @@ async def get_events_by_type(
.limit(limit)
.to_list()
)
return [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
return [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]

async def get_events_by_aggregate(
self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100
Expand All @@ -105,7 +111,7 @@ async def get_events_by_aggregate(
docs = (
await EventDocument.find(*conditions).sort([("timestamp", SortDirection.ASCENDING)]).limit(limit).to_list()
)
return [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
return [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]

async def get_events_by_correlation(
self, correlation_id: str, limit: int = 100, skip: int = 0, user_id: str | None = None,
Expand All @@ -119,7 +125,7 @@ async def get_events_by_correlation(
.sort([("timestamp", SortDirection.ASCENDING)])
.skip(skip).limit(limit).to_list()
)
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]
total_count = await EventDocument.find(condition).count()
total_count = max(total_count, skip + len(events))
return EventListResult(
Expand Down Expand Up @@ -152,7 +158,7 @@ async def get_events_by_user(
.limit(limit)
.to_list()
)
return [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
return [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]

async def get_execution_events(
self, execution_id: str, limit: int = 100, skip: int = 0, exclude_system_events: bool = False
Expand All @@ -172,7 +178,7 @@ async def get_execution_events(
.sort([("timestamp", SortDirection.ASCENDING)])
.skip(skip).limit(limit).to_list()
)
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]
total_count = await EventDocument.find(*conditions).count()
total_count = max(total_count, skip + len(events))
return EventListResult(
Expand Down Expand Up @@ -307,7 +313,7 @@ async def get_user_events_paginated(
.sort([("timestamp", sort_direction)])
.skip(skip).limit(limit).to_list()
)
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]
total_count = await EventDocument.find(*conditions).count()
total_count = max(total_count, skip + len(events))
return EventListResult(
Expand All @@ -334,7 +340,7 @@ async def query_events(
.sort([(sort_field, SortDirection.DESCENDING)])
.skip(skip).limit(limit).to_list()
)
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]
total_count = await EventDocument.find(query).count()
total_count = max(total_count, skip + len(events))
return EventListResult(
Expand Down Expand Up @@ -398,7 +404,7 @@ async def get_aggregate_replay_info(self, aggregate_id: str) -> EventReplayInfo
)

async for doc in EventDocument.aggregate(pipeline.export()):
events = [domain_event_adapter.validate_python(e) for e in doc["events"]]
events = [DomainEventAdapter.validate_python(e) for e in doc["events"]]
return EventReplayInfo(
events=events,
event_count=doc["event_count"],
Expand Down
4 changes: 2 additions & 2 deletions backend/app/domain/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
CreatePodCommandEvent,
DeletePodCommandEvent,
DomainEvent,
DomainEventAdapter,
EventMetadata,
# Execution Events
ExecutionAcceptedEvent,
Expand Down Expand Up @@ -90,7 +91,6 @@
UserRegisteredEvent,
UserSettingsUpdatedEvent,
UserUpdatedEvent,
domain_event_adapter,
)

__all__ = [
Expand Down Expand Up @@ -119,7 +119,7 @@
"DomainEvent",
"EventMetadata",
"ResourceUsageDomain",
"domain_event_adapter",
"DomainEventAdapter",
# Execution Events
"ExecutionRequestedEvent",
"ExecutionAcceptedEvent",
Expand Down
2 changes: 1 addition & 1 deletion backend/app/domain/events/typed.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,4 +703,4 @@ class ArchivedEvent(AvroBase):
]

# TypeAdapter for polymorphic loading - validates raw data to correct typed event
domain_event_adapter: TypeAdapter[DomainEvent] = TypeAdapter(DomainEvent)
DomainEventAdapter: TypeAdapter[DomainEvent] = TypeAdapter(DomainEvent)
26 changes: 7 additions & 19 deletions backend/app/events/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,24 @@
from faststream import StreamMessage
from faststream.kafka import KafkaBroker

from app.domain.events.typed import DomainEvent
from app.domain.events.typed import DomainEvent, DomainEventAdapter
from app.events.schema.schema_registry import SchemaRegistryManager
from app.settings import Settings


def create_avro_decoder(
schema_registry: SchemaRegistryManager,
) -> Any:
"""Create a custom Avro decoder closure for FastStream subscribers.

The decoder receives a StreamMessage whose body is Confluent wire-format
Avro bytes (magic byte + 4-byte schema ID + Avro payload). We delegate
deserialization to SchemaRegistryManager which resolves the schema from
the registry and decodes into the concrete DomainEvent subclass.
"""

async def avro_decoder(msg: StreamMessage[Any]) -> DomainEvent:
return await schema_registry.deserialize_event(msg.body, msg.raw_message.topic)

return avro_decoder


def create_broker(
    settings: Settings,
    schema_registry: SchemaRegistryManager,
    logger: logging.Logger,
) -> KafkaBroker:
    """Build a KafkaBroker for standalone workers.

    The attached decoder converts each incoming message body into a typed
    DomainEvent: the schema registry's serializer decodes the Avro payload,
    and the polymorphic adapter validates it into the concrete event class.
    """

    async def decode_domain_event(msg: StreamMessage[Any]) -> DomainEvent:
        raw = await schema_registry.serializer.decode_message(msg.body)
        return DomainEventAdapter.validate_python(raw)

    broker = KafkaBroker(
        settings.KAFKA_BOOTSTRAP_SERVERS,
        decoder=decode_domain_event,
        logger=logger,
    )
    return broker
6 changes: 5 additions & 1 deletion backend/app/events/core/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from app.core.metrics import EventMetrics
from app.core.tracing.utils import inject_trace_context
from app.db.repositories.event_repository import EventRepository
from app.dlq.models import DLQMessage, DLQMessageStatus
from app.domain.enums.kafka import KafkaTopic
from app.domain.events.typed import DomainEvent
Expand All @@ -27,18 +28,21 @@ def __init__(
self,
broker: KafkaBroker,
schema_registry_manager: SchemaRegistryManager,
event_repository: EventRepository,
logger: logging.Logger,
settings: Settings,
event_metrics: EventMetrics,
):
self._broker = broker
self._schema_registry = schema_registry_manager
self._event_repository = event_repository
self.logger = logger
self._event_metrics = event_metrics
self._topic_prefix = settings.KAFKA_TOPIC_PREFIX

async def produce(self, event_to_produce: DomainEvent, key: str) -> None:
"""Produce a message to Kafka."""
"""Persist event to MongoDB, then publish to Kafka."""
await self._event_repository.store_event(event_to_produce)
topic = f"{self._topic_prefix}{EVENT_TYPE_TO_TOPIC[event_to_produce.event_type]}"
Comment thread
HardMax71 marked this conversation as resolved.
Outdated
try:
serialized_value = await self._schema_registry.serialize_event(event_to_produce)
Comment thread
HardMax71 marked this conversation as resolved.
Outdated
Expand Down
Loading
Loading