Skip to content

Commit 11b3123

Browse files
committed
Add event-stream-adapter element
1 parent 1478e1e commit 11b3123

6 files changed

Lines changed: 573 additions & 0 deletions

File tree

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
<!DOCTYPE html>
2+
<html lang="en">
3+
<head>
4+
<meta charset="UTF-8">
5+
<title>Event Stream Adapter Custom Element</title>
6+
</head>
7+
<body>
8+
<script type="module">
9+
import { customElement } from '/assets/custom-element/custom-element.js';
10+
import { getActivityLivestreamingMetadata } from 'botframework-webchat-core';
11+
import { EventSourceParserStream } from 'eventsource-parser/stream';
12+
import createStreamCoalescer from '/assets/esm/adapter/createStreamCoalescer.js';
13+
import { forIterator } from '/assets/esm/adapter/demuxChainOfThought.js';
14+
15+
function createObservable(subscribe) {
16+
return Object.freeze({ subscribe });
17+
}
18+
19+
customElement('event-stream-adapter', currentDocument =>
20+
class EventStreamAdapterElement extends HTMLElement {
21+
static get observedAttributes() { return ['compat', 'href']; }
22+
23+
#abortController = null;
24+
#activities = [];
25+
#activityBuffer = [];
26+
#activityObservers = [];
27+
#directLine;
28+
29+
constructor() {
30+
super();
31+
32+
const self = this;
33+
34+
this.#directLine = Object.freeze({
35+
activity$: createObservable(observerOrNext => {
36+
const observer = typeof observerOrNext === 'function'
37+
? { next: observerOrNext }
38+
: observerOrNext;
39+
40+
for (const activity of self.#activityBuffer) {
41+
observer.next(activity);
42+
}
43+
44+
self.#activityObservers.push(observer);
45+
46+
return Object.freeze({
47+
unsubscribe() {
48+
const i = self.#activityObservers.indexOf(observer);
49+
i !== -1 && self.#activityObservers.splice(i, 1);
50+
}
51+
});
52+
}),
53+
54+
connectionStatus$: createObservable(observerOrNext => {
55+
const observer = typeof observerOrNext === 'function'
56+
? { next: observerOrNext }
57+
: observerOrNext;
58+
59+
observer.next(0);
60+
observer.next(1);
61+
observer.next(2);
62+
63+
return Object.freeze({ unsubscribe() {} });
64+
}),
65+
66+
end() {},
67+
68+
postActivity() {
69+
return createObservable(observerOrNext => {
70+
const observer = typeof observerOrNext === 'function'
71+
? { next: observerOrNext }
72+
: observerOrNext;
73+
74+
observer.next(crypto.randomUUID());
75+
observer.complete?.();
76+
77+
return Object.freeze({ unsubscribe() {} });
78+
});
79+
}
80+
});
81+
}
82+
83+
get compat() {
84+
return this.getAttribute('compat') || 'webchat';
85+
}
86+
87+
get directLine() {
88+
return this.#directLine;
89+
}
90+
91+
connectedCallback() {
92+
this.#load();
93+
}
94+
95+
disconnectedCallback() {
96+
this.#abortController?.abort();
97+
this.#abortController = null;
98+
}
99+
100+
#emitToAdapter(activity) {
101+
this.#activityBuffer.push(activity);
102+
103+
for (const observer of this.#activityObservers) {
104+
observer.next(activity);
105+
}
106+
}
107+
108+
#emitEvent(activity) {
109+
this.#activities.push(activity);
110+
111+
const meta = getActivityLivestreamingMetadata(activity);
112+
const allActivities = this.#activities;
113+
const detail = { activity };
114+
115+
meta && Object.defineProperty(detail, 'activities', {
116+
enumerable: true,
117+
get: () => allActivities.values()
118+
});
119+
120+
this.dispatchEvent(new CustomEvent('activity', {
121+
bubbles: true,
122+
detail: Object.freeze(detail)
123+
}));
124+
}
125+
126+
async #load() {
127+
const href = this.getAttribute('href');
128+
129+
if (!href) {
130+
return;
131+
}
132+
133+
this.#abortController?.abort();
134+
this.#abortController = new AbortController();
135+
136+
const { signal } = this.#abortController;
137+
const compat = this.compat;
138+
const useAdapter = compat === 'webchat' || compat === 'both';
139+
const useEvents = compat === 'events' || compat === 'both';
140+
141+
try {
142+
for await (const activity of forIterator({}, this.#fetchStreamed(href, signal))) {
143+
if (signal.aborted) {
144+
break;
145+
}
146+
147+
useAdapter && this.#emitToAdapter(activity);
148+
useEvents && this.#emitEvent(activity);
149+
}
150+
} catch (error) {
151+
signal.aborted || console.error('event-stream-adapter:', error);
152+
}
153+
}
154+
155+
async *#fetchStreamed(href, signal) {
156+
const typingMap = new Map();
157+
const res = await fetch(new URL(href, location.href), { signal });
158+
159+
yield* res.body
160+
.pipeThrough(new TextDecoderStream())
161+
.pipeThrough(new EventSourceParserStream())
162+
.pipeThrough(
163+
new TransformStream({
164+
transform({ data, event }, controller) {
165+
if (event === 'end') {
166+
controller.terminate();
167+
return;
168+
}
169+
170+
if (event !== 'activity') {
171+
return;
172+
}
173+
174+
const activity = JSON.parse(data);
175+
176+
if (
177+
activity.type === 'typing' &&
178+
activity.text &&
179+
activity.channelData?.streamType === 'streaming'
180+
) {
181+
const streamId = activity.channelData?.streamId || activity.id;
182+
let accumulated = typingMap.get(streamId) || '';
183+
184+
if (activity.channelData?.chunkType === 'delta') {
185+
accumulated += activity.text;
186+
activity.text = accumulated;
187+
} else {
188+
accumulated = activity.text;
189+
}
190+
191+
typingMap.set(streamId, accumulated);
192+
}
193+
194+
controller.enqueue(activity);
195+
}
196+
})
197+
)
198+
.pipeThrough(createStreamCoalescer());
199+
}
200+
}
201+
);
202+
</script>
203+
</body>
204+
</html>
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/* eslint-disable security/detect-object-injection */
2+
/*!
3+
* Copyright (C) Microsoft Corporation. All rights reserved.
4+
*/
5+
6+
const NextLivestreamSequence = Symbol();
7+
const PreviousActivitySymbol = Symbol();
8+
const SessionIdSymbol = Symbol();
9+
10+
export default class LivestreamSession {
11+
constructor(sessionId) {
12+
this[NextLivestreamSequence] = 1;
13+
this[PreviousActivitySymbol] = undefined;
14+
this[SessionIdSymbol] = sessionId;
15+
}
16+
17+
/**
18+
* Last string, useful for decompressing delta-compressed chunks.
19+
*/
20+
get previousActivity() {
21+
return this[PreviousActivitySymbol];
22+
}
23+
24+
set previousActivity(value) {
25+
this[PreviousActivitySymbol] = value;
26+
}
27+
28+
/**
29+
* Activity ID of the session (and the first activity.)
30+
*
31+
* @type {string}
32+
*/
33+
get sessionId() {
34+
return this[SessionIdSymbol];
35+
}
36+
37+
get isConcluded() {
38+
return this[NextLivestreamSequence] === Infinity;
39+
}
40+
41+
/** @return {number} */
42+
getNextLivestreamSequence(
43+
/** @type {boolean | undefined} */
44+
isFinal = false
45+
) {
46+
if (isFinal) {
47+
this.previousActivity = undefined;
48+
49+
return (this[NextLivestreamSequence] = Infinity);
50+
}
51+
52+
return this[NextLivestreamSequence]++;
53+
}
54+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/* eslint-disable security/detect-object-injection */
2+
3+
/*!
4+
* Copyright (C) Microsoft Corporation. All rights reserved.
5+
*/
6+
7+
import LivestreamSession from './LivestreamSession.js';
8+
9+
const ActiveLivestreamSymbol = Symbol();
10+
11+
export default class LivestreamSessionManager {
12+
constructor() {
13+
this[ActiveLivestreamSymbol] = new Map();
14+
}
15+
16+
*concludeAll() {
17+
for (const [sessionId, session] of this[ActiveLivestreamSymbol]) {
18+
if (!session.isConcluded) {
19+
const { previousActivity } = session;
20+
const entitiesWithoutStreamInfo = (previousActivity?.entities ?? []).filter(
21+
({ type }) => type !== 'streaminfo'
22+
);
23+
24+
yield Object.freeze({
25+
...previousActivity,
26+
channelData: Object.freeze({
27+
...previousActivity?.channelData,
28+
chunkType: undefined,
29+
streamId: sessionId,
30+
streamSequence: undefined,
31+
streamType: 'final'
32+
}),
33+
entities: Object.freeze([...entitiesWithoutStreamInfo]),
34+
id: `${sessionId}/final`,
35+
text: previousActivity?.text,
36+
type: 'message'
37+
});
38+
}
39+
}
40+
}
41+
42+
has(livestreamSessionId) {
43+
return this[ActiveLivestreamSymbol].has(livestreamSessionId);
44+
}
45+
46+
*sequence(livestreamSessionId, activity, isFinal = false) {
47+
let livestreamSession = this[ActiveLivestreamSymbol].get(livestreamSessionId);
48+
49+
if (!livestreamSession) {
50+
livestreamSession = new LivestreamSession(livestreamSessionId);
51+
52+
this[ActiveLivestreamSymbol].set(livestreamSessionId, livestreamSession);
53+
}
54+
55+
if (livestreamSession.isConcluded) {
56+
return;
57+
}
58+
59+
const streamSequence = livestreamSession.getNextLivestreamSequence(isFinal);
60+
const entitiesWithoutStreamInfo = (activity.entities ?? []).filter(({ type }) => type !== 'streaminfo');
61+
62+
// We assume the chat adapter will do delta decompression.
63+
livestreamSession.previousActivity = activity;
64+
65+
yield Object.freeze({
66+
...activity,
67+
channelData: Object.freeze({
68+
...activity.channelData,
69+
chunkType: undefined,
70+
streamId: streamSequence === 1 ? undefined : livestreamSessionId,
71+
streamSequence: streamSequence === Infinity ? undefined : streamSequence,
72+
streamType: streamSequence === Infinity ? 'final' : 'streaming'
73+
}),
74+
entities: Object.freeze([...entitiesWithoutStreamInfo]),
75+
id: streamSequence === 1 ? livestreamSessionId : activity.id,
76+
text: livestreamSession.previousActivity.text,
77+
type: streamSequence === Infinity ? 'message' : 'typing'
78+
});
79+
}
80+
}

0 commit comments

Comments
 (0)