-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathindex.ts
More file actions
81 lines (61 loc) · 2.16 KB
/
index.ts
File metadata and controls
81 lines (61 loc) · 2.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import { getGlobal, registerGlobal } from "../utils/globals.js";
import { NoopInputStreamManager } from "./noopManager.js";
import { InputStreamManager, InputStreamOncePromise } from "./types.js";
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
/** Key under which the input-stream manager is stored in / read from the global registry. */
const API_NAME = "input-streams";
/** Shared fallback manager used whenever no manager is found in the global registry. */
const NOOP_MANAGER = new NoopInputStreamManager();
/**
 * Singleton facade over the globally registered {@link InputStreamManager}.
 *
 * Every instance method forwards to whichever manager is currently stored in
 * the global registry under `API_NAME`; when none is registered the calls go
 * to the shared no-op manager instead.
 */
export class InputStreamsAPI implements InputStreamManager {
  private static _instance?: InputStreamsAPI;

  /** Construction is restricted; obtain the instance via {@link InputStreamsAPI.getInstance}. */
  private constructor() {}

  /** Returns the shared instance, creating it on first use. */
  public static getInstance(): InputStreamsAPI {
    const existing = this._instance;
    if (existing !== undefined) {
      return existing;
    }

    const created = new InputStreamsAPI();
    this._instance = created;
    return created;
  }

  /**
   * Registers `manager` in the global registry under `API_NAME`.
   *
   * @returns whatever `registerGlobal` reports for the registration attempt.
   */
  setGlobalManager(manager: InputStreamManager): boolean {
    const registered = registerGlobal(API_NAME, manager);
    return registered;
  }

  /** Looks up the registered manager, falling back to the no-op manager when the lookup is nullish. */
  #resolveManager(): InputStreamManager {
    return getGlobal(API_NAME) ?? NOOP_MANAGER;
  }

  /** Forwards `runId` and the optional `streamsVersion` to the active manager's `setRunId`. */
  public setRunId(runId: string, streamsVersion?: string): void {
    const manager = this.#resolveManager();
    manager.setRunId(runId, streamsVersion);
  }

  /**
   * Forwards to the active manager's `on` for `streamId`.
   *
   * @returns the handle produced by the manager, exposing `off`.
   */
  public on(
    streamId: string,
    handler: (data: unknown) => void | Promise<void>
  ): { off: () => void } {
    const manager = this.#resolveManager();
    return manager.on(streamId, handler);
  }

  /** Forwards to the active manager's `once` and returns its promise unchanged. */
  public once(streamId: string, options?: InputStreamOnceOptions): InputStreamOncePromise<unknown> {
    const manager = this.#resolveManager();
    return manager.once(streamId, options);
  }

  /** Returns the result of the active manager's `peek` for `streamId`. */
  public peek(streamId: string): unknown | undefined {
    const manager = this.#resolveManager();
    return manager.peek(streamId);
  }

  /** Returns the result of the active manager's `lastSeqNum` for `streamId`. */
  public lastSeqNum(streamId: string): number | undefined {
    const manager = this.#resolveManager();
    return manager.lastSeqNum(streamId);
  }

  /** Forwards `streamId` and `seqNum` to the active manager's `setLastSeqNum`. */
  public setLastSeqNum(streamId: string, seqNum: number): void {
    const manager = this.#resolveManager();
    manager.setLastSeqNum(streamId, seqNum);
  }

  /** Returns the result of the active manager's `shiftBuffer` for `streamId`. */
  public shiftBuffer(streamId: string): boolean {
    const manager = this.#resolveManager();
    return manager.shiftBuffer(streamId);
  }

  /** Forwards to the active manager's `disconnectStream` for `streamId`. */
  public disconnectStream(streamId: string): void {
    const manager = this.#resolveManager();
    manager.disconnectStream(streamId);
  }

  /** Forwards to the active manager's `clearHandlers`. */
  public clearHandlers(): void {
    const manager = this.#resolveManager();
    manager.clearHandlers();
  }

  /** Forwards to the active manager's `reset`. */
  public reset(): void {
    const manager = this.#resolveManager();
    manager.reset();
  }

  /** Forwards to the active manager's `disconnect`. */
  public disconnect(): void {
    const manager = this.#resolveManager();
    manager.disconnect();
  }

  /** Forwards `runId` and the optional `fromSeq` to the active manager's `connectTail`. */
  public connectTail(runId: string, fromSeq?: number): void {
    const manager = this.#resolveManager();
    manager.connectTail(runId, fromSeq);
  }
}