diff --git a/src/config/tracing.ts b/src/config/tracing.ts index 2247c7c..332f1cd 100644 --- a/src/config/tracing.ts +++ b/src/config/tracing.ts @@ -1,36 +1,88 @@ import { NodeSDK } from "@opentelemetry/sdk-node"; import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node"; import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; +import { JaegerExporter } from "@opentelemetry/exporter-jaeger"; import { resourceFromAttributes } from "@opentelemetry/resources"; -import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-base"; -import { trace, SpanStatusCode, Span } from "@opentelemetry/api"; +import { + BatchSpanProcessor, + SimpleSpanProcessor, + SpanExporter, +} from "@opentelemetry/sdk-trace-base"; +import { + trace, + SpanStatusCode, + Span, + context, + propagation, + ROOT_CONTEXT, +} from "@opentelemetry/api"; +import { W3CTraceContextPropagator } from "@opentelemetry/core"; -// Configure the trace exporter -const traceExporter = new OTLPTraceExporter({ - url: - process.env.OTEL_EXPORTER_OTLP_ENDPOINT || - "http://localhost:4318/v1/traces", -}); +const SERVICE_NAME = "alian-structure-api"; + +function buildExporters(): SpanExporter[] { + const exporters: SpanExporter[] = []; -export const sdk = new NodeSDK({ - resource: resourceFromAttributes({ - "service.name": "alian-structure-api", - "service.version": process.env.npm_package_version || "1.0.0", - "deployment.environment": process.env.NODE_ENV || "development", - }), - spanProcessor: new BatchSpanProcessor(traceExporter), - instrumentations: [ - getNodeAutoInstrumentations({ - "@opentelemetry/instrumentation-fs": { - enabled: false, - }, + // OTLP exporter (primary — works with Jaeger 1.41+ OTLP endpoint) + exporters.push( + new OTLPTraceExporter({ + url: + process.env.OTEL_EXPORTER_OTLP_ENDPOINT || + "http://localhost:4318/v1/traces", }), - ], + ); + + // Legacy Jaeger Thrift exporter (for older Jaeger deployments) + if (process.env.JAEGER_AGENT_HOST || process.env.JAEGER_ENDPOINT) { + exporters.push( + new JaegerExporter({ + endpoint: + process.env.JAEGER_ENDPOINT || + `http://${process.env.JAEGER_AGENT_HOST || "localhost"}:14268/api/traces`, + }), + ); + } + + return exporters; +} + +const resource = resourceFromAttributes({ + "service.name": SERVICE_NAME, + "service.version": process.env.npm_package_version || "1.0.0", + "deployment.environment": process.env.NODE_ENV || "development", }); -// Start the SDK -export const startTracing = async () => { +let sdk: NodeSDK; + +function buildSdk(): NodeSDK { + const exporters = buildExporters(); + const processors = exporters.map((exp) => new BatchSpanProcessor(exp)); + + return new NodeSDK({ + resource, + spanProcessors: processors, + textMapPropagator: new W3CTraceContextPropagator(), + instrumentations: [ + getNodeAutoInstrumentations({ + // fs instrumentation is noisy; disable it + "@opentelemetry/instrumentation-fs": { enabled: false }, + // Ensure HTTP instrumentation captures request/response headers + "@opentelemetry/instrumentation-http": { + headersToSpanAttributes: { + server: { + requestHeaders: ["x-request-id", "x-correlation-id"], + responseHeaders: ["content-type"], + }, + }, + }, + }), + ], + }); +} + +export const startTracing = async (): Promise => { try { + sdk = buildSdk(); sdk.start(); console.log("OpenTelemetry tracing initialized"); } catch (err) { @@ -38,8 +90,8 @@ export const startTracing = async () => { } }; -// Graceful shutdown -export const shutdownTracing = async () => { +export const shutdownTracing = async (): Promise => { + if (!sdk) return; try { await sdk.shutdown(); console.log("OpenTelemetry tracing shut down"); @@ -48,18 +100,18 @@ export const shutdownTracing = async () => { } }; -// Helper to get the tracer -export const getTracer = () => { - return trace.getTracer("alian-structure-api", "1.0.0"); -}; +export const getTracer = () => + trace.getTracer(SERVICE_NAME, process.env.npm_package_version || "1.0.0"); -// Helper to create a span with automatic error handling +/** + * Execute `fn` inside a new active span, automatically setting OK/ERROR + * status and ending the span when the promise resolves or rejects. + */ export const createSpan = async ( name: string, fn: (span: Span) => Promise, ): Promise => { - const tracer = getTracer(); - return tracer.startActiveSpan(name, async (span) => { + return getTracer().startActiveSpan(name, async (span) => { try { const result = await fn(span); span.setStatus({ code: SpanStatusCode.OK }); @@ -79,5 +131,23 @@ export const createSpan = async ( }); }; +/** + * Extract trace context from an incoming carrier (HTTP headers, WS handshake + * headers, message metadata, etc.) and return an active context so that child + * spans are correctly parented to the upstream trace. + */ +export const extractContext = (carrier: Record) => { + return propagation.extract(ROOT_CONTEXT, carrier); +}; - +/** + * Inject the current trace context into an outgoing carrier so downstream + * services can continue the trace. + */ +export const injectContext = ( + carrier: Record, + ctx = context.active(), +) => { + propagation.inject(ctx, carrier); + return carrier; +}; diff --git a/src/dashboard/websocket/dashboard.gateway.ts b/src/dashboard/websocket/dashboard.gateway.ts index 3b4ef23..9b2dd1a 100644 --- a/src/dashboard/websocket/dashboard.gateway.ts +++ b/src/dashboard/websocket/dashboard.gateway.ts @@ -11,6 +11,8 @@ import { } from "@nestjs/websockets"; import { Server, Socket } from "socket.io"; import { Logger, UseFilters, UsePipes, ValidationPipe } from "@nestjs/common"; +import { SpanStatusCode } from "@opentelemetry/api"; +import { createSpan, extractContext } from "../../config/tracing"; import { JwtService } from "@nestjs/jwt"; import { ConnectionManagerService } from "./services/connection-manager.service"; import { EventBufferService } from "./services/event-buffer.service"; @@ -61,6 +63,17 @@ export class DashboardGateway } async handleConnection(client: Socket) { + // Extract trace context from handshake headers for distributed tracing + const carrier = client.handshake.headers as Record; + await createSpan(`ws.connect ${client.id}`, async (span) => { + span.setAttribute("ws.client_id", client.id); + span.setAttribute("ws.namespace", "/dashboard"); + const traceParent = carrier["traceparent"]; + if (traceParent) span.setAttribute("ws.traceparent", traceParent); + }).catch(() => { + /* non-fatal */ + }); + try { // Authenticate the client const token = this.extractToken(client); @@ -140,6 +153,14 @@ export class DashboardGateway } async handleDisconnect(client: Socket) { + createSpan(`ws.disconnect ${client.id}`, async (span) => { + span.setAttribute("ws.client_id", client.id); + span.setAttribute("ws.namespace", "/dashboard"); + span.setStatus({ code: SpanStatusCode.OK }); + }).catch(() => { + /* non-fatal */ + }); + const connectionInfo = this.connectionManager.getConnectionInfo(client.id); if (connectionInfo) { diff --git a/src/observability/observability.module.ts b/src/observability/observability.module.ts index 31411b0..c56687f 100644 --- a/src/observability/observability.module.ts +++ b/src/observability/observability.module.ts @@ -6,12 +6,17 @@ import { RequestTimingMiddleware } from "./request-timing.middleware"; import { DatabaseTimingInterceptor } from "./database-timing.interceptor"; import { PerformanceBaselineService } from "./performance-baseline.service"; import { ObservabilityController } from "./observability.controller"; +import { TracingInterceptor } from "./tracing.interceptor"; @Module({ providers: [ ProfilingService, RequestTimingMiddleware, PerformanceBaselineService, + { + provide: APP_INTERCEPTOR, + useClass: TracingInterceptor, + }, { provide: APP_INTERCEPTOR, useClass: DatabaseTimingInterceptor, diff --git a/src/observability/tracing.interceptor.ts b/src/observability/tracing.interceptor.ts new file mode 100644 index 0000000..0a2a448 --- /dev/null +++ b/src/observability/tracing.interceptor.ts @@ -0,0 +1,79 @@ +import { + Injectable, + NestInterceptor, + ExecutionContext, + CallHandler, +} from "@nestjs/common"; +import { Observable, throwError } from "rxjs"; +import { tap, catchError } from "rxjs/operators"; +import { Request, Response } from "express"; +import { SpanStatusCode, trace, context } from "@opentelemetry/api"; +import { getTracer, extractContext } from "../config/tracing"; + +/** + * Intercepts every HTTP request and wraps the handler execution in an + * OpenTelemetry span. Trace context is extracted from incoming headers so + * that distributed traces (coming from a gateway or upstream service) are + * properly continued rather than started fresh. + */ +@Injectable() +export class TracingInterceptor implements NestInterceptor { + intercept(ctx: ExecutionContext, next: CallHandler): Observable { + const httpCtx = ctx.switchToHttp(); + const req = httpCtx.getRequest(); + const res = httpCtx.getResponse(); + + // Propagate upstream trace context if present + const parentCtx = extractContext(req.headers as Record); + const tracer = getTracer(); + const spanName = `${req.method} ${ctx.getClass().name}.${ctx.getHandler().name}`; + + return new Observable((subscriber) => { + tracer.startActiveSpan(spanName, {}, parentCtx, (span) => { + span.setAttribute("http.method", req.method); + span.setAttribute("http.url", req.url); + span.setAttribute("http.route", req.route?.path ?? req.url); + span.setAttribute("handler.class", ctx.getClass().name); + span.setAttribute("handler.method", ctx.getHandler().name); + + const requestId = req.headers["x-request-id"]; + if (requestId) { + span.setAttribute( + "request.id", + Array.isArray(requestId) ? requestId[0] : requestId, + ); + } + + context.with(trace.setSpan(context.active(), span), () => { + next + .handle() + .pipe( + tap(() => { + span.setAttribute("http.status_code", res.statusCode); + span.setStatus({ code: SpanStatusCode.OK }); + span.end(); + }), + catchError((err: Error) => { + span.recordException(err); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err.message, + }); + span.setAttribute( + "http.status_code", + res.statusCode >= 400 ? res.statusCode : 500, + ); + span.end(); + return throwError(() => err); + }), + ) + .subscribe({ + next: (v) => subscriber.next(v), + error: (e) => subscriber.error(e), + complete: () => subscriber.complete(), + }); + }); + }); + }); + } +}