|
1 | 1 | import { Hono } from "hono"; |
| 2 | +import { defULID } from "@thi.ng/ksuid"; |
2 | 3 |
|
3 | | -import { getExpiryTime, put, update } from "../../lib/database"; |
4 | 4 | import { saveHourlyStat } from "../../lib/stats"; |
5 | 5 | import { getErrorKey } from "../../lib/errors"; |
6 | | -import { groupSpans } from "../../lib/spans"; |
7 | 6 | import { saveInvocation } from "../../lib/invocations"; |
| 7 | +import { store } from "../../lib/storage"; |
8 | 8 |
|
9 | 9 | const app = new Hono(); |
10 | 10 |
|
| 11 | +const id = defULID(); |
| 12 | +const getId = () => { |
| 13 | + return id.next(); |
| 14 | +}; |
| 15 | + |
| 16 | +// Holds transactions for which we don't know the invocation ID yet |
| 17 | +// This is used to avoid sending the transaction to the database before we know the invocation ID |
| 18 | +const transactionCache = {}; |
| 19 | + |
11 | 20 | app.post("/", async (c) => { |
12 | 21 | const body = await c.req.json(); |
13 | 22 |
|
14 | | - const groupedItems = {}; |
15 | | - |
16 | 23 | for (const span of body) { |
17 | 24 | if (process.env.TRACER_TOKEN && span.token !== process.env.TRACER_TOKEN) { |
18 | 25 | console.log(`Invalid token: ${span.token}`); |
19 | 26 | continue; |
20 | 27 | } |
21 | 28 |
|
22 | | - const pk = `transaction#${span.transactionId || span.transaction_id}`; |
23 | | - if (!groupedItems[pk]) groupedItems[pk] = []; |
| 29 | + const groupKey = span.transactionId || span.transaction_id; |
| 30 | + if (!transactionCache[groupKey]) transactionCache[groupKey] = []; |
24 | 31 |
|
25 | | - if (span.type === "log") { |
26 | | - // save log span |
27 | | - groupedItems[pk].push({ |
28 | | - ...span, |
29 | | - transactionId: span.transactionId || span.transaction_id, |
30 | | - type: "log", |
31 | | - }); |
32 | | - continue; |
33 | | - } |
| 32 | + transactionCache[groupKey].push(span); |
| 33 | + } |
| 34 | + |
| 35 | + // Check transactions cache to see if there's any transactions we can flush |
| 36 | + for (const [transactionId, spans] of Object.entries(transactionCache)) { |
| 37 | + const invocationEndedSpan = spans.find( |
| 38 | + (span) => |
| 39 | + span.type === "function" && span.ended && !span.id.includes("_started"), |
| 40 | + ); |
| 41 | + |
| 42 | + if (!invocationEndedSpan) { |
| 43 | + console.log( |
| 44 | + `No invocation ended span found for transaction ${spans[0].transactionId}`, |
| 45 | + ); |
| 46 | + |
| 47 | + // TODO: if we are close to running out of time, we should flush the transaction cache anyway |
34 | 48 |
|
35 | | - // ignore started spans and enrichments |
36 | | - if (span.id.endsWith("_started") || span.type === "enrichment") { |
37 | 49 | continue; |
| 50 | + } else { |
| 51 | + console.log( |
| 52 | + "Flushing transaction cache for", |
| 53 | + invocationEndedSpan.transactionId, |
| 54 | + ); |
38 | 55 | } |
39 | 56 |
|
40 | | - // save transaction span |
41 | | - groupedItems[pk].push({ |
42 | | - ...span, |
43 | | - type: "span", |
44 | | - spanType: span.type, |
45 | | - }); |
46 | | - |
47 | | - if ( |
48 | | - span.type === "function" && |
49 | | - span.ended && |
50 | | - !span.id.includes("_started") |
51 | | - ) { |
52 | | - // save function invocation details |
53 | | - await saveInvocation(span); |
54 | | - |
55 | | - // save error |
56 | | - if (span.error) { |
57 | | - const errorKey = getErrorKey(span.error); |
58 | | - await update({ |
59 | | - Key: { |
60 | | - pk: `function#${span.region}#${span.name}`, |
61 | | - sk: `error#${errorKey}`, |
62 | | - }, |
63 | | - UpdateExpression: `SET #error = :error, #lastInvocation = :lastInvocation, #lastSeen = :lastSeen, #expires = :expires, #type = :type, #name = :name, #region = :region`, |
64 | | - ExpressionAttributeValues: { |
65 | | - ":error": span.error, |
66 | | - ":lastInvocation": `${span.started}/${span.id}`, |
67 | | - ":lastSeen": new Date(span.ended).toISOString(), |
68 | | - ":expires": getExpiryTime(), |
69 | | - ":type": "error", |
70 | | - ":name": span.name, |
71 | | - ":region": span.region, |
72 | | - }, |
73 | | - ExpressionAttributeNames: { |
74 | | - "#error": "error", |
75 | | - "#lastInvocation": "lastInvocation", |
76 | | - "#lastSeen": "lastSeen", |
77 | | - "#expires": "_expires", |
78 | | - "#type": "type", |
79 | | - "#name": "name", |
80 | | - "#region": "region", |
81 | | - }, |
82 | | - }); |
83 | | - await saveHourlyStat(span.region, span.name + ".error." + errorKey, 1); |
84 | | - } |
85 | | - |
86 | | - // save function meta data |
87 | | - try { |
88 | | - await update({ |
89 | | - Key: { |
90 | | - pk: `function#${span.region}#${span.name}`, |
91 | | - sk: `function#${span.region}`, |
92 | | - }, |
93 | | - UpdateExpression: `SET lastInvocation = :lastInvocation, memoryAllocated = :memoryAllocated, #timeout = :timeout, traceStatus = :traceStatus, #expires = :expires`, |
94 | | - ExpressionAttributeValues: { |
95 | | - ":lastInvocation": span.started, |
96 | | - ":memoryAllocated": span.memoryAllocated, |
97 | | - ":timeout": span.maxFinishTime - span.started, |
98 | | - ":traceStatus": "enabled", |
99 | | - ":expires": getExpiryTime(), |
100 | | - }, |
101 | | - ExpressionAttributeNames: { |
102 | | - "#timeout": "timeout", |
103 | | - "#expires": "_expires", |
104 | | - }, |
105 | | - }); |
106 | | - } catch (e) { |
107 | | - console.log(e); |
108 | | - } |
109 | | - |
110 | | - // save stats |
111 | | - const duration = span.ended - span.started; |
112 | | - await saveHourlyStat(span.region, span.name + ".invocations", 1); |
113 | | - await saveHourlyStat(span.region, span.name + ".duration", duration); |
114 | | - await saveHourlyStat("global", "invocations", 1); |
115 | | - if (span.error) { |
116 | | - await saveHourlyStat(span.region, span.name + ".errors", 1); |
117 | | - await saveHourlyStat("global", "errors", 1); |
118 | | - } |
| 57 | + // save function invocation details |
| 58 | + await saveInvocation(invocationEndedSpan, spans); |
| 59 | + |
| 60 | + // const duration = invocationEndedSpan.ended - invocationEndedSpan.started; |
| 61 | + await saveHourlyStat( |
| 62 | + invocationEndedSpan.region, |
| 63 | + invocationEndedSpan.name + ".invocations", |
| 64 | + 1, |
| 65 | + ); |
| 66 | + // await saveHourlyStat( |
| 67 | + // invocationEndedSpan.region, |
| 68 | + // invocationEndedSpan.name + ".duration", |
| 69 | + // duration, |
| 70 | + // ); |
| 71 | + await saveHourlyStat("global", "invocations", 1); |
| 72 | + if (invocationEndedSpan.error) { |
| 73 | + await saveHourlyStat( |
| 74 | + invocationEndedSpan.region, |
| 75 | + invocationEndedSpan.name + ".errors", |
| 76 | + 1, |
| 77 | + ); |
| 78 | + await saveHourlyStat("global", "errors", 1); |
119 | 79 | } |
120 | | - } |
121 | 80 |
|
122 | | - const itemsToSave = []; |
123 | | - for (let [pk, items] of Object.entries(groupedItems)) { |
124 | | - items = groupSpans(items); |
125 | | - while (items.length) { |
126 | | - const chunk = items.splice(0, 100); |
127 | | - itemsToSave.push({ |
128 | | - pk, |
129 | | - sk: `spans#${chunk[0].started || chunk[0].sending_time}#${chunk[0].id}`, |
130 | | - type: "spans", |
131 | | - spans: chunk, |
132 | | - }); |
| 81 | + // save error |
| 82 | + if (invocationEndedSpan.error) { |
| 83 | + const errorKey = getErrorKey(invocationEndedSpan.error); |
| 84 | + await store( |
| 85 | + [ |
| 86 | + "errors", |
| 87 | + invocationEndedSpan.region, |
| 88 | + invocationEndedSpan.name, |
| 89 | + errorKey, |
| 90 | + getId(), |
| 91 | + ], |
| 92 | + { |
| 93 | + error: invocationEndedSpan.error, |
| 94 | + lastInvocation: `${invocationEndedSpan.started}/${invocationEndedSpan.id}`, |
| 95 | + lastSeen: new Date(invocationEndedSpan.ended).toISOString(), |
| 96 | + type: "error", |
| 97 | + name: invocationEndedSpan.name, |
| 98 | + region: invocationEndedSpan.region, |
| 99 | + }, |
| 100 | + ); |
| 101 | + await saveHourlyStat( |
| 102 | + invocationEndedSpan.region, |
| 103 | + invocationEndedSpan.name + ".error." + errorKey, |
| 104 | + 1, |
| 105 | + ); |
133 | 106 | } |
134 | | - } |
135 | 107 |
|
136 | | - await Promise.all(itemsToSave.map((item) => put(item), true)); |
| 108 | + // Delete the transaction from the cache |
| 109 | + delete transactionCache[transactionId]; |
| 110 | + } |
137 | 111 |
|
138 | 112 | return c.json({ success: true }); |
139 | 113 | }); |
|
0 commit comments