Skip to content

Commit 09953a7

Browse files
committed
feat: use SQS to send spans
1 parent 2c4b9fd commit 09953a7

13 files changed

Lines changed: 191 additions & 82 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ Once you're done, it will display the URL for your new dashboard.
3434
### Auto-tracing
3535

3636
Once installed, all Lambda functions in your AWS account will automatically be traced,
37-
by adding a Lambda layer to them. You'll also notice a new environment variable called `AUTO_TRACE_HOST`.
37+
by adding a Lambda layer to them. You'll also notice a new environment variable called `AUTO_TRACE_QUEUE_URL`.
3838

3939
If you wish to disable tracing for a specific function, you can add the environment variable `AUTO_TRACE_EXCLUDE`
4040
with a value of `1`. This will keep the Lambda layer from being added.

packages/api/serverless.yml

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ provider:
5454
LAMBDA_LAYER_ARN: !Ref TracerLambdaLayer
5555
AUTO_TRACE_EXCLUDE: 1
5656
STORAGE_BUCKET_NAME: ${self:custom.bucketName}
57+
QUEUE_URL: !Ref Queue
58+
QUEUE_REGION: ${aws:region}
5759
httpApi:
5860
payload: "2.0"
5961
cors: true
@@ -93,7 +95,6 @@ provider:
9395
Action:
9496
- s3:GetObject
9597
- s3:PutObject
96-
- s3:DeleteObject
9798
- s3:ListBucket
9899
Resource:
99100
- arn:aws:s3:::${self:custom.bucketName}
@@ -111,12 +112,21 @@ layers:
111112
- nodejs22.x
112113

113114
functions:
115+
collector:
116+
handler: src/events/collector.handler
117+
timeout: 300
118+
memorySize: 2048
119+
reservedConcurrency: 1
120+
events:
121+
- sqs:
122+
arn: !GetAtt Queue.Arn
123+
batchSize: 100
124+
maximumBatchingWindow: 30
125+
114126
main:
115127
handler: src/index.handler
116128
timeout: 900
117129
memorySize: 1024
118-
reservedConcurrency: 1
119-
url: true
120130
events:
121131
- httpApi:
122132
path: /{proxy+}
@@ -127,5 +137,6 @@ functions:
127137
action: auto-trace
128138

129139
resources:
130-
- ${file(./src/infrastructure/database/table.yml)}
131-
- ${file(./src/infrastructure/bucket.yml)}
140+
- ${file(./src/infrastructure/table.yml)}
141+
- ${file(./src/infrastructure/bucket.yml)}
142+
- ${file(./src/infrastructure/queue.yml)}

packages/api/src/events/auto-trace.js

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import {
22
GetFunctionCommand,
3-
GetFunctionUrlConfigCommand,
43
LambdaClient,
54
ListFunctionsCommand,
65
UpdateFunctionConfigurationCommand,
@@ -11,7 +10,12 @@ import { acquireLock, releaseLock } from "../lib/locks";
1110
import Logger from "../lib/logger";
1211
import { put } from "../lib/database";
1312

14-
const supportedRuntimes = ["nodejs16.x", "nodejs18.x", "nodejs20.x", "nodejs22.x"];
13+
const supportedRuntimes = [
14+
"nodejs16.x",
15+
"nodejs18.x",
16+
"nodejs20.x",
17+
"nodejs22.x",
18+
];
1519
const lambdaExecWrapper = "/opt/nodejs/tracer_wrapper";
1620

1721
const logger = new Logger("auto-trace");
@@ -36,28 +40,7 @@ const getAccountLambdas = async () => {
3640
return lambdas;
3741
};
3842

39-
const getEdgeEndpoint = async (lambdas) => {
40-
const lambdaName = `${process.env.SERVICE}-${process.env.STAGE}-main`;
41-
42-
try {
43-
const command = new GetFunctionUrlConfigCommand({
44-
FunctionName: lambdaName,
45-
});
46-
47-
const res = await new LambdaClient().send(command);
48-
49-
return res.FunctionUrl?.replace("https://", "").replace("/", "");
50-
} catch (e) {
51-
if (e.name === "ResourceNotFoundException") {
52-
throw new Error(
53-
`Lambda ${lambdaName} not found - could not determine edge endpoint.`,
54-
);
55-
}
56-
throw e;
57-
}
58-
};
59-
60-
const updateLambda = async (lambda, arnBase, edgeEndpoint) => {
43+
const updateLambda = async (lambda, arnBase, queue, queueRegion) => {
6144
const command = new UpdateFunctionConfigurationCommand({
6245
FunctionName: lambda.FunctionName,
6346
Layers: [
@@ -70,7 +53,8 @@ const updateLambda = async (lambda, arnBase, edgeEndpoint) => {
7053
...(lambda.Environment || {}),
7154
Variables: {
7255
...(lambda.Environment?.Variables || {}),
73-
AUTO_TRACE_HOST: edgeEndpoint,
56+
AUTO_TRACE_QUEUE_URL: queue,
57+
AUTO_TRACE_QUEUE_REGION: queueRegion,
7458
AWS_LAMBDA_EXEC_WRAPPER: lambdaExecWrapper,
7559
},
7660
},
@@ -127,7 +111,11 @@ export const autoTrace = async () => {
127111
}
128112

129113
// Get our Lambda URL endpoint for the collector
130-
const edgeEndpoint = await getEdgeEndpoint();
114+
const queue = process.env.QUEUE_URL;
115+
const queueRegion = process.env.QUEUE_REGION || "eu-west-1";
116+
if (!queue) {
117+
throw new Error("QUEUE_URL is not defined");
118+
}
131119

132120
// Make sure we lock so that only one process is updating lambdas
133121
const lockAcquired = await acquireLock("auto-trace");
@@ -137,7 +125,7 @@ export const autoTrace = async () => {
137125
}
138126

139127
// List all the lambda functions in the AWS account
140-
const lambdas = await getAccountLambdas();
128+
let lambdas = await getAccountLambdas();
141129
logger.info(`Found ${lambdas.length} lambdas in the account`);
142130

143131
// Update qualifying lambdas
@@ -151,7 +139,7 @@ export const autoTrace = async () => {
151139
const isTraceStack = envVars.LAMBDA_LAYER_ARN === arn;
152140
const isUpdating = lambda.LastUpdateStatus === "InProgress";
153141
const hasDisableEnvVar = envVars.AUTO_TRACE_EXCLUDE;
154-
const hasWrongEndpoint = envVars.AUTO_TRACE_HOST !== edgeEndpoint;
142+
const hasWrongEndpoint = envVars.AUTO_TRACE_QUEUE_URL !== queue;
155143
const hasOtherWrapper =
156144
envVars.AWS_LAMBDA_EXEC_WRAPPER &&
157145
envVars.AWS_LAMBDA_EXEC_WRAPPER !== lambdaExecWrapper;
@@ -192,7 +180,7 @@ export const autoTrace = async () => {
192180
}
193181

194182
try {
195-
await updateLambda(lambda, arnBase, edgeEndpoint);
183+
await updateLambda(lambda, arnBase, queue, queueRegion);
196184

197185
logger.info(`✓ Updated ${lambda.FunctionName}`);
198186
await saveFunctionInfo(lambda, "enabled");

packages/api/src/routes/collector/index.js renamed to packages/api/src/events/collector.js

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
1-
import { Hono } from "hono";
21
import { defULID } from "@thi.ng/ksuid";
32

4-
import { saveHourlyStat } from "../../lib/stats";
5-
import { getErrorKey } from "../../lib/errors";
6-
import { saveInvocation } from "../../lib/invocations";
7-
import { store } from "../../lib/storage";
8-
9-
const app = new Hono();
3+
import { saveHourlyStat } from "../lib/stats";
4+
import { getErrorKey } from "../lib/errors";
5+
import { saveInvocation } from "../lib/invocations";
6+
import { store } from "../lib/storage";
107

118
const id = defULID();
129
const getId = () => {
@@ -17,19 +14,22 @@ const getId = () => {
1714
// This is used to avoid sending the transaction to the database before we know the invocation ID
1815
const transactionCache = {};
1916

20-
app.post("/", async (c) => {
21-
const body = await c.req.json();
17+
export const handler = async ({ Records }) => {
18+
// Read all of the messages off the queue
19+
for (const record of Records) {
20+
const body = JSON.parse(JSON.parse(record.body));
2221

23-
for (const span of body) {
24-
if (process.env.TRACER_TOKEN && span.token !== process.env.TRACER_TOKEN) {
25-
console.log(`Invalid token: ${span.token}`);
26-
continue;
27-
}
22+
for (const span of body) {
23+
if (process.env.TRACER_TOKEN && span.token !== process.env.TRACER_TOKEN) {
24+
console.log(`Invalid token: ${span.token}`);
25+
continue;
26+
}
2827

29-
const groupKey = span.transactionId || span.transaction_id;
30-
if (!transactionCache[groupKey]) transactionCache[groupKey] = [];
28+
const groupKey = span.transactionId || span.transaction_id;
29+
if (!transactionCache[groupKey]) transactionCache[groupKey] = [];
3130

32-
transactionCache[groupKey].push(span);
31+
transactionCache[groupKey].push(span);
32+
}
3333
}
3434

3535
// Check transactions cache to see if there's any transactions we can flush
@@ -108,8 +108,4 @@ app.post("/", async (c) => {
108108
// Delete the transaction from the cache
109109
delete transactionCache[transactionId];
110110
}
111-
112-
return c.json({ success: true });
113-
});
114-
115-
export default app;
111+
};

packages/api/src/index.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { handle } from "hono/aws-lambda";
44
import { secureHeaders } from "hono/secure-headers";
55
import { serveStatic } from "@hono/node-server/serve-static";
66

7-
import collectorRoute from "./routes/collector";
87
import exploreRoute from "./routes/explore";
98
import autoTraceRoute from "./routes/auto-trace";
109
import authRoute from "./routes/auth";
@@ -17,7 +16,6 @@ const app = new Hono();
1716
app.use(secureHeaders());
1817

1918
app.route("/api/auth", authRoute);
20-
app.route("/api/spans", collectorRoute);
2119
app.route("/api/auto-trace", autoTraceRoute);
2220

2321
app.use("/api/explore/*", auth);
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
Resources:
2+
Queue:
3+
Type: AWS::SQS::Queue
4+
Properties:
5+
QueueName: ${self:service}-queue-${sls:stage}
6+
VisibilityTimeout: 300
7+
MessageRetentionPeriod: 1209600
8+
QueuePolicy:
9+
Type: AWS::SQS::QueuePolicy
10+
Properties:
11+
Queues:
12+
- !Ref Queue
13+
PolicyDocument:
14+
Version: "2012-10-17"
15+
Statement:
16+
- Effect: Allow
17+
Principal:
18+
AWS: "*"
19+
Action: sqs:SendMessage
20+
Resource: !GetAtt Queue.Arn
21+
Condition:
22+
StringEquals:
23+
aws:PrincipalAccount: ${aws:accountId}
File renamed without changes.

packages/api/src/lib/spans.js

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,33 +41,20 @@ const removeGroupingKeys = (spans) => {
4141
const simplifySpans = (spans) => {
4242
return spans.map((span) => {
4343
const {
44-
info,
45-
vendor,
46-
version,
47-
runtime,
4844
token,
4945
region,
5046
type,
5147
transactionId,
5248
isMetadata,
53-
memoryAllocated,
54-
readiness,
5549
messageVersion,
5650
account,
5751
invokedArn,
5852
invokedVersion,
5953
lumigo_execution_tags_no_scrub,
6054
...rest
6155
} = span;
62-
const { traceId, tracer, logGroupName, logStreamName, ...restInfo } =
63-
info || {};
64-
return {
65-
...rest,
66-
info: {
67-
traceId: traceId?.Root,
68-
...restInfo,
69-
},
70-
};
56+
57+
return { ...rest };
7158
});
7259
};
7360

@@ -80,12 +67,10 @@ export const groupSpans = (spans) => {
8067
for (const span of spans) {
8168
const latestSpan = groupedSpans[groupedSpans.length - 1] || {};
8269
const similarSpans = groupedSpans.filter(
83-
(s) => s.groupingKey === span.groupingKey,
70+
(s) => s.extendedGroupingKey === span.extendedGroupingKey,
8471
);
8572

86-
if (span.extendedGroupingKey === latestSpan.extendedGroupingKey) {
87-
latestSpan.instances++;
88-
} else if (similarSpans.length >= 25) {
73+
if (similarSpans.length >= 20) {
8974
similarSpans[similarSpans.length - 1].instances++;
9075
} else {
9176
groupedSpans.push(span);

packages/deploy-script/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ Once you're done, it will display the URL for your new dashboard.
3434
### Auto-tracing
3535

3636
Once installed, all Lambda functions in your AWS account will automatically be traced,
37-
by adding a Lambda layer to them. You'll also notice a new environment variable called `AUTO_TRACE_HOST`.
37+
by adding a Lambda layer to them. You'll also notice a new environment variable called `AUTO_TRACE_QUEUE_URL`.
3838

3939
If you wish to disable tracing for a specific function, you can add the environment variable `AUTO_TRACE_EXCLUDE`
4040
with a value of `1`. This will keep the Lambda layer from being added.

packages/lambda-layer/index.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,12 @@ try {
44
} catch (e) {
55
config = {
66
token: "t_0000000000000000",
7-
edgeHost: process.env.AUTO_TRACE_HOST,
87
};
98
}
109

1110
const tracer = require("@lumigo/tracer")({
1211
token: config.token,
13-
edgeHost: process.env.AUTO_TRACE_HOST || config.edgeHost,
12+
edgeHost: "localhost",
1413
});
1514

1615
const verbose = process.env.TRACER_LOG_VERBOSE;

0 commit comments

Comments
 (0)