Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,20 +276,17 @@ class EventProvider(Provider):
scope = Scope.APP

@provide
async def get_schema_registry(self, settings: Settings, logger: logging.Logger) -> SchemaRegistryManager:
registry = SchemaRegistryManager(settings, logger)
await registry.initialize_schemas()
return registry
def get_schema_registry(self, settings: Settings, logger: logging.Logger) -> SchemaRegistryManager:
return SchemaRegistryManager(settings, logger)

@provide
def get_event_store(
self,
schema_registry: SchemaRegistryManager,
logger: logging.Logger,
event_metrics: EventMetrics,
) -> EventStore:
return create_event_store(
schema_registry=schema_registry, logger=logger, event_metrics=event_metrics, ttl_days=90
logger=logger, event_metrics=event_metrics, ttl_days=90
)


Expand Down Expand Up @@ -874,14 +871,12 @@ def get_event_replay_service(
self,
replay_repository: ReplayRepository,
kafka_producer: UnifiedProducer,
event_store: EventStore,
replay_metrics: ReplayMetrics,
logger: logging.Logger,
) -> EventReplayService:
return EventReplayService(
repository=replay_repository,
producer=kafka_producer,
event_store=event_store,
replay_metrics=replay_metrics,
logger=logger,
)
Expand All @@ -901,7 +896,6 @@ async def get_event_replay_service(
self,
replay_repository: ReplayRepository,
kafka_producer: UnifiedProducer,
event_store: EventStore,
replay_metrics: ReplayMetrics,
logger: logging.Logger,
database: Database,
Expand All @@ -910,7 +904,6 @@ async def get_event_replay_service(
service = EventReplayService(
repository=replay_repository,
producer=kafka_producer,
event_store=event_store,
replay_metrics=replay_metrics,
logger=logger,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from app.domain.enums.replay import ReplayStatus
from app.domain.events import (
DomainEvent,
DomainEventAdapter,
EventBrowseResult,
EventDetail,
EventExportRow,
Expand All @@ -26,7 +27,6 @@
EventTypeCount,
HourlyEventCount,
UserEventCount,
domain_event_adapter,
)
from app.domain.replay.models import ReplayFilter, ReplaySessionState

Expand Down Expand Up @@ -59,7 +59,7 @@ async def browse_events(
total = await query.count()

docs = await query.sort([(sort_by, sort_order)]).skip(skip).limit(limit).to_list()
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]

return EventBrowseResult(events=events, total=total, skip=skip, limit=limit)

Expand All @@ -68,7 +68,7 @@ async def get_event_detail(self, event_id: str) -> EventDetail | None:
if not doc:
return None

event = domain_event_adapter.validate_python(doc, from_attributes=True)
event = DomainEventAdapter.validate_python(doc, from_attributes=True)

related_query = {"metadata.correlation_id": doc.metadata.correlation_id, "event_id": {"$ne": event_id}}
related_docs = await (
Expand Down
20 changes: 10 additions & 10 deletions backend/app/db/repositories/event_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
from app.domain.events import (
ArchivedEvent,
DomainEvent,
DomainEventAdapter,
EventAggregationResult,
EventListResult,
EventReplayInfo,
EventStatistics,
EventTypeCount,
ServiceEventCount,
domain_event_adapter,
)


Expand Down Expand Up @@ -73,7 +73,7 @@ async def get_event(self, event_id: str) -> DomainEvent | None:
doc = await EventDocument.find_one(EventDocument.event_id == event_id)
if not doc:
return None
return domain_event_adapter.validate_python(doc, from_attributes=True)
return DomainEventAdapter.validate_python(doc, from_attributes=True)

async def get_events_by_type(
self,
Expand All @@ -94,7 +94,7 @@ async def get_events_by_type(
.limit(limit)
.to_list()
)
return [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
return [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]

async def get_events_by_aggregate(
self, aggregate_id: str, event_types: list[EventType] | None = None, limit: int = 100
Expand All @@ -105,7 +105,7 @@ async def get_events_by_aggregate(
docs = (
await EventDocument.find(*conditions).sort([("timestamp", SortDirection.ASCENDING)]).limit(limit).to_list()
)
return [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
return [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]

async def get_events_by_correlation(
self, correlation_id: str, limit: int = 100, skip: int = 0, user_id: str | None = None,
Expand All @@ -119,7 +119,7 @@ async def get_events_by_correlation(
.sort([("timestamp", SortDirection.ASCENDING)])
.skip(skip).limit(limit).to_list()
)
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]
total_count = await EventDocument.find(condition).count()
total_count = max(total_count, skip + len(events))
return EventListResult(
Expand Down Expand Up @@ -152,7 +152,7 @@ async def get_events_by_user(
.limit(limit)
.to_list()
)
return [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
return [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]

async def get_execution_events(
self, execution_id: str, limit: int = 100, skip: int = 0, exclude_system_events: bool = False
Expand All @@ -172,7 +172,7 @@ async def get_execution_events(
.sort([("timestamp", SortDirection.ASCENDING)])
.skip(skip).limit(limit).to_list()
)
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]
total_count = await EventDocument.find(*conditions).count()
total_count = max(total_count, skip + len(events))
return EventListResult(
Expand Down Expand Up @@ -307,7 +307,7 @@ async def get_user_events_paginated(
.sort([("timestamp", sort_direction)])
.skip(skip).limit(limit).to_list()
)
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]
total_count = await EventDocument.find(*conditions).count()
total_count = max(total_count, skip + len(events))
return EventListResult(
Expand All @@ -334,7 +334,7 @@ async def query_events(
.sort([(sort_field, SortDirection.DESCENDING)])
.skip(skip).limit(limit).to_list()
)
events = [domain_event_adapter.validate_python(d, from_attributes=True) for d in docs]
events = [DomainEventAdapter.validate_python(d, from_attributes=True) for d in docs]
total_count = await EventDocument.find(query).count()
total_count = max(total_count, skip + len(events))
return EventListResult(
Expand Down Expand Up @@ -398,7 +398,7 @@ async def get_aggregate_replay_info(self, aggregate_id: str) -> EventReplayInfo
)

async for doc in EventDocument.aggregate(pipeline.export()):
events = [domain_event_adapter.validate_python(e) for e in doc["events"]]
events = [DomainEventAdapter.validate_python(e) for e in doc["events"]]
return EventReplayInfo(
events=events,
event_count=doc["event_count"],
Expand Down
4 changes: 2 additions & 2 deletions backend/app/domain/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
CreatePodCommandEvent,
DeletePodCommandEvent,
DomainEvent,
DomainEventAdapter,
EventMetadata,
# Execution Events
ExecutionAcceptedEvent,
Expand Down Expand Up @@ -90,7 +91,6 @@
UserRegisteredEvent,
UserSettingsUpdatedEvent,
UserUpdatedEvent,
domain_event_adapter,
)

__all__ = [
Expand Down Expand Up @@ -119,7 +119,7 @@
"DomainEvent",
"EventMetadata",
"ResourceUsageDomain",
"domain_event_adapter",
"DomainEventAdapter",
# Execution Events
"ExecutionRequestedEvent",
"ExecutionAcceptedEvent",
Expand Down
2 changes: 1 addition & 1 deletion backend/app/domain/events/typed.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,4 +703,4 @@ class ArchivedEvent(AvroBase):
]

# TypeAdapter for polymorphic loading - validates raw data to correct typed event
domain_event_adapter: TypeAdapter[DomainEvent] = TypeAdapter(DomainEvent)
DomainEventAdapter: TypeAdapter[DomainEvent] = TypeAdapter(DomainEvent)
26 changes: 7 additions & 19 deletions backend/app/events/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,24 @@
from faststream import StreamMessage
from faststream.kafka import KafkaBroker

from app.domain.events.typed import DomainEvent
from app.domain.events.typed import DomainEvent, DomainEventAdapter
from app.events.schema.schema_registry import SchemaRegistryManager
from app.settings import Settings


def create_avro_decoder(
schema_registry: SchemaRegistryManager,
) -> Any:
"""Create a custom Avro decoder closure for FastStream subscribers.

The decoder receives a StreamMessage whose body is Confluent wire-format
Avro bytes (magic byte + 4-byte schema ID + Avro payload). We delegate
deserialization to SchemaRegistryManager which resolves the schema from
the registry and decodes into the concrete DomainEvent subclass.
"""

async def avro_decoder(msg: StreamMessage[Any]) -> DomainEvent:
return await schema_registry.deserialize_event(msg.body, msg.raw_message.topic)

return avro_decoder


def create_broker(
settings: Settings,
schema_registry: SchemaRegistryManager,
logger: logging.Logger,
) -> KafkaBroker:
"""Create a KafkaBroker with Avro decoder for standalone workers."""

async def avro_decoder(msg: StreamMessage[Any]) -> DomainEvent:
payload = await schema_registry.serializer.decode_message(msg.body)
return DomainEventAdapter.validate_python(payload)

return KafkaBroker(
settings.KAFKA_BOOTSTRAP_SERVERS,
decoder=create_avro_decoder(schema_registry),
decoder=avro_decoder,
logger=logger,
)
21 changes: 8 additions & 13 deletions backend/app/events/event_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,18 @@
from app.core.tracing.utils import add_span_attributes
from app.db.docs import EventDocument
from app.domain.enums.events import EventType
from app.domain.events.typed import DomainEvent
from app.events.schema.schema_registry import SchemaRegistryManager
from app.domain.events.typed import DomainEvent, DomainEventAdapter


class EventStore:
def __init__(
self,
schema_registry: SchemaRegistryManager,
logger: logging.Logger,
event_metrics: EventMetrics,
ttl_days: int = 90,
batch_size: int = 100,
):
self.metrics = event_metrics
self.schema_registry = schema_registry
self.logger = logger
self.ttl_days = ttl_days
self.batch_size = batch_size
Expand Down Expand Up @@ -115,7 +112,7 @@ async def get_event(self, event_id: str) -> DomainEvent | None:
if not doc:
return None

event = self.schema_registry.deserialize_json(doc.model_dump())
event = DomainEventAdapter.validate_python(doc.model_dump())

duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_by_id", "event_store")
Expand All @@ -141,7 +138,7 @@ async def get_events_by_type(
.limit(limit)
.to_list()
)
events = [self.schema_registry.deserialize_json(doc.model_dump()) for doc in docs]
events = [DomainEventAdapter.validate_python(doc.model_dump()) for doc in docs]

duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_by_type", "event_store")
Expand All @@ -158,7 +155,7 @@ async def get_execution_events(
query["event_type"] = {"$in": event_types}

docs = await EventDocument.find(query).sort([("timestamp", SortDirection.ASCENDING)]).to_list()
events = [self.schema_registry.deserialize_json(doc.model_dump()) for doc in docs]
events = [DomainEventAdapter.validate_python(doc.model_dump()) for doc in docs]

duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_execution_events", "event_store")
Expand All @@ -180,7 +177,7 @@ async def get_user_events(
query["timestamp"] = tr

docs = await EventDocument.find(query).sort([("timestamp", SortDirection.DESCENDING)]).limit(limit).to_list()
events = [self.schema_registry.deserialize_json(doc.model_dump()) for doc in docs]
events = [DomainEventAdapter.validate_python(doc.model_dump()) for doc in docs]

duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_user_events", "event_store")
Expand All @@ -201,7 +198,7 @@ async def get_security_events(
query["timestamp"] = tr

docs = await EventDocument.find(query).sort([("timestamp", SortDirection.DESCENDING)]).limit(limit).to_list()
events = [self.schema_registry.deserialize_json(doc.model_dump()) for doc in docs]
events = [DomainEventAdapter.validate_python(doc.model_dump()) for doc in docs]

duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_security_events", "event_store")
Expand All @@ -214,7 +211,7 @@ async def get_correlation_chain(self, correlation_id: str) -> list[DomainEvent]:
.sort([("timestamp", SortDirection.ASCENDING)])
.to_list()
)
events = [self.schema_registry.deserialize_json(doc.model_dump()) for doc in docs]
events = [DomainEventAdapter.validate_python(doc.model_dump()) for doc in docs]

duration = asyncio.get_running_loop().time() - start
self.metrics.record_event_query_duration(duration, "get_correlation_chain", "event_store")
Expand All @@ -238,7 +235,7 @@ async def replay_events(
query["event_type"] = {"$in": event_types}

async for doc in EventDocument.find(query).sort([("timestamp", SortDirection.ASCENDING)]):
event = self.schema_registry.deserialize_json(doc.model_dump())
event = DomainEventAdapter.validate_python(doc.model_dump())
if callback:
await callback(event)
count += 1
Expand Down Expand Up @@ -316,14 +313,12 @@ async def health_check(self) -> dict[str, Any]:


def create_event_store(
schema_registry: SchemaRegistryManager,
logger: logging.Logger,
event_metrics: EventMetrics,
ttl_days: int = 90,
batch_size: int = 100,
) -> EventStore:
return EventStore(
schema_registry=schema_registry,
logger=logger,
event_metrics=event_metrics,
ttl_days=ttl_days,
Expand Down
Loading
Loading