Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/app/core/k8s_clients.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: kubernetes client 31.0.0 has no type annotations (all Any)
import logging
from dataclasses import dataclass

Expand Down
118 changes: 31 additions & 87 deletions backend/app/core/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,20 @@
import logging
import re
from datetime import datetime, timezone
from typing import Any, Dict
from typing import Any

from opentelemetry import trace

correlation_id_context: contextvars.ContextVar[str | None] = contextvars.ContextVar("correlation_id", default=None)

request_metadata_context: contextvars.ContextVar[Dict[str, Any] | None] = contextvars.ContextVar(
request_metadata_context: contextvars.ContextVar[dict[str, Any] | None] = contextvars.ContextVar(
"request_metadata", default=None
)


class CorrelationFilter(logging.Filter):
    """Logging filter that copies request-scoped context onto each record.

    Values are read from ``correlation_id_context`` and
    ``request_metadata_context``; attributes are only set when the
    corresponding context holds a truthy value. No record is ever dropped.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Annotate *record* with the current context and always return True."""
        if cid := correlation_id_context.get():
            record.correlation_id = cid

        request_meta = request_metadata_context.get()
        if not request_meta:
            return True

        record.request_method = request_meta.get("method")
        record.request_path = request_meta.get("path")
        # The host is taken from the already-populated "client" entry, if any.
        client_info = request_meta.get("client")
        if client_info:
            record.client_host = client_info.get("host")
        return True


class JSONFormatter(logging.Formatter):
"""JSON formatter that reads context directly from typed sources."""

def _sanitize_sensitive_data(self, data: str) -> str:
"""Remove or mask sensitive information from log data."""
# Mask API keys, tokens, and similar sensitive data
Expand Down Expand Up @@ -59,89 +44,48 @@ def _sanitize_sensitive_data(self, data: str) -> str:
return data

def format(self, record: logging.LogRecord) -> str:
    """Render *record* as a single JSON object string.

    Always emits ``timestamp`` (current UTC time, ISO 8601), ``level``,
    ``logger`` and ``message`` (passed through ``_sanitize_sensitive_data``).
    Optional keys are added only when a truthy value is available:
    ``correlation_id``, ``request_method``, ``request_path``, ``client_host``,
    ``trace_id``/``span_id`` (zero-padded hex) and the sanitized
    ``exc_info``/``stack_info`` texts.

    Context is read directly from the ContextVars and the OpenTelemetry API,
    not from attributes attached to the record by filters.
    """
    log_data: dict[str, Any] = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": record.levelname,
        "logger": record.name,
        "message": self._sanitize_sensitive_data(record.getMessage()),
    }

    # Correlation context - read directly from typed ContextVar
    correlation_id = correlation_id_context.get()
    if correlation_id:
        log_data["correlation_id"] = correlation_id

    # Request metadata - read directly from typed ContextVar; falsy values are skipped
    metadata = request_metadata_context.get() or {}
    client = metadata.get("client") or {}
    request_fields = {
        "request_method": metadata.get("method"),
        "request_path": metadata.get("path"),
        "client_host": client.get("host"),
    }
    log_data.update({key: value for key, value in request_fields.items() if value})

    # OpenTelemetry trace context - read directly from typed trace API
    span = trace.get_current_span()
    if span.is_recording():
        span_context = span.get_span_context()
        if span_context.is_valid:
            # f-strings avoid calling a name that reads like this method's own.
            log_data["trace_id"] = f"{span_context.trace_id:032x}"
            log_data["span_id"] = f"{span_context.span_id:016x}"

    if record.exc_info:
        exc_text = self.formatException(record.exc_info)
        log_data["exc_info"] = self._sanitize_sensitive_data(exc_text)

    if record.stack_info:
        stack_text = self.formatStack(record.stack_info)
        log_data["stack_info"] = self._sanitize_sensitive_data(stack_text)

    return json.dumps(log_data, ensure_ascii=False)


# Textual log-level names mapped to the stdlib numeric levels.
LOG_LEVELS: dict[str, int] = dict(
    DEBUG=logging.DEBUG,
    INFO=logging.INFO,
    WARNING=logging.WARNING,
    ERROR=logging.ERROR,
    CRITICAL=logging.CRITICAL,
)


def setup_logger(log_level: str) -> logging.Logger:
"""Create and configure the application logger. Called by DI with Settings.LOG_LEVEL."""
new_logger = logging.getLogger("integr8scode")
new_logger.handlers.clear()

console_handler = logging.StreamHandler()
formatter = JSONFormatter()

console_handler.setFormatter(formatter)

correlation_filter = CorrelationFilter()
console_handler.addFilter(correlation_filter)

class TracingFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
# Inline minimal helpers to avoid circular import on tracing.utils
span = trace.get_current_span()
trace_id = None
span_id = None
if span and span.is_recording():
span_context = span.get_span_context()
if span_context.is_valid:
trace_id = format(span_context.trace_id, "032x")
span_id = format(span_context.span_id, "016x")
if trace_id:
record.trace_id = trace_id
if span_id:
record.span_id = span_id
return True

console_handler.addFilter(TracingFilter())

console_handler.setFormatter(JSONFormatter())
new_logger.addHandler(console_handler)

level = LOG_LEVELS.get(log_level.upper(), logging.DEBUG)
new_logger.setLevel(level)
new_logger.setLevel(logging.getLevelNamesMapping().get(log_level.upper(), logging.DEBUG))

return new_logger
67 changes: 41 additions & 26 deletions backend/app/core/metrics/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ def _create_instruments(self) -> None:
name="coordinator.scheduling.decisions.total", description="Total scheduling decisions made", unit="1"
)

# Internal state tracking for gauge-like counters
self._active_executions_current: int = 0
self._exec_request_queue_size: int = 0
self._resource_cpu: float = 0.0
self._resource_memory: float = 0.0
self._resource_usage_cpu: float = 0.0
self._resource_usage_memory: float = 0.0
self._rate_limiter_user: int = 0
self._rate_limiter_global: int = 0

def record_coordinator_processing_time(self, duration_seconds: float) -> None:
    """Pass *duration_seconds* to ``coordinator_processing_time.record``."""
    instrument = self.coordinator_processing_time
    instrument.record(duration_seconds)

Expand All @@ -78,8 +88,7 @@ def update_active_executions_gauge(self, count: int) -> None:
"""Update the count of active executions (absolute value)."""
# Apply only the difference from the last recorded value (gauge-like behavior)
# This is a workaround since we're using up_down_counter
current_val = getattr(self, "_active_executions_current", 0)
delta = count - current_val
delta = count - self._active_executions_current
if delta != 0:
self.coordinator_active_executions.add(delta)
self._active_executions_current = count
Expand All @@ -103,12 +112,10 @@ def record_queue_wait_time_by_priority(self, wait_seconds: float, priority: str,

def update_execution_request_queue_size(self, size: int) -> None:
"""Update the execution-only request queue depth (absolute value)."""
key = "_exec_request_queue_size"
current_val = getattr(self, key, 0)
delta = size - current_val
delta = size - self._exec_request_queue_size
if delta != 0:
self.execution_request_queue_depth.add(delta)
setattr(self, key, size)
self._exec_request_queue_size = size

def record_rate_limited(self, limit_type: str, user_id: str) -> None:
    """Add 1 to ``coordinator_rate_limited``, tagged with limit type and user id."""
    # NOTE(review): user_id as a metric attribute may be high-cardinality - confirm.
    tags = {"limit_type": limit_type, "user_id": user_id}
    self.coordinator_rate_limited.add(1, attributes=tags)
Expand All @@ -124,30 +131,34 @@ def record_resource_allocation(self, resource_type: str, amount: float, executio
)

# Update gauge for current allocation
key = f"_resource_{resource_type}"
current_val = getattr(self, key, 0.0)
new_val = current_val + amount
setattr(self, key, new_val)
if resource_type == "cpu":
self._resource_cpu += amount
elif resource_type == "memory":
Comment thread
HardMax71 marked this conversation as resolved.
Outdated
self._resource_memory += amount

def record_resource_release(self, resource_type: str, amount: float, execution_id: str) -> None:
self.coordinator_resource_allocations.add(
-1, attributes={"resource_type": resource_type, "execution_id": execution_id}
)

# Update gauge for current allocation
key = f"_resource_{resource_type}"
current_val = getattr(self, key, 0.0)
new_val = max(0.0, current_val - amount)
setattr(self, key, new_val)
if resource_type == "cpu":
self._resource_cpu = max(0.0, self._resource_cpu - amount)
elif resource_type == "memory":
self._resource_memory = max(0.0, self._resource_memory - amount)

def update_resource_usage(self, resource_type: str, usage_percent: float) -> None:
# Record as a gauge-like metric
key = f"_resource_usage_{resource_type}"
current_val = getattr(self, key, 0.0)
delta = usage_percent - current_val
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": resource_type})
setattr(self, key, usage_percent)
if resource_type == "cpu":
delta = usage_percent - self._resource_usage_cpu
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": resource_type})
self._resource_usage_cpu = usage_percent
elif resource_type == "memory":
delta = usage_percent - self._resource_usage_memory
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": resource_type})
self._resource_usage_memory = usage_percent

def record_scheduling_decision(self, decision: str, reason: str) -> None:
    """Add 1 to ``coordinator_scheduling_decisions``, tagged with decision and reason."""
    tags = {"decision": decision, "reason": reason}
    self.coordinator_scheduling_decisions.add(1, attributes=tags)
Expand All @@ -167,12 +178,16 @@ def record_priority_change(self, execution_id: str, old_priority: str, new_prior

def update_rate_limiter_tokens(self, limit_type: str, tokens: int) -> None:
# Track tokens as gauge-like metric
key = f"_rate_limiter_{limit_type}"
current_val = getattr(self, key, 0)
delta = tokens - current_val
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": f"rate_limit_{limit_type}"})
setattr(self, key, tokens)
if limit_type == "user":
delta = tokens - self._rate_limiter_user
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": f"rate_limit_{limit_type}"})
self._rate_limiter_user = tokens
elif limit_type == "global":
delta = tokens - self._rate_limiter_global
if delta != 0:
self.coordinator_resource_utilization.add(delta, attributes={"resource_type": f"rate_limit_{limit_type}"})
self._rate_limiter_global = tokens
Comment thread
HardMax71 marked this conversation as resolved.

def record_rate_limit_reset(self, limit_type: str, user_id: str) -> None:
self.coordinator_scheduling_decisions.add(
Expand Down
2 changes: 2 additions & 0 deletions backend/app/events/core/consumer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: aiokafka message headers are untyped (Any)
import asyncio
import logging
from collections.abc import Awaitable, Callable
Expand Down
4 changes: 2 additions & 2 deletions backend/app/services/admin/admin_user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ async def create_user(self, *, admin_username: str, user_data: UserCreate) -> Us
username=user_data.username,
email=user_data.email,
hashed_password=hashed_password,
role=getattr(user_data, "role", UserRole.USER),
is_active=getattr(user_data, "is_active", True),
role=user_data.role,
is_active=user_data.is_active,
is_superuser=False,
)
created = await self._users.create_user(create_data)
Expand Down
5 changes: 2 additions & 3 deletions backend/app/services/coordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async def _on_stop(self) -> None:
await self.queue_manager.stop()

# Close idempotency manager
if hasattr(self, "idempotency_manager") and self.idempotency_manager:
if self.idempotency_manager:
await self.idempotency_manager.close()

self.logger.info(f"ExecutionCoordinator service stopped. Active executions: {len(self._active_executions)}")
Expand Down Expand Up @@ -360,8 +360,7 @@ async def _schedule_execution(self, event: ExecutionRequestedEvent) -> None:

# Track metrics
queue_time = start_time - event.timestamp.timestamp()
priority = getattr(event, "priority", QueuePriority.NORMAL.value)
self.metrics.record_coordinator_queue_time(queue_time, QueuePriority(priority).name)
self.metrics.record_coordinator_queue_time(queue_time, QueuePriority(event.priority).name)

scheduling_duration = time.time() - start_time
self.metrics.record_coordinator_scheduling_duration(scheduling_duration)
Expand Down
2 changes: 1 addition & 1 deletion backend/app/services/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def _filter_to_mongo_query(flt: EventFilter) -> dict[str, Any]:
query["metadata.user_id"] = flt.user_id
if flt.service_name:
query["metadata.service_name"] = flt.service_name
if getattr(flt, "status", None):
if flt.status:
query["status"] = flt.status

if flt.start_time or flt.end_time:
Expand Down
2 changes: 2 additions & 0 deletions backend/app/services/idempotency/middleware.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: Works with dynamically typed event data
"""Idempotent event processing middleware"""

import asyncio
Expand Down
4 changes: 3 additions & 1 deletion backend/app/services/idempotency/redis_repository.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: redis pipeline.execute() returns List[Any]
from __future__ import annotations

import json
Expand All @@ -14,7 +16,7 @@ def _iso(dt: datetime) -> str:
return dt.astimezone(timezone.utc).isoformat()


def _json_default(obj: object) -> str:
    """Return a string form of *obj*.

    ``datetime`` values are formatted by ``_iso``; every other value is
    converted with ``str()``. Presumably used as the ``default=`` hook for
    ``json.dumps`` - confirm at call sites.

    The parameter is typed ``object`` rather than ``datetime | Any``: a union
    containing ``Any`` collapses to ``Any``, while ``object`` still accepts
    every argument and lets the ``isinstance`` check narrow the type.
    """
    if isinstance(obj, datetime):
        return _iso(obj)
    return str(obj)
Expand Down
2 changes: 2 additions & 0 deletions backend/app/services/notification_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: Works with dynamically typed notification data
import asyncio
import logging
from dataclasses import dataclass, field
Expand Down
2 changes: 2 additions & 0 deletions backend/app/services/pod_monitor/event_mapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: kubernetes client 31.0.0 has no type annotations (all Any)
import ast
import json
import logging
Expand Down
2 changes: 2 additions & 0 deletions backend/app/services/rate_limit_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# mypy: disable-error-code="slop-any-check"
# Rationale: redis commands return untyped values
import json
import math
import re
Expand Down
Loading
Loading