Skip to content

Commit 5e71fbf

Browse files
authored
new types, updated rate limit service (#102)
* new types, updated rate limit service
* more typed stuff, also updated openapi spec and regenerated types
* frontend linting fix
1 parent 0c0b21f commit 5e71fbf

92 files changed

Lines changed: 1284 additions & 1631 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

backend/app/api/routes/dlq.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,7 @@ async def retry_dlq_messages(
6363
retry_request: ManualRetryRequest, dlq_manager: FromDishka[DLQManager]
6464
) -> DLQBatchRetryResponse:
6565
result = await dlq_manager.retry_messages_batch(retry_request.event_ids)
66-
return DLQBatchRetryResponse(
67-
total=result.total,
68-
successful=result.successful,
69-
failed=result.failed,
70-
details=[
71-
{"event_id": d.event_id, "status": d.status, **({"error": d.error} if d.error else {})}
72-
for d in result.details
73-
],
74-
)
66+
return DLQBatchRetryResponse.model_validate(result, from_attributes=True)
7567

7668

7769
@router.post("/retry-policy", response_model=MessageResponse)

backend/app/api/routes/events.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import logging
33
from datetime import datetime, timedelta, timezone
4-
from typing import Annotated, Any, Dict, List
4+
from typing import Annotated, Any
55

66
from dishka import FromDishka
77
from dishka.integrations.fastapi import DishkaRoute
@@ -74,7 +74,7 @@ async def get_execution_events(
7474
async def get_user_events(
7575
current_user: Annotated[UserResponse, Depends(current_user)],
7676
event_service: FromDishka[EventService],
77-
event_types: List[EventType] | None = Query(None),
77+
event_types: list[EventType] | None = Query(None),
7878
start_time: datetime | None = Query(None),
7979
end_time: datetime | None = Query(None),
8080
limit: int = Query(100, ge=1, le=1000),
@@ -259,12 +259,12 @@ async def publish_custom_event(
259259
return PublishEventResponse(event_id=event_id, status="published", timestamp=datetime.now(timezone.utc))
260260

261261

262-
@router.post("/aggregate", response_model=List[Dict[str, Any]])
262+
@router.post("/aggregate", response_model=list[dict[str, Any]])
263263
async def aggregate_events(
264264
current_user: Annotated[UserResponse, Depends(current_user)],
265265
aggregation: EventAggregationRequest,
266266
event_service: FromDishka[EventService],
267-
) -> List[Dict[str, Any]]:
267+
) -> list[dict[str, Any]]:
268268
result = await event_service.aggregate_events(
269269
user_id=current_user.user_id,
270270
user_role=current_user.role,
@@ -275,10 +275,10 @@ async def aggregate_events(
275275
return result.results
276276

277277

278-
@router.get("/types/list", response_model=List[str])
278+
@router.get("/types/list", response_model=list[str])
279279
async def list_event_types(
280280
current_user: Annotated[UserResponse, Depends(current_user)], event_service: FromDishka[EventService]
281-
) -> List[str]:
281+
) -> list[str]:
282282
event_types = await event_service.list_event_types(user_id=current_user.user_id, user_role=current_user.role)
283283
return event_types
284284

backend/app/api/routes/execution.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from app.domain.enums.user import UserRole
1515
from app.domain.events.typed import BaseEvent, DomainEvent, EventMetadata
1616
from app.domain.exceptions import DomainError
17+
from app.domain.idempotency import KeyStrategy
1718
from app.schemas_pydantic.execution import (
1819
CancelExecutionRequest,
1920
CancelResponse,
@@ -88,15 +89,15 @@ async def create_execution(
8889
# Check for duplicate request using custom key
8990
idempotency_result = await idempotency_manager.check_and_reserve(
9091
event=pseudo_event,
91-
key_strategy="custom",
92+
key_strategy=KeyStrategy.CUSTOM,
9293
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
9394
ttl_seconds=86400, # 24 hours TTL for HTTP idempotency
9495
)
9596

9697
if idempotency_result.is_duplicate:
9798
cached_json = await idempotency_manager.get_cached_json(
9899
event=pseudo_event,
99-
key_strategy="custom",
100+
key_strategy=KeyStrategy.CUSTOM,
100101
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
101102
)
102103
return ExecutionResponse.model_validate_json(cached_json)
@@ -119,7 +120,7 @@ async def create_execution(
119120
await idempotency_manager.mark_completed_with_json(
120121
event=pseudo_event,
121122
cached_json=response_model.model_dump_json(),
122-
key_strategy="custom",
123+
key_strategy=KeyStrategy.CUSTOM,
123124
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
124125
)
125126

@@ -131,7 +132,7 @@ async def create_execution(
131132
await idempotency_manager.mark_failed(
132133
event=pseudo_event,
133134
error=str(e),
134-
key_strategy="custom",
135+
key_strategy=KeyStrategy.CUSTOM,
135136
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
136137
)
137138
raise
@@ -141,7 +142,7 @@ async def create_execution(
141142
await idempotency_manager.mark_failed(
142143
event=pseudo_event,
143144
error=str(e),
144-
key_strategy="custom",
145+
key_strategy=KeyStrategy.CUSTOM,
145146
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
146147
)
147148
raise HTTPException(status_code=500, detail="Internal server error during script execution") from e

backend/app/core/adaptive_sampling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import threading
33
import time
44
from collections import deque
5-
from typing import Sequence, Tuple
5+
from collections.abc import Sequence
66

77
from opentelemetry.context import Context
88
from opentelemetry.sdk.trace.sampling import Decision, Sampler, SamplingResult
@@ -170,7 +170,7 @@ def _is_error(self, attributes: Attributes | None) -> bool:
170170

171171
return False
172172

173-
def _calculate_metrics(self) -> Tuple[float, int]:
173+
def _calculate_metrics(self) -> tuple[float, int]:
174174
"""Calculate current error rate and request rate"""
175175
now = time.time()
176176
minute_ago = now - 60

backend/app/core/correlation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import uuid
22
from datetime import datetime, timezone
3-
from typing import Any, Dict
3+
from typing import Any
44

55
from starlette.datastructures import MutableHeaders
66
from starlette.types import ASGIApp, Message, Receive, Scope, Send
@@ -23,11 +23,11 @@ def get_correlation_id() -> str:
2323
return correlation_id_context.get() or ""
2424

2525
@staticmethod
26-
def set_request_metadata(metadata: Dict[str, Any]) -> None:
26+
def set_request_metadata(metadata: dict[str, Any]) -> None:
2727
request_metadata_context.set(metadata)
2828

2929
@staticmethod
30-
def get_request_metadata() -> Dict[str, Any]:
30+
def get_request_metadata() -> dict[str, Any]:
3131
return request_metadata_context.get() or {}
3232

3333
@staticmethod

backend/app/core/dishka_lifespan.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import asyncio
24
import logging
35
from contextlib import AsyncExitStack, asynccontextmanager

backend/app/core/logging.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
import logging
44
import re
55
from datetime import datetime, timezone
6-
from typing import Any, Dict
6+
from typing import Any
77

88
from opentelemetry import trace
99

1010
correlation_id_context: contextvars.ContextVar[str | None] = contextvars.ContextVar("correlation_id", default=None)
1111

12-
request_metadata_context: contextvars.ContextVar[Dict[str, Any] | None] = contextvars.ContextVar(
12+
request_metadata_context: contextvars.ContextVar[dict[str, Any] | None] = contextvars.ContextVar(
1313
"request_metadata", default=None
1414
)
1515

backend/app/core/metrics/rate_limit.py

Lines changed: 17 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -5,103 +5,40 @@ class RateLimitMetrics(BaseMetrics):
55
"""Metrics for rate limiting system."""
66

77
def _create_instruments(self) -> None:
8-
# Request metrics
9-
self.requests_total = self._meter.create_counter(
8+
self._requests = self._meter.create_counter(
109
name="rate_limit.requests.total",
1110
description="Total number of rate limit checks",
1211
unit="1",
1312
)
14-
self.allowed = self._meter.create_counter(
13+
self._allowed = self._meter.create_counter(
1514
name="rate_limit.allowed.total",
1615
description="Number of allowed requests",
1716
unit="1",
1817
)
19-
self.rejected = self._meter.create_counter(
18+
self._rejected = self._meter.create_counter(
2019
name="rate_limit.rejected.total",
2120
description="Number of rejected requests",
2221
unit="1",
2322
)
24-
self.bypass = self._meter.create_counter(
23+
self._bypass = self._meter.create_counter(
2524
name="rate_limit.bypass.total",
2625
description="Number of bypassed rate limit checks",
2726
unit="1",
2827
)
2928

30-
# Performance metrics
31-
self.check_duration = self._meter.create_histogram(
32-
name="rate_limit.check.duration",
33-
description="Duration of rate limit checks",
34-
unit="ms",
35-
)
36-
self.redis_duration = self._meter.create_histogram(
37-
name="rate_limit.redis.duration",
38-
description="Duration of Redis operations",
39-
unit="ms",
40-
)
41-
self.algorithm_duration = self._meter.create_histogram(
42-
name="rate_limit.algorithm.duration",
43-
description="Time to execute rate limit algorithm",
44-
unit="ms",
45-
)
46-
47-
# Usage metrics
48-
self.remaining = self._meter.create_histogram(
49-
name="rate_limit.remaining",
50-
description="Remaining requests in rate limit window",
51-
unit="1",
52-
)
53-
self.quota_usage = self._meter.create_histogram(
54-
name="rate_limit.quota.usage",
55-
description="Percentage of quota used",
56-
unit="%",
57-
)
58-
self.window_size = self._meter.create_histogram(
59-
name="rate_limit.window.size",
60-
description="Size of rate limit window",
61-
unit="s",
62-
)
63-
64-
# Configuration metrics - using histograms to record absolute values
65-
# We record the current value, and Grafana queries the latest
66-
self.active_rules = self._meter.create_histogram(
67-
name="rate_limit.active.rules",
68-
description="Number of active rate limit rules",
69-
unit="1",
70-
)
71-
self.custom_users = self._meter.create_histogram(
72-
name="rate_limit.custom.users",
73-
description="Number of users with custom rate limits",
74-
unit="1",
75-
)
76-
self.bypass_users = self._meter.create_histogram(
77-
name="rate_limit.bypass.users",
78-
description="Number of users with rate limit bypass",
79-
unit="1",
80-
)
29+
def record_request(self, endpoint: str, authenticated: bool, algorithm: str) -> None:
30+
"""Record a rate limit check request."""
31+
attrs = {"endpoint": endpoint, "authenticated": str(authenticated).lower(), "algorithm": algorithm}
32+
self._requests.add(1, attrs)
8133

82-
# Token bucket specific metrics
83-
self.token_bucket_tokens = self._meter.create_histogram(
84-
name="rate_limit.token_bucket.tokens",
85-
description="Current tokens in token bucket",
86-
unit="1",
87-
)
88-
self.token_bucket_refill_rate = self._meter.create_histogram(
89-
name="rate_limit.token_bucket.refill_rate",
90-
description="Token bucket refill rate",
91-
unit="tokens/s",
92-
)
34+
def record_allowed(self, endpoint: str, group: str) -> None:
35+
"""Record an allowed request."""
36+
self._allowed.add(1, {"endpoint": endpoint, "group": group})
9337

94-
# Error metrics
95-
self.redis_errors = self._meter.create_counter(
96-
name="rate_limit.redis.errors.total",
97-
description="Number of Redis errors",
98-
unit="1",
99-
)
100-
self.config_errors = self._meter.create_counter(
101-
name="rate_limit.config.errors.total",
102-
description="Number of configuration load errors",
103-
unit="1",
104-
)
38+
def record_rejected(self, endpoint: str, group: str) -> None:
39+
"""Record a rejected request."""
40+
self._rejected.add(1, {"endpoint": endpoint, "group": group})
10541

106-
# Authenticated vs anonymous checks can be derived from labels on requests_total
107-
# No separate ip/user counters to avoid duplication and complexity.
42+
def record_bypass(self, endpoint: str) -> None:
43+
"""Record a bypassed rate limit check."""
44+
self._bypass.add(1, {"endpoint": endpoint})

backend/app/core/middlewares/cache.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
from typing import Dict
2-
31
from starlette.datastructures import MutableHeaders
42
from starlette.types import ASGIApp, Message, Receive, Scope, Send
53

64

75
class CacheControlMiddleware:
86
def __init__(self, app: ASGIApp):
97
self.app = app
10-
self.cache_policies: Dict[str, str] = {
8+
self.cache_policies: dict[str, str] = {
119
"/api/v1/k8s-limits": "public, max-age=300", # 5 minutes
1210
"/api/v1/example-scripts": "public, max-age=600", # 10 minutes
1311
"/api/v1/auth/verify-token": "private, no-cache", # 30 seconds

backend/app/core/providers.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import logging
24
from typing import AsyncIterator
35

@@ -111,12 +113,12 @@ async def get_redis_client(self, settings: Settings, logger: logging.Logger) ->
111113
socket_timeout=5,
112114
)
113115
# Test connection
114-
await client.ping() # type: ignore[misc] # redis-py dual sync/async return type
116+
await client.ping() # type: ignore[misc] # redis-py returns Awaitable[bool] | bool
115117
logger.info(f"Redis connected: {settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}")
116118
try:
117119
yield client
118120
finally:
119-
await client.aclose()
121+
await client.close()
120122

121123
@provide
122124
def get_rate_limit_service(
@@ -383,7 +385,9 @@ class SSEProvider(Provider):
383385
scope = Scope.APP
384386

385387
@provide
386-
async def get_sse_redis_bus(self, redis_client: redis.Redis, logger: logging.Logger) -> AsyncIterator[SSERedisBus]:
388+
async def get_sse_redis_bus(
389+
self, redis_client: redis.Redis, logger: logging.Logger
390+
) -> AsyncIterator[SSERedisBus]:
387391
bus = SSERedisBus(redis_client, logger)
388392
yield bus
389393

@@ -480,9 +484,10 @@ async def get_user_settings_service(
480484
repository: UserSettingsRepository,
481485
kafka_event_service: KafkaEventService,
482486
event_bus_manager: EventBusManager,
487+
settings: Settings,
483488
logger: logging.Logger,
484489
) -> UserSettingsService:
485-
service = UserSettingsService(repository, kafka_event_service, logger)
490+
service = UserSettingsService(repository, kafka_event_service, settings, logger)
486491
await service.initialize(event_bus_manager)
487492
return service
488493

0 commit comments

Comments (0)