Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ Breaking changes in this release:
- Added pull-based capabilities system for dynamically discovering adapter capabilities at runtime, in PR [#5679](https://github.com/microsoft/BotFramework-WebChat/pull/5679), by [@pranavjoshi001](https://github.com/pranavjoshi001)
- Added Speech-to-Speech (S2S) support for real-time voice conversations, in PR [#5654](https://github.com/microsoft/BotFramework-WebChat/pull/5654), by [@pranavjoshi](https://github.com/pranavjoshi001)
- Added core mute/unmute functionality for speech-to-speech via `useRecorder` hook (silent chunks keep server connection alive), in PR [#5688](https://github.com/microsoft/BotFramework-WebChat/pull/5688), by [@pranavjoshi](https://github.com/pranavjoshi001)
- 🧪 Added incremental streaming Markdown renderer for livestreaming, in PR [#5799](https://github.com/microsoft/BotFramework-WebChat/pull/5799), by [@OEvgeny](https://github.com/OEvgeny)

### Changed

Expand Down
236 changes: 236 additions & 0 deletions __tests__/assets/custom-element/event-stream-adapter.ce.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Event Stream Adapter Custom Element</title>
</head>
<body>
<script type="module">
import { customElement } from '/assets/custom-element/custom-element.js';
import { getActivityLivestreamingMetadata } from 'botframework-webchat-core';
import { EventSourceParserStream } from 'eventsource-parser/stream';
import createStreamCoalescer from '/assets/esm/adapter/createStreamCoalescer.js';
import { forIterator } from '/assets/esm/adapter/demuxChainOfThought.js';

function createObservable(subscribe) {
return Object.freeze({ subscribe });
}

customElement('event-stream-adapter', currentDocument =>
class EventStreamAdapterElement extends HTMLElement {
static get observedAttributes() { return ['compat', 'href']; }

#abortController = null;
#activities = [];
#activityBuffer = [];
#activityObservers = [];
#activityWaiters = new Set();
#directLine;

constructor() {
super();

const self = this;

this.#directLine = Object.freeze({
activity$: createObservable(observerOrNext => {
const observer = typeof observerOrNext === 'function'
? { next: observerOrNext }
: observerOrNext;

for (const activity of self.#activityBuffer) {
observer.next(activity);
}

self.#activityObservers.push(observer);

return Object.freeze({
unsubscribe() {
const i = self.#activityObservers.indexOf(observer);
i !== -1 && self.#activityObservers.splice(i, 1);
}
});
}),

connectionStatus$: createObservable(observerOrNext => {
const observer = typeof observerOrNext === 'function'
? { next: observerOrNext }
: observerOrNext;

observer.next(0);
observer.next(1);
observer.next(2);

return Object.freeze({ unsubscribe() {} });
}),

end() {},

postActivity() {
return createObservable(observerOrNext => {
const observer = typeof observerOrNext === 'function'
? { next: observerOrNext }
: observerOrNext;

observer.next(crypto.randomUUID());
observer.complete?.();

return Object.freeze({ unsubscribe() {} });
});
}
});
}

get compat() {
return this.getAttribute('compat') || 'webchat';
}

get directLine() {
return this.#directLine;
}

connectedCallback() {
this.#load();
}

disconnectedCallback() {
this.#abortController?.abort();
this.#abortController = null;
}

#emitToAdapter(activity) {
this.#activityBuffer.push(activity);

for (const observer of this.#activityObservers) {
observer.next(activity);
}
}

#emitEvent(activity) {
this.#activities.push(activity);

for (const resolve of this.#activityWaiters) {
resolve();
}

this.#activityWaiters.clear();

const meta = getActivityLivestreamingMetadata(activity);
const element = this;
const detail = { activity };

meta && Object.defineProperty(detail, 'activities', {
enumerable: true,
get: () => element.#iterateStreamActivities(meta.sessionId)
});

this.dispatchEvent(new CustomEvent('activity', {
bubbles: true,
detail: Object.freeze(detail)
}));
}

async *#iterateStreamActivities(sessionId) {
let cursor = 0;

for (;;) {
while (cursor < this.#activities.length) {
const activity = this.#activities[cursor++];
const activityMeta = getActivityLivestreamingMetadata(activity);

if (!activityMeta || activityMeta.sessionId !== sessionId) {
continue;
}

yield activity;

if (activityMeta.type === 'final activity') {
return;
}
}

await new Promise(resolve => this.#activityWaiters.add(resolve));
}
}

async #load() {
const href = this.getAttribute('href');

if (!href) {
return;
}

this.#abortController?.abort();
this.#abortController = new AbortController();

const { signal } = this.#abortController;
const compat = this.compat;
const useAdapter = compat === 'webchat' || compat === 'both';
const useEvents = compat === 'events' || compat === 'both';

try {
for await (const activity of forIterator({}, this.#fetchStreamed(href, signal))) {
if (signal.aborted) {
break;
}

useAdapter && this.#emitToAdapter(activity);
useEvents && this.#emitEvent(activity.raw);
}
} catch (error) {
signal.aborted || console.error('event-stream-adapter:', error);
}
}

async *#fetchStreamed(href, signal) {
const typingMap = new Map();
const res = await fetch(new URL(href, location.href), { signal });

yield* res.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.pipeThrough(
new TransformStream({
transform({ data, event }, controller) {
if (event === 'end') {
controller.terminate();
return;
}

if (event !== 'activity') {
return;
}

const activity = JSON.parse(data);

activity.raw = { ...activity };

if (
activity.type === 'typing' &&
activity.text &&
activity.channelData?.streamType === 'streaming'
) {
const streamId = activity.channelData?.streamId || activity.id;
let accumulated = typingMap.get(streamId) || '';

if (activity.channelData?.chunkType === 'delta') {
accumulated += activity.text;
activity.text = accumulated;
} else {
accumulated = activity.text;
}

typingMap.set(streamId, accumulated);
}

controller.enqueue(activity);
}
})
)
.pipeThrough(createStreamCoalescer());
}
}
);
</script>
</body>
</html>
54 changes: 54 additions & 0 deletions __tests__/assets/esm/adapter/LivestreamSession.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* eslint-disable security/detect-object-injection */
/*!
* Copyright (C) Microsoft Corporation. All rights reserved.
*/

const NextLivestreamSequence = Symbol();
const PreviousActivitySymbol = Symbol();
const SessionIdSymbol = Symbol();

export default class LivestreamSession {
constructor(sessionId) {
this[NextLivestreamSequence] = 1;
this[PreviousActivitySymbol] = undefined;
this[SessionIdSymbol] = sessionId;
}

/**
* Last string, useful for decompressing delta-compressed chunks.
*/
get previousActivity() {
return this[PreviousActivitySymbol];
}

set previousActivity(value) {
this[PreviousActivitySymbol] = value;
}

/**
* Activity ID of the session (and the first activity.)
*
* @type {string}
*/
get sessionId() {
return this[SessionIdSymbol];
}

get isConcluded() {
return this[NextLivestreamSequence] === Infinity;
}

/** @return {number} */
getNextLivestreamSequence(
/** @type {boolean | undefined} */
isFinal = false
) {
if (isFinal) {
this.previousActivity = undefined;

return (this[NextLivestreamSequence] = Infinity);
}

return this[NextLivestreamSequence]++;
}
}
80 changes: 80 additions & 0 deletions __tests__/assets/esm/adapter/LivestreamSessionManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/* eslint-disable security/detect-object-injection */

/*!
* Copyright (C) Microsoft Corporation. All rights reserved.
*/

import LivestreamSession from './LivestreamSession.js';

const ActiveLivestreamSymbol = Symbol();

export default class LivestreamSessionManager {
constructor() {
this[ActiveLivestreamSymbol] = new Map();
}

*concludeAll() {
for (const [sessionId, session] of this[ActiveLivestreamSymbol]) {
if (!session.isConcluded) {
const { previousActivity } = session;
const entitiesWithoutStreamInfo = (previousActivity?.entities ?? []).filter(
({ type }) => type !== 'streaminfo'
);

yield Object.freeze({
...previousActivity,
channelData: Object.freeze({
...previousActivity?.channelData,
chunkType: undefined,
streamId: sessionId,
streamSequence: undefined,
streamType: 'final'
}),
entities: Object.freeze([...entitiesWithoutStreamInfo]),
id: `${sessionId}/final`,
text: previousActivity?.text,
type: 'message'
});
}
}
}

has(livestreamSessionId) {
return this[ActiveLivestreamSymbol].has(livestreamSessionId);
}

*sequence(livestreamSessionId, activity, isFinal = false) {
let livestreamSession = this[ActiveLivestreamSymbol].get(livestreamSessionId);

if (!livestreamSession) {
livestreamSession = new LivestreamSession(livestreamSessionId);

this[ActiveLivestreamSymbol].set(livestreamSessionId, livestreamSession);
}

if (livestreamSession.isConcluded) {
return;
}

const streamSequence = livestreamSession.getNextLivestreamSequence(isFinal);
const entitiesWithoutStreamInfo = (activity.entities ?? []).filter(({ type }) => type !== 'streaminfo');

// We assume the chat adapter will do delta decompression.
livestreamSession.previousActivity = activity;

yield Object.freeze({
...activity,
channelData: Object.freeze({
...activity.channelData,
chunkType: undefined,
streamId: streamSequence === 1 ? undefined : livestreamSessionId,
streamSequence: streamSequence === Infinity ? undefined : streamSequence,
streamType: streamSequence === Infinity ? 'final' : 'streaming'
}),
entities: Object.freeze([...entitiesWithoutStreamInfo]),
id: streamSequence === 1 ? livestreamSessionId : activity.id,
text: livestreamSession.previousActivity.text,
type: streamSequence === Infinity ? 'message' : 'typing'
});
}
}
Loading
Loading