Skip to content

Commit 069b6eb

Browse files
kamirclaude
andcommitted
feat: merge LFS proxy into unified proxy as feature-flagged module
Embed LFS (Large File Support) as a module inside the existing Kafka proxy behind KAFSCALE_PROXY_LFS_ENABLED (default: off). When enabled, the produce path detects LFS_BLOB headers, uploads payloads to S3, and replaces record values with JSON envelopes — all before the existing partition-aware fan-out runs. Key design: - Insert between parse and route in handleProduceRouting(); setting payload=nil forces re-encode via the existing fanOut path - Zero overhead when disabled (nil pointer check only) - cmd/lfs-proxy/ stays as-is with deprecation warning - All env vars (KAFSCALE_LFS_PROXY_*) reused unchanged New files in cmd/proxy/: lfs.go - lfsModule struct, initLFSModule(), integration adapter lfs_rewrite.go - rewriteProduceRecords(), batch/record/header helpers lfs_record.go - record encoding, varint helpers lfs_s3.go - s3Uploader (PutObject, multipart, presign) lfs_http.go - HTTP API (/lfs/produce, /lfs/download, /lfs/uploads) lfs_metrics.go - Prometheus metrics lfs_tracker.go - async Kafka-based event log lfs_tracker_types.go - event type definitions lfs_sasl_encode.go - SASL handshake + produce encoding for HTTP path lfs_backend_auth.go - backend TLS wrapping + SASL auth lfs_backend_tls.go - backend TLS config builder lfs_http_tls.go - HTTP server TLS config builder lfs_swagger.go - Swagger UI handler lfs_uuid.go - UUID generation openapi.yaml - embedded OpenAPI spec lfs_test.go - 9 tests (blob detection, passthrough, checksum, mixed records, CRC validation, nil module) Review fixes applied: - lfsDropHeader allocates new slice (no input mutation) - connectBackend uses struct fields, not per-call os.Getenv - forwardToBackend: removed unused addr param, added conn deadline - rewriteProduceRequest returns orphans; caller tracks on produce failure - Test uses errors.As for proper ChecksumError assertion - lfsGetClientIP: bool from strings.Cut named 'found' not 'err' Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bd854e5 commit 069b6eb

18 files changed

Lines changed: 4892 additions & 1 deletion

cmd/lfs-proxy/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ func main() {
114114
}
115115
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: logLevel}))
116116

117+
logger.Warn("DEPRECATED: standalone lfs-proxy is deprecated; use the unified proxy with KAFSCALE_PROXY_LFS_ENABLED=true instead")
118+
117119
addr := envOrDefault("KAFSCALE_LFS_PROXY_ADDR", defaultProxyAddr)
118120
healthAddr := strings.TrimSpace(os.Getenv("KAFSCALE_LFS_PROXY_HEALTH_ADDR"))
119121
metricsAddr := strings.TrimSpace(os.Getenv("KAFSCALE_LFS_PROXY_METRICS_ADDR"))

0 commit comments

Comments
 (0)