-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathresponseStream.ts
More file actions
101 lines (94 loc) · 3.3 KB
/
responseStream.ts
File metadata and controls
101 lines (94 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import { getDatabase } from '@utils/mongodb/mongoClient.mjs';
/**
 * Adapts a model streaming result into a byte stream of newline-terminated,
 * prefix-tagged frames (`<prefix>:<json>\n`, UTF-8 encoded).
 *
 * Frames emitted, by upstream part type:
 * - `text-delta`  -> `0:<json string>` (may be suppressed, see below)
 * - `tool-call`   -> `9:[{ toolCallId, toolName, state: 'call' }]`
 * - `tool-result` -> `a:[{ toolCallId, toolName, state: 'result', result }]`
 * - `error`       -> stops reading and emits one generic `3:<json string>`
 *   frame before closing; the upstream error is only logged, never sent.
 *
 * Text suppression: once a tool result arrives after some non-empty text
 * has already been sent, all later text deltas are dropped. If no text has
 * been sent yet, text after the tool result is still forwarded.
 *
 * Token usage is read from the `finish` part (`totalUsage`), or from the
 * first `finish-step` part (`usage`) while no prompt-token count has been
 * seen. After the stream closes normally, a usage document is inserted into
 * the `hackbot_usage` collection without being awaited; insert failures are
 * logged and otherwise ignored.
 *
 * @param result - Object exposing an async-iterable `fullStream` of parts.
 *   NOTE(review): presumably the AI SDK `streamText` result; confirm against
 *   the caller.
 * @param model - Model identifier stored with the usage document.
 * @returns A stream of encoded frames. It is errored (not closed) if
 *   iterating `fullStream` or enqueueing a frame throws.
 */
export function createResponseStream(
  result: any,
  model: string
): ReadableStream {
  const encoder = new TextEncoder();

  return new ReadableStream({
    async start(controller) {
      // Serialises one frame as `<prefix>:<json>\n` and queues its bytes.
      const emit = (prefix: string, payload: unknown): void => {
        controller.enqueue(
          encoder.encode(`${prefix}:${JSON.stringify(payload)}\n`)
        );
      };

      let promptTokens = 0;
      let completionTokens = 0;
      let cachedTokens = 0;
      // A boolean rather than the captured message: an upstream error whose
      // message is '' must still produce the client-facing error frame.
      let upstreamFailed = false;
      let suppressText = false;
      let hasEmittedText = false;

      try {
        for await (const part of result.fullStream) {
          switch (part?.type) {
            case 'text-delta':
              if (!suppressText) {
                emit('0', part.text ?? '');
                if (part.text) hasEmittedText = true;
              }
              break;

            case 'tool-call':
              emit('9', [
                {
                  toolCallId: part.toolCallId,
                  toolName: part.toolName,
                  state: 'call',
                },
              ]);
              break;

            case 'tool-result':
              // Keep allowing text if no assistant text has been emitted yet.
              // This preserves a tool-first "intro sentence + cards" UX.
              if (hasEmittedText) suppressText = true;
              emit('a', [
                {
                  toolCallId: part.toolCallId,
                  toolName: part.toolName,
                  state: 'result',
                  result: part.output,
                },
              ]);
              break;

            case 'error':
              console.error(
                '[hackbot][stream] OpenAI error in stream:',
                part.error
              );
              upstreamFailed = true;
              break;

            case 'finish':
              promptTokens = part.totalUsage?.inputTokens ?? 0;
              completionTokens = part.totalUsage?.outputTokens ?? 0;
              cachedTokens =
                part.totalUsage?.inputTokenDetails?.cacheReadTokens ?? 0;
              break;

            case 'finish-step':
              // Per-step usage is only a fallback while no prompt-token
              // count has been recorded.
              if (promptTokens === 0) {
                promptTokens = part.usage?.inputTokens ?? 0;
                completionTokens = part.usage?.outputTokens ?? 0;
                cachedTokens =
                  part.usage?.inputTokenDetails?.cacheReadTokens ?? 0;
              }
              break;

            default:
              // Other part types are ignored.
              break;
          }

          // The `break` statements above only leave the switch; stop
          // consuming the upstream as soon as it has reported an error.
          if (upstreamFailed) break;
        }

        if (upstreamFailed) {
          emit('3', 'Something went wrong. Please try again.');
        }
        controller.close();
      } catch (e: unknown) {
        console.error('[hackbot][stream] fullStream error', e);
        controller.error(e);
        return;
      }

      if (promptTokens > 0) {
        // Fire-and-forget: usage logging is not awaited here.
        void getDatabase()
          .then((db) =>
            db.collection('hackbot_usage').insertOne({
              timestamp: new Date(),
              model,
              promptTokens,
              completionTokens,
              cachedPromptTokens: cachedTokens,
            })
          )
          .catch((err: unknown) =>
            console.error('[hackbot][stream] usage insert failed', err)
          );
      }
    },
  });
}