
Commit 2454249

Fix: offload of kafka-internal metrics to faststream middleware (#243)
* feat: removed calculation/acquisition of kafka-internal metrics (using faststream middleware for that)
* fix: removed try-catch-raise
* fix: detected issues
* fix: logging caught error in event repo
1 parent d31dfd3 commit 2454249

13 files changed

Lines changed: 87 additions & 199 deletions
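
The hand-rolled Kafka counters removed below are replaced by broker-level middleware, which the new docstrings call KafkaTelemetryMiddleware; FastStream ships an OpenTelemetry middleware under that name in `faststream.kafka.opentelemetry`. A minimal wiring sketch, not part of this diff — the bootstrap address and meter-provider setup are illustrative:

```python
from faststream.kafka import KafkaBroker
from faststream.kafka.opentelemetry import KafkaTelemetryMiddleware
from opentelemetry.metrics import get_meter_provider

# The middleware records produce/consume counts, errors, and latency as
# OpenTelemetry metrics for every message crossing the broker, so application
# code no longer needs to call record_kafka_* helpers by hand.
broker = KafkaBroker(
    "kafka:9092",  # illustrative bootstrap server
    middlewares=[KafkaTelemetryMiddleware(meter_provider=get_meter_provider())],
)
```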


README.md

Lines changed: 2 additions & 2 deletions
```diff
@@ -12,8 +12,8 @@
   <a href="https://github.com/HardMax71/Integr8sCode/actions/workflows/security.yml">
     <img src="https://img.shields.io/github/actions/workflow/status/HardMax71/Integr8sCode/security.yml?branch=main&label=security&logo=shieldsdotio&logoColor=white" alt="Security Scan Status" />
   </a>
-  <a href="https://github.com/HardMax71/Integr8sCode/actions/workflows/vulture.yml">
-    <img src="https://img.shields.io/github/actions/workflow/status/HardMax71/Integr8sCode/vulture.yml?branch=main&label=dead%20code&logo=python&logoColor=white" alt="Dead Code Check" />
+  <a href="https://github.com/HardMax71/Integr8sCode/actions/workflows/grimp.yml">
+    <img src="https://img.shields.io/github/actions/workflow/status/HardMax71/Integr8sCode/grimp.yml?branch=main&label=dead%20code&logo=python&logoColor=white" alt="Dead Code Check" />
   </a>
   <a href="https://github.com/HardMax71/Integr8sCode/actions/workflows/docker.yml">
     <img src="https://img.shields.io/github/actions/workflow/status/HardMax71/Integr8sCode/docker.yml?branch=main&label=docker&logo=docker&logoColor=white" alt="Docker Scan Status" />
```

backend/app/core/metrics/events.py

Lines changed: 5 additions & 82 deletions
```diff
@@ -2,10 +2,13 @@
 
 
 class EventMetrics(BaseMetrics):
-    """Metrics for event processing and Kafka."""
+    """Metrics for domain-level event processing.
+
+    Transport-level Kafka metrics (produced/consumed/errors) are handled
+    automatically by KafkaTelemetryMiddleware on the broker.
+    """
 
     def _create_instruments(self) -> None:
-        # Core event metrics
         self.event_published = self._meter.create_counter(
             name="events.published.total", description="Total number of events published", unit="1"
         )
@@ -18,33 +21,6 @@ def _create_instruments(self) -> None:
             name="event.processing.errors.total", description="Total number of event processing errors", unit="1"
         )
 
-        # Event bus metrics
-        self.event_bus_queue_size = self._meter.create_up_down_counter(
-            name="event.bus.queue.size", description="Size of event bus message queue", unit="1"
-        )
-
-        # Event replay metrics
-        self.event_replay_operations = self._meter.create_counter(
-            name="event.replay.operations.total", description="Total number of event replay operations", unit="1"
-        )
-
-        # Kafka-specific metrics
-        self.kafka_messages_produced = self._meter.create_counter(
-            name="kafka.messages.produced.total", description="Total number of messages produced to Kafka", unit="1"
-        )
-
-        self.kafka_messages_consumed = self._meter.create_counter(
-            name="kafka.messages.consumed.total", description="Total number of messages consumed from Kafka", unit="1"
-        )
-
-        self.kafka_production_errors = self._meter.create_counter(
-            name="kafka.production.errors.total", description="Total number of Kafka production errors", unit="1"
-        )
-
-        self.kafka_consumption_errors = self._meter.create_counter(
-            name="kafka.consumption.errors.total", description="Total number of Kafka consumption errors", unit="1"
-        )
-
     def record_event_published(self, event_type: str, event_category: str | None = None) -> None:
         if event_category is None:
             event_category = event_type.split(".")[0] if "." in event_type else event_type
@@ -54,12 +30,6 @@ def record_event_published(self, event_type: str, event_category: str | None = None) -> None:
     def record_event_processing_duration(self, duration_seconds: float, event_type: str) -> None:
         self.event_processing_duration.record(duration_seconds, attributes={"event_type": event_type})
 
-    def record_event_replay_operation(self, operation: str, status: str) -> None:
-        self.event_replay_operations.add(1, attributes={"operation": operation, "status": status})
-
-    def record_event_stored(self, event_type: str, collection: str) -> None:
-        self.event_published.add(1, attributes={"event_type": event_type, "aggregate_type": collection})
-
     def record_events_processing_failed(
         self, topic: str, event_type: str, consumer_group: str, error_type: str
     ) -> None:
@@ -72,50 +42,3 @@ def record_events_processing_failed(
                 "error_type": error_type,
             },
         )
-
-    def record_event_store_duration(self, duration: float, operation: str, collection: str) -> None:
-        self.event_processing_duration.record(duration, attributes={"operation": operation, "collection": collection})
-
-    def record_event_store_failed(self, event_type: str, error_type: str) -> None:
-        self.event_processing_errors.add(
-            1, attributes={"event_type": event_type, "error_type": error_type, "operation": "store"}
-        )
-
-    def record_event_query_duration(self, duration: float, query_type: str, collection: str) -> None:
-        self.event_processing_duration.record(
-            duration, attributes={"operation": f"query_{query_type}", "collection": collection}
-        )
-
-    def record_processing_duration(
-        self, duration_seconds: float, topic: str, event_type: str, consumer_group: str
-    ) -> None:
-        self.event_processing_duration.record(
-            duration_seconds, attributes={"topic": topic, "event_type": event_type, "consumer_group": consumer_group}
-        )
-
-    def record_kafka_message_produced(self, topic: str, partition: int = -1) -> None:
-        self.kafka_messages_produced.add(
-            1, attributes={"topic": topic, "partition": str(partition) if partition >= 0 else "auto"}
-        )
-
-    def record_kafka_message_consumed(self, topic: str, consumer_group: str) -> None:
-        self.kafka_messages_consumed.add(1, attributes={"topic": topic, "consumer_group": consumer_group})
-
-    def record_kafka_production_error(self, topic: str, error_type: str) -> None:
-        self.kafka_production_errors.add(1, attributes={"topic": topic, "error_type": error_type})
-
-    def record_kafka_consumption_error(self, topic: str, consumer_group: str, error_type: str) -> None:
-        self.kafka_consumption_errors.add(
-            1, attributes={"topic": topic, "consumer_group": consumer_group, "error_type": error_type}
-        )
-
-    def update_event_bus_queue_size(self, delta: int, queue_name: str = "default") -> None:
-        self.event_bus_queue_size.add(delta, attributes={"queue": queue_name})
-
-    def set_event_bus_queue_size(self, size: int, queue_name: str = "default") -> None:
-        key = f"_event_bus_size_{queue_name}"
-        current_val = getattr(self, key, 0)
-        delta = size - current_val
-        if delta != 0:
-            self.event_bus_queue_size.add(delta, attributes={"queue": queue_name})
-        setattr(self, key, size)
```
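
What remains of EventMetrics is a small domain-level surface. A usage sketch, assuming BaseMetrics takes the app Settings (as other metrics classes in this commit do) and with illustrative event and group names:

```python
# Domain-level accounting only; transport counters come from the broker middleware.
metrics = EventMetrics(settings)  # `settings` assumed from app config
metrics.record_event_published("execution.requested")  # category inferred as "execution"
metrics.record_event_processing_duration(0.042, event_type="execution.requested")
metrics.record_events_processing_failed(
    topic="execution.requested",
    event_type="execution.requested",
    consumer_group="saga-coordinator",  # illustrative group name
    error_type="TimeoutError",
)
```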

backend/app/core/providers.py

Lines changed: 11 additions & 6 deletions
```diff
@@ -43,6 +43,7 @@
 from app.dlq.manager import DLQManager
 from app.domain.saga import SagaConfig
 from app.events import UnifiedProducer
+from app.events.core.transport import KafkaEventTransport
 from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService
 from app.services.admin.admin_execution_service import AdminExecutionService
 from app.services.auth_service import AuthService
@@ -167,14 +168,20 @@ class MessagingProvider(Provider):
     scope = Scope.APP
 
     @provide
-    def get_unified_producer(
+    def get_kafka_event_transport(
         self,
         broker: KafkaBroker,
-        event_repository: EventRepository,
         logger: structlog.stdlib.BoundLogger,
-        event_metrics: EventMetrics,
+    ) -> KafkaEventTransport:
+        return KafkaEventTransport(broker, logger)
+
+    @provide
+    def get_unified_producer(
+        self,
+        event_repository: EventRepository,
+        transport: KafkaEventTransport,
     ) -> UnifiedProducer:
-        return UnifiedProducer(broker, event_repository, logger, event_metrics)
+        return UnifiedProducer(event_repository, transport)
 
     @provide
     def get_idempotency_repository(self, redis_client: redis.Redis) -> RedisIdempotencyRepository:
@@ -622,14 +629,12 @@ def get_kubernetes_worker(
         kafka_producer: UnifiedProducer,
         settings: Settings,
         logger: structlog.stdlib.BoundLogger,
-        event_metrics: EventMetrics,
     ) -> KubernetesWorker:
         return KubernetesWorker(
             api_client=api_client,
             producer=kafka_producer,
             settings=settings,
             logger=logger,
-            event_metrics=event_metrics,
         )
 
```
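
With the provider split, the transport becomes an injectable dependency of its own, and Dishka resolves it transitively when the producer is requested. A hedged sketch of the resolution (the other providers are elided and `other_providers` is a placeholder):

```python
from dishka import make_async_container

# MessagingProvider supplies KafkaEventTransport, then UnifiedProducer on top
# of it; `other_providers` (broker, repositories, logging) are assumed to exist.
container = make_async_container(MessagingProvider(), *other_providers)
producer = await container.get(UnifiedProducer)  # transport resolved transitively
```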

backend/app/db/repositories/event_repository.py

Lines changed: 7 additions & 4 deletions
```diff
@@ -58,10 +58,13 @@ async def store_event(self, event: DomainEvent) -> str:
         return event.event_id
 
     async def mark_publish_failed(self, event_id: str) -> None:
-        """Mark an event as failed to publish to Kafka for later retry."""
-        await EventDocument.find_one(
-            EventDocument.event_id == event_id,
-        ).update({"$set": {"publish_failed": True, "publish_failed_at": datetime.now(timezone.utc)}})
+        """Best-effort mark of an event as failed to publish. Never raises."""
+        try:
+            await EventDocument.find_one(
+                EventDocument.event_id == event_id,
+            ).update({"$set": {"publish_failed": True, "publish_failed_at": datetime.now(timezone.utc)}})
+        except Exception as exc:
+            self.logger.warning("Could not mark event as publish-failed", event_id=event_id, error=str(exc))
 
     async def get_event(self, event_id: str) -> DomainEvent | None:
         doc = await EventDocument.find_one(EventDocument.event_id == event_id)
```
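
The publish_failed flag is what makes a later redelivery pass possible. A hypothetical sweep, not part of this commit — the `to_domain` conversion helper is assumed:

```python
# Hypothetical job that re-publishes events whose Kafka delivery failed.
async def republish_failed(producer: UnifiedProducer) -> None:
    docs = await EventDocument.find(EventDocument.publish_failed == True).to_list()  # noqa: E712
    for doc in docs:
        event = doc.to_domain()  # conversion helper assumed, not shown in this diff
        await producer.produce(event, key=event.event_id)
```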
backend/app/events/core/__init__.py

Lines changed: 2 additions & 0 deletions

```diff
@@ -1,5 +1,7 @@
 from .producer import UnifiedProducer
+from .transport import KafkaEventTransport
 
 __all__ = [
+    "KafkaEventTransport",
     "UnifiedProducer",
 ]
```
backend/app/events/core/producer.py

Lines changed: 9 additions & 26 deletions

```diff
@@ -1,29 +1,23 @@
-import structlog
-from faststream.kafka import KafkaBroker
-
-from app.core.metrics import EventMetrics
 from app.db import EventRepository
 from app.domain.events import DomainEvent
+from app.events.core.transport import KafkaEventTransport
 
 
 class UnifiedProducer:
-    """Kafka producer backed by FastStream KafkaBroker.
+    """Orchestrates the store-then-publish outbox pattern.
 
-    FastStream handles Pydantic JSON serialization natively.
-    The broker's lifecycle is managed externally (FastStream app or FastAPI lifespan).
+    Persists the event to MongoDB, then delegates to KafkaEventTransport
+    for Kafka delivery. On transport failure the event is marked as
+    failed-to-publish before the exception propagates.
     """
 
     def __init__(
         self,
-        broker: KafkaBroker,
         event_repository: EventRepository,
-        logger: structlog.stdlib.BoundLogger,
-        event_metrics: EventMetrics,
+        transport: KafkaEventTransport,
     ):
-        self._broker = broker
         self._event_repository = event_repository
-        self.logger = logger
-        self._event_metrics = event_metrics
+        self._transport = transport
 
     async def produce(self, event_to_produce: DomainEvent, key: str) -> None:
         """Persist event to MongoDB, then publish to Kafka.
@@ -32,19 +26,8 @@ async def produce(self, event_to_produce: DomainEvent, key: str) -> None:
         in MongoDB before the exception propagates.
         """
         await self._event_repository.store_event(event_to_produce)
-        topic = event_to_produce.event_type
         try:
-            await self._broker.publish(
-                message=event_to_produce,
-                topic=topic,
-                key=key.encode(),
-            )
-
-            self._event_metrics.record_kafka_message_produced(topic)
-            self.logger.debug("Event sent to topic", event_type=event_to_produce.event_type, topic=topic)
-
-        except Exception as e:
-            self._event_metrics.record_kafka_production_error(topic=topic, error_type=type(e).__name__)
-            self.logger.error("Failed to produce message", topic=topic, error=str(e))
+            await self._transport.publish(event_to_produce, event_to_produce.event_type, key)
+        except Exception:
             await self._event_repository.mark_publish_failed(event_to_produce.event_id)
             raise
```
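
A usage sketch of the slimmed producer; the `event` instance and key choice are illustrative:

```python
# Store-then-publish: the event is durable in MongoDB before Kafka sees it.
producer = UnifiedProducer(event_repository, transport)
await producer.produce(event, key=event.event_id)
# If transport.publish raises, the stored event is marked publish_failed=True
# (best-effort, never raising itself) and the original exception propagates.
```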
backend/app/events/core/transport.py

Lines changed: 29 additions & 0 deletions

```diff
@@ -0,0 +1,29 @@
+import structlog
+from faststream.kafka import KafkaBroker
+
+from app.domain.events import DomainEvent
+
+
+class KafkaEventTransport:
+    """Publishes events to Kafka.
+
+    Transport-level metrics (produced count, error count, latency) are
+    recorded automatically by KafkaTelemetryMiddleware on the broker.
+    """
+
+    def __init__(
+        self,
+        broker: KafkaBroker,
+        logger: structlog.stdlib.BoundLogger,
+    ):
+        self._broker = broker
+        self._logger = logger
+
+    async def publish(self, event: DomainEvent, topic: str, key: str) -> None:
+        """Publish event to Kafka."""
+        await self._broker.publish(
+            message=event,
+            topic=topic,
+            key=key.encode(),
+        )
+        self._logger.debug("Event sent to topic", event_type=event.event_type, topic=topic)
```
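
Because the transport only wraps broker.publish, it can be exercised in-memory with FastStream's test broker. A sketch, assuming a DomainEvent fixture:

```python
import structlog
from faststream.kafka import KafkaBroker, TestKafkaBroker

async def publish_smoke_test(event: DomainEvent) -> None:
    broker = KafkaBroker()
    # TestKafkaBroker patches publish so no real Kafka cluster is needed.
    async with TestKafkaBroker(broker) as test_broker:
        transport = KafkaEventTransport(test_broker, structlog.get_logger())
        await transport.publish(event, topic=event.event_type, key=event.event_id)
```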

backend/app/events/handlers.py

Lines changed: 1 addition & 7 deletions
```diff
@@ -34,18 +34,14 @@
 async def _track_consumed(
     metrics: EventMetrics, event: DomainEvent, consumer_group: str, coro: Awaitable[None],
 ) -> None:
-    """Record consumption metric, await *coro*, and record failure metric on error."""
-    metrics.record_kafka_message_consumed(topic=event.event_type, consumer_group=consumer_group)
+    """Await *coro* and record domain-level failure metric on error."""
     try:
         await coro
     except Exception as e:
         metrics.record_events_processing_failed(
             topic=event.event_type, event_type=event.event_type,
             consumer_group=consumer_group, error_type=type(e).__name__,
         )
-        metrics.record_kafka_consumption_error(
-            topic=event.event_type, consumer_group=consumer_group, error_type=type(e).__name__,
-        )
         raise
 
 
@@ -266,9 +262,7 @@ def register_sse_subscriber(broker: KafkaBroker, settings: Settings) -> None:
     async def on_sse_event(
         body: DomainEvent,
         sse_bus: FromDishka[SSERedisBus],
-        event_metrics: FromDishka[EventMetrics],
     ) -> None:
-        event_metrics.record_kafka_message_consumed(topic=body.event_type, consumer_group=group_id)
         execution_id = getattr(body, "execution_id", None)
         if execution_id:
             sse_data = SSEExecutionEventData(**{
```
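
For reference, the helper wraps each handler invocation; per-message consume counts now come from the broker middleware, so only domain-level failures are recorded here. Call shape (the handler name is illustrative):

```python
await _track_consumed(metrics, body, group_id, execution_handler.handle(body))
```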

backend/app/services/k8s_worker/worker.py

Lines changed: 1 addition & 3 deletions
```diff
@@ -7,7 +7,7 @@
 from kubernetes_asyncio import client as k8s_client
 from kubernetes_asyncio.client.rest import ApiException
 
-from app.core.metrics import EventMetrics, ExecutionMetrics, KubernetesMetrics
+from app.core.metrics import ExecutionMetrics, KubernetesMetrics
 from app.domain.enums import ExecutionErrorType
 from app.domain.events import (
     CreatePodCommandEvent,
@@ -41,9 +41,7 @@ def __init__(
         producer: UnifiedProducer,
         settings: Settings,
         logger: structlog.stdlib.BoundLogger,
-        event_metrics: EventMetrics,
     ):
-        self._event_metrics = event_metrics
         self.logger = logger
         self.metrics = KubernetesMetrics(settings)
         self.execution_metrics = ExecutionMetrics(settings)
```
