Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 103 additions & 33 deletions src/config/tracing.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,97 @@
import { NodeSDK } from "@opentelemetry/sdk-node";
import { getNodeAutoInstrumentations } from "@opentelemetry/auto-instrumentations-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { JaegerExporter } from "@opentelemetry/exporter-jaeger";
import { resourceFromAttributes } from "@opentelemetry/resources";
import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-base";
import { trace, SpanStatusCode, Span } from "@opentelemetry/api";
import {
BatchSpanProcessor,
SimpleSpanProcessor,
SpanExporter,
} from "@opentelemetry/sdk-trace-base";
import {
trace,
SpanStatusCode,
Span,
context,
propagation,
ROOT_CONTEXT,
} from "@opentelemetry/api";
import { W3CTraceContextPropagator } from "@opentelemetry/core";

// Configure the trace exporter
const traceExporter = new OTLPTraceExporter({
url:
process.env.OTEL_EXPORTER_OTLP_ENDPOINT ||
"http://localhost:4318/v1/traces",
});
const SERVICE_NAME = "alian-structure-api";

function buildExporters(): SpanExporter[] {
const exporters: SpanExporter[] = [];

export const sdk = new NodeSDK({
resource: resourceFromAttributes({
"service.name": "alian-structure-api",
"service.version": process.env.npm_package_version || "1.0.0",
"deployment.environment": process.env.NODE_ENV || "development",
}),
spanProcessor: new BatchSpanProcessor(traceExporter),
instrumentations: [
getNodeAutoInstrumentations({
"@opentelemetry/instrumentation-fs": {
enabled: false,
},
// OTLP exporter (primary — works with Jaeger 1.41+ OTLP endpoint)
exporters.push(
new OTLPTraceExporter({
url:
process.env.OTEL_EXPORTER_OTLP_ENDPOINT ||
"http://localhost:4318/v1/traces",
}),
],
);

// Legacy Jaeger Thrift exporter (for older Jaeger deployments)
if (process.env.JAEGER_AGENT_HOST || process.env.JAEGER_ENDPOINT) {
exporters.push(
new JaegerExporter({
endpoint:
process.env.JAEGER_ENDPOINT ||
`http://${process.env.JAEGER_AGENT_HOST || "localhost"}:14268/api/traces`,
}),
);
}

return exporters;
}

const resource = resourceFromAttributes({
"service.name": SERVICE_NAME,
"service.version": process.env.npm_package_version || "1.0.0",
"deployment.environment": process.env.NODE_ENV || "development",
});

// Start the SDK
export const startTracing = async () => {
let sdk: NodeSDK;

function buildSdk(): NodeSDK {
const exporters = buildExporters();
const processors = exporters.map((exp) => new BatchSpanProcessor(exp));

return new NodeSDK({
resource,
spanProcessors: processors,
textMapPropagator: new W3CTraceContextPropagator(),
instrumentations: [
getNodeAutoInstrumentations({
// fs instrumentation is noisy; disable it
"@opentelemetry/instrumentation-fs": { enabled: false },
// Ensure HTTP instrumentation captures request/response headers
"@opentelemetry/instrumentation-http": {
headersToSpanAttributes: {
server: {
requestHeaders: ["x-request-id", "x-correlation-id"],
responseHeaders: ["content-type"],
},
},
},
}),
],
});
}

export const startTracing = async (): Promise<void> => {
try {
sdk = buildSdk();
sdk.start();
console.log("OpenTelemetry tracing initialized");
} catch (err) {
console.error("Failed to start OpenTelemetry SDK:", err);
}
};

// Graceful shutdown
export const shutdownTracing = async () => {
export const shutdownTracing = async (): Promise<void> => {
if (!sdk) return;
try {
await sdk.shutdown();
console.log("OpenTelemetry tracing shut down");
Expand All @@ -48,18 +100,18 @@ export const shutdownTracing = async () => {
}
};

// Helper to get the tracer
export const getTracer = () => {
return trace.getTracer("alian-structure-api", "1.0.0");
};
export const getTracer = () =>
trace.getTracer(SERVICE_NAME, process.env.npm_package_version || "1.0.0");

// Helper to create a span with automatic error handling
/**
* Execute `fn` inside a new active span, automatically setting OK/ERROR
* status and ending the span when the promise resolves or rejects.
*/
export const createSpan = async <T>(
name: string,
fn: (span: Span) => Promise<T>,
): Promise<T> => {
const tracer = getTracer();
return tracer.startActiveSpan(name, async (span) => {
return getTracer().startActiveSpan(name, async (span) => {
try {
const result = await fn(span);
span.setStatus({ code: SpanStatusCode.OK });
Expand All @@ -79,5 +131,23 @@ export const createSpan = async <T>(
});
};

/**
* Extract trace context from an incoming carrier (HTTP headers, WS handshake
* headers, message metadata, etc.) and return an active context so that child
* spans are correctly parented to the upstream trace.
*/
export const extractContext = (carrier: Record<string, string | string[]>) => {
return propagation.extract(ROOT_CONTEXT, carrier);
};


/**
* Inject the current trace context into an outgoing carrier so downstream
* services can continue the trace.
*/
export const injectContext = (
carrier: Record<string, string>,
ctx = context.active(),
) => {
propagation.inject(ctx, carrier);
return carrier;
};
21 changes: 21 additions & 0 deletions src/dashboard/websocket/dashboard.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
} from "@nestjs/websockets";
import { Server, Socket } from "socket.io";
import { Logger, UseFilters, UsePipes, ValidationPipe } from "@nestjs/common";
import { SpanStatusCode } from "@opentelemetry/api";
import { createSpan, extractContext } from "../../config/tracing";
import { JwtService } from "@nestjs/jwt";
import { ConnectionManagerService } from "./services/connection-manager.service";
import { EventBufferService } from "./services/event-buffer.service";
Expand Down Expand Up @@ -61,6 +63,17 @@ export class DashboardGateway
}

async handleConnection(client: Socket) {
// Extract trace context from handshake headers for distributed tracing
const carrier = client.handshake.headers as Record<string, string>;
await createSpan(`ws.connect ${client.id}`, async (span) => {
span.setAttribute("ws.client_id", client.id);
span.setAttribute("ws.namespace", "/dashboard");
const traceParent = carrier["traceparent"];
if (traceParent) span.setAttribute("ws.traceparent", traceParent);
}).catch(() => {
/* non-fatal */
});

try {
// Authenticate the client
const token = this.extractToken(client);
Expand Down Expand Up @@ -140,6 +153,14 @@ export class DashboardGateway
}

async handleDisconnect(client: Socket) {
createSpan(`ws.disconnect ${client.id}`, async (span) => {
span.setAttribute("ws.client_id", client.id);
span.setAttribute("ws.namespace", "/dashboard");
span.setStatus({ code: SpanStatusCode.OK });
}).catch(() => {
/* non-fatal */
});

const connectionInfo = this.connectionManager.getConnectionInfo(client.id);

if (connectionInfo) {
Expand Down
5 changes: 5 additions & 0 deletions src/observability/observability.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ import { RequestTimingMiddleware } from "./request-timing.middleware";
import { DatabaseTimingInterceptor } from "./database-timing.interceptor";
import { PerformanceBaselineService } from "./performance-baseline.service";
import { ObservabilityController } from "./observability.controller";
import { TracingInterceptor } from "./tracing.interceptor";

@Module({
providers: [
ProfilingService,
RequestTimingMiddleware,
PerformanceBaselineService,
{
provide: APP_INTERCEPTOR,
useClass: TracingInterceptor,
},
{
provide: APP_INTERCEPTOR,
useClass: DatabaseTimingInterceptor,
Expand Down
79 changes: 79 additions & 0 deletions src/observability/tracing.interceptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import {
Injectable,
NestInterceptor,
ExecutionContext,
CallHandler,
} from "@nestjs/common";
import { Observable, throwError } from "rxjs";
import { tap, catchError } from "rxjs/operators";
import { Request, Response } from "express";
import { SpanStatusCode, trace, context } from "@opentelemetry/api";
import { getTracer, extractContext } from "../config/tracing";

/**
* Intercepts every HTTP request and wraps the handler execution in an
* OpenTelemetry span. Trace context is extracted from incoming headers so
* that distributed traces (coming from a gateway or upstream service) are
* properly continued rather than started fresh.
*/
@Injectable()
export class TracingInterceptor implements NestInterceptor {
intercept(ctx: ExecutionContext, next: CallHandler): Observable<any> {
const httpCtx = ctx.switchToHttp();
const req = httpCtx.getRequest<Request>();
const res = httpCtx.getResponse<Response>();

// Propagate upstream trace context if present
const parentCtx = extractContext(req.headers as Record<string, string>);
const tracer = getTracer();
const spanName = `${req.method} ${ctx.getClass().name}.${ctx.getHandler().name}`;

return new Observable((subscriber) => {
tracer.startActiveSpan(spanName, {}, parentCtx, (span) => {
span.setAttribute("http.method", req.method);
span.setAttribute("http.url", req.url);
span.setAttribute("http.route", req.route?.path ?? req.url);
span.setAttribute("handler.class", ctx.getClass().name);
span.setAttribute("handler.method", ctx.getHandler().name);

const requestId = req.headers["x-request-id"];
if (requestId) {
span.setAttribute(
"request.id",
Array.isArray(requestId) ? requestId[0] : requestId,
);
}

context.with(trace.setSpan(context.active(), span), () => {
next
.handle()
.pipe(
tap(() => {
span.setAttribute("http.status_code", res.statusCode);
span.setStatus({ code: SpanStatusCode.OK });
span.end();
}),
catchError((err: Error) => {
span.recordException(err);
span.setStatus({
code: SpanStatusCode.ERROR,
message: err.message,
});
span.setAttribute(
"http.status_code",
res.statusCode >= 400 ? res.statusCode : 500,
);
span.end();
return throwError(() => err);
}),
)
.subscribe({
next: (v) => subscriber.next(v),
error: (e) => subscriber.error(e),
complete: () => subscriber.complete(),
});
});
});
});
}
}
Loading