Skip to content

Commit 97eb9d4

Browse files
fix: wip
1 parent 4f35495 commit 97eb9d4

8 files changed

Lines changed: 520 additions & 9 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ dependencies = [
2222
"redis[hiredis]>=5.0.0",
2323
# Memory monitoring
2424
"psutil>=5.9.0",
25+
# Prometheus metrics
26+
"prometheus-client>=0.20.0",
2527
]
2628

2729
[project.optional-dependencies]

s3proxy/app.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import structlog
1313
from fastapi import FastAPI, HTTPException, Request, Response
1414
from fastapi.responses import PlainTextResponse
15+
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
1516
from structlog.stdlib import BoundLogger
1617

18+
from . import metrics as _ # noqa: F401 - Import to register metrics
1719
from .config import Settings
1820
from .errors import S3Error, get_s3_error_code
1921
from .handlers import S3ProxyHandler
@@ -151,6 +153,10 @@ def _register_routes(app: FastAPI) -> None:
151153
async def health():
152154
return PlainTextResponse("ok")
153155

156+
@app.get("/metrics")
157+
async def metrics():
158+
return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
159+
154160
@app.api_route(
155161
"/{path:path}",
156162
methods=["GET", "PUT", "POST", "DELETE", "HEAD"],

s3proxy/concurrency.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import structlog
1313

1414
from s3proxy.errors import S3Error
15+
from s3proxy.metrics import MEMORY_LIMIT_BYTES, MEMORY_REJECTIONS, MEMORY_RESERVED_BYTES
1516

1617
logger = structlog.get_logger(__name__)
1718

@@ -25,6 +26,9 @@
2526
_active_bytes = 0
2627
_lock: asyncio.Lock | None = None
2728

29+
# Initialize memory limit metric
30+
MEMORY_LIMIT_BYTES.set(_limit_bytes)
31+
2832

2933
def _create_malloc_release() -> Callable[[], int] | None:
3034
"""Create platform-specific function to release memory back to OS.
@@ -91,10 +95,12 @@ async def try_acquire_memory(bytes_needed: int) -> int:
9195
limit_mb = _limit_bytes / 1024 / 1024
9296
logger.warning("MEMORY_REJECTED", active_mb=round(active_mb, 2),
9397
requested_mb=round(request_mb, 2), limit_mb=round(limit_mb, 2))
98+
MEMORY_REJECTIONS.inc()
9499
raise S3Error.slow_down(
95100
f"Memory limit: {active_mb:.0f}MB + {request_mb:.0f}MB > {limit_mb:.0f}MB"
96101
)
97102
_active_bytes += to_reserve
103+
MEMORY_RESERVED_BYTES.set(_active_bytes)
98104
return to_reserve
99105

100106

@@ -108,6 +114,7 @@ async def release_memory(bytes_reserved: int) -> None:
108114
lock = await _get_lock()
109115
async with lock:
110116
_active_bytes = max(0, _active_bytes - bytes_reserved)
117+
MEMORY_RESERVED_BYTES.set(_active_bytes)
111118

112119
# Run garbage collection and release memory to OS
113120
gc.collect(0)
@@ -136,6 +143,7 @@ def set_memory_limit(limit_mb: int) -> None:
136143
global _limit_mb, _limit_bytes
137144
_limit_mb = limit_mb
138145
_limit_bytes = limit_mb * 1024 * 1024
146+
MEMORY_LIMIT_BYTES.set(_limit_bytes)
139147

140148

141149
def set_active_memory(bytes_val: int) -> None:

s3proxy/crypto.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
)
1616
from structlog import BoundLogger
1717

18+
from .metrics import BYTES_DECRYPTED, BYTES_ENCRYPTED, ENCRYPTION_OPERATIONS
19+
1820
logger: BoundLogger = structlog.get_logger(__name__)
1921

2022
# Constants
@@ -187,6 +189,11 @@ def encrypt(plaintext: bytes, dek: bytes, nonce: bytes | None = None) -> bytes:
187189
aesgcm = AESGCM(dek)
188190
ciphertext_with_tag = aesgcm.encrypt(nonce, plaintext, None)
189191
result = nonce + ciphertext_with_tag
192+
193+
# Track metrics
194+
ENCRYPTION_OPERATIONS.labels(operation="encrypt").inc()
195+
BYTES_ENCRYPTED.inc(len(plaintext))
196+
190197
logger.debug(
191198
"Data encrypted",
192199
plaintext_size=len(plaintext),
@@ -222,6 +229,11 @@ def decrypt(ciphertext: bytes, dek: bytes) -> bytes:
222229
try:
223230
aesgcm = AESGCM(dek)
224231
plaintext = aesgcm.decrypt(nonce, ct_with_tag, None)
232+
233+
# Track metrics
234+
ENCRYPTION_OPERATIONS.labels(operation="decrypt").inc()
235+
BYTES_DECRYPTED.inc(len(plaintext))
236+
225237
logger.debug(
226238
"Data decrypted",
227239
ciphertext_size=len(ciphertext),

s3proxy/metrics.py

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
"""Prometheus metrics for S3Proxy."""
2+
3+
from __future__ import annotations
4+
5+
from prometheus_client import Counter, Gauge, Histogram
6+
7+
# ---------------------------------------------------------------------------
# Request metrics: one count/duration series per (method, operation) pair,
# plus a gauge of requests currently being handled per method.
# ---------------------------------------------------------------------------
REQUEST_COUNT = Counter(
    name="s3proxy_requests_total",
    documentation="Total number of requests",
    labelnames=["method", "operation", "status"],
)

REQUEST_DURATION = Histogram(
    name="s3proxy_request_duration_seconds",
    documentation="Request duration in seconds",
    labelnames=["method", "operation"],
    # Explicit bucket upper bounds, in seconds (5 ms up to 10 s).
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0),
)

REQUESTS_IN_FLIGHT = Gauge(
    name="s3proxy_requests_in_flight",
    documentation="Number of requests currently being processed",
    labelnames=["method"],
)

# ---------------------------------------------------------------------------
# Memory/concurrency metrics: unlabelled series describing the memory limiter.
# ---------------------------------------------------------------------------
MEMORY_RESERVED_BYTES = Gauge(
    name="s3proxy_memory_reserved_bytes",
    documentation="Currently reserved memory in bytes",
)

MEMORY_LIMIT_BYTES = Gauge(
    name="s3proxy_memory_limit_bytes",
    documentation="Configured memory limit in bytes",
)

MEMORY_REJECTIONS = Counter(
    name="s3proxy_memory_rejections_total",
    documentation="Total number of requests rejected due to memory limits",
)

# ---------------------------------------------------------------------------
# Encryption metrics: operation counts (labelled by operation) and byte totals.
# ---------------------------------------------------------------------------
ENCRYPTION_OPERATIONS = Counter(
    name="s3proxy_encryption_operations_total",
    documentation="Total number of encryption/decryption operations",
    labelnames=["operation"],
)

BYTES_ENCRYPTED = Counter(
    name="s3proxy_bytes_encrypted_total",
    documentation="Total bytes encrypted",
)

BYTES_DECRYPTED = Counter(
    name="s3proxy_bytes_decrypted_total",
    documentation="Total bytes decrypted",
)
59+
60+
61+
def get_operation_name(method: str, path: str, query: str) -> str:
    """Derive an S3 operation name from request attributes.

    The result is used as a metrics label, so it is always one of a fixed
    set of names (or ``"Unknown"``).

    Query sub-resources (``uploads``, ``uploadId``, ``delete``, ...) are
    matched against the *names* of the query parameters, never against the
    raw query string. Substring matching would misclassify requests whose
    parameter values merely contain one of those words, e.g.
    ``?prefix=uploads/`` or ``?prefix=deleted``.

    Args:
        method: HTTP method (GET, PUT, POST, DELETE, HEAD).
        path: Request path.
        query: Raw query string; a leading ``?`` is tolerated.

    Returns:
        S3 operation name for metrics labeling, or ``"Unknown"`` when the
        request matches none of the recognised shapes.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import parse_qs

    # keep_blank_values=True so valueless flags such as "?uploads" or
    # "?delete" still show up as parameter names.
    params = set(parse_qs(query.lstrip("?"), keep_blank_values=True))

    trimmed = path.strip("/")

    # Root path
    if not trimmed:
        return "ListBuckets"

    # A path with a single segment addresses a bucket, not an object.
    is_bucket_only = "/" not in trimmed

    # Batch delete
    if method == "POST" and "delete" in params:
        return "DeleteObjects"

    # Multipart operations on an existing upload
    if "uploadId" in params:
        if method == "GET" and "partNumber" not in params:
            return "ListParts"
        if method == "PUT":
            # NOTE(review): S3 clients normally send x-amz-copy-source as a
            # request header, which this function cannot see, so this branch
            # probably only fires if a caller puts it in the query string.
            # Confirm and pass headers in if UploadPartCopy should be tracked.
            if "x-amz-copy-source" in params:
                return "UploadPartCopy"
            return "UploadPart"
        if method == "POST":
            return "CompleteMultipartUpload"
        if method == "DELETE":
            return "AbortMultipartUpload"
        # Any other method falls through to the generic rules below.

    # List/Create multipart uploads
    if "uploads" in params:
        if method == "GET":
            return "ListMultipartUploads"
        if method == "POST":
            return "CreateMultipartUpload"

    # Bucket operations
    if is_bucket_only:
        if method == "GET" and "location" in params:
            return "GetBucketLocation"
        bucket_operations = {
            "PUT": "CreateBucket",
            "DELETE": "DeleteBucket",
            "HEAD": "HeadBucket",
            "GET": "ListObjects",
        }
        if method in bucket_operations:
            return bucket_operations[method]

    # Object tagging
    if "tagging" in params:
        tagging_operations = {
            "GET": "GetObjectTagging",
            "PUT": "PutObjectTagging",
            "DELETE": "DeleteObjectTagging",
        }
        if method in tagging_operations:
            return tagging_operations[method]

    # Standard object operations
    object_operations = {
        "GET": "GetObject",
        "PUT": "PutObject",
        "HEAD": "HeadObject",
        "DELETE": "DeleteObject",
    }
    return object_operations.get(method, "Unknown")

s3proxy/request_handler.py

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import os
6+
import time
67
from urllib.parse import parse_qs
78

89
import structlog
@@ -12,6 +13,12 @@
1213
from structlog.stdlib import BoundLogger
1314

1415
from . import concurrency, crypto
16+
from .metrics import (
17+
REQUEST_COUNT,
18+
REQUEST_DURATION,
19+
REQUESTS_IN_FLIGHT,
20+
get_operation_name,
21+
)
1522
from .errors import S3Error, raise_for_client_error, raise_for_exception
1623
from .handlers import S3ProxyHandler
1724
from .routing import RequestDispatcher
@@ -68,22 +75,32 @@ async def handle_proxy_request(
6875
Raises:
6976
S3Error: For authentication failures or S3-compatible errors.
7077
"""
78+
# Track metrics
79+
method = request.method
80+
path = request.url.path
81+
query = str(request.url.query)
82+
operation = get_operation_name(method, path, query)
83+
start_time = time.perf_counter()
84+
status_code = 200
85+
86+
REQUESTS_IN_FLIGHT.labels(method=method).inc()
87+
7188
# Check memory limit BEFORE reading body data - reject if at capacity
7289
reserved_memory = 0
73-
needs_limit = request.method in ("PUT", "POST", "GET")
90+
needs_limit = method in ("PUT", "POST", "GET")
7491
memory_limit = concurrency.get_memory_limit()
7592

7693
if memory_limit > 0 and needs_limit:
7794
content_length = int(request.headers.get("content-length", "0"))
78-
memory_needed = concurrency.estimate_memory_footprint(request.method, content_length)
95+
memory_needed = concurrency.estimate_memory_footprint(method, content_length)
7996

8097
logger.info(
8198
"REQUEST_ARRIVED - attempting to acquire memory",
8299
memory_needed_mb=round(memory_needed / 1024 / 1024, 2),
83100
active_mb=round(concurrency.get_active_memory() / 1024 / 1024, 2),
84101
limit_mb=round(memory_limit / 1024 / 1024, 2),
85-
method=request.method,
86-
path=request.url.path,
102+
method=method,
103+
path=path,
87104
content_length=content_length,
88105
)
89106
reserved_memory = await concurrency.try_acquire_memory(memory_needed)
@@ -92,22 +109,37 @@ async def handle_proxy_request(
92109
reserved_mb=round(reserved_memory / 1024 / 1024, 2),
93110
active_mb=round(concurrency.get_active_memory() / 1024 / 1024, 2),
94111
limit_mb=round(memory_limit / 1024 / 1024, 2),
95-
method=request.method,
96-
path=request.url.path,
112+
method=method,
113+
path=path,
97114
)
98115

99116
try:
100-
return await _handle_proxy_request_impl(request, handler, verifier)
117+
response = await _handle_proxy_request_impl(request, handler, verifier)
118+
if response is not None:
119+
status_code = response.status_code
120+
return response
121+
except HTTPException as e:
122+
status_code = e.status_code
123+
raise
124+
except Exception:
125+
status_code = 500
126+
raise
101127
finally:
128+
# Record metrics
129+
duration = time.perf_counter() - start_time
130+
REQUESTS_IN_FLIGHT.labels(method=method).dec()
131+
REQUEST_COUNT.labels(method=method, operation=operation, status=status_code).inc()
132+
REQUEST_DURATION.labels(method=method, operation=operation).observe(duration)
133+
102134
if reserved_memory > 0:
103135
await concurrency.release_memory(reserved_memory)
104136
logger.info(
105137
"MEMORY_RELEASED",
106138
released_mb=round(reserved_memory / 1024 / 1024, 2),
107139
active_mb=round(concurrency.get_active_memory() / 1024 / 1024, 2),
108140
limit_mb=round(memory_limit / 1024 / 1024, 2),
109-
method=request.method,
110-
path=request.url.path,
141+
method=method,
142+
path=path,
111143
)
112144

113145

0 commit comments

Comments
 (0)