Skip to content

Commit 3e92b0c

Browse files
committed
Add test for reading after the stream end
1 parent 0e73a5f commit 3e92b0c

1 file changed

Lines changed: 30 additions & 1 deletion

File tree

test/client.e2e.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Offset, ProximaStreamClient, StreamRegistryClient, StreamEndpoint } from "../src"
22
import { strict as assert } from "assert";
3-
import { firstValueFrom, take, toArray } from "rxjs";
3+
import { EMPTY, Observable, firstValueFrom, isEmpty, lastValueFrom, take, timeout, toArray } from "rxjs";
44

55
const streamTests = [{
66
stream: "proxima.eth-main.blocks.1_0",
@@ -10,6 +10,23 @@ const streamTests = [{
1010
timestamp: 1438473600000,
1111
}];
1212

13+
async function getStats(registry: StreamRegistryClient, name: string): Promise<StreamEndpoint | undefined> {
14+
const streams = await registry.getStreams();
15+
for (const stream of streams) {
16+
if (stream.name == name) {
17+
return Object.values(stream.endpoints)[0];
18+
}
19+
}
20+
}
21+
22+
async function emitsNothingFor<T>(observable: Observable<T>, time: number): Promise<boolean> {
23+
return lastValueFrom(
24+
observable
25+
.pipe(timeout({each: time, with: () => EMPTY}))
26+
.pipe(isEmpty())
27+
);
28+
}
29+
1330
describe("StreamRegistryClient", () => {
1431
it("should get all streams", async () => {
1532
const registry = new StreamRegistryClient();
@@ -113,4 +130,16 @@ describe("ProximaStreamClient", () => {
113130
expect(lastEvents).toHaveLength(0);
114131
});
115132
}
133+
134+
it("should return zero events if fetching from last offset", async () => {
135+
const stream = "proxima.exchange-rates.0_1";
136+
const client = new ProximaStreamClient({registry});
137+
const stats = await getStats(registry, stream);
138+
expect(stats).not.toBeUndefined();
139+
const pauseable = await client.streamEvents(stream, stats!.stats.end!);
140+
const subscription = pauseable.observable.subscribe({error: e => fail(e)});
141+
const empty = await emitsNothingFor(pauseable.observable, 1000);
142+
subscription.unsubscribe();
143+
expect(empty).toBeTruthy();
144+
});
116145
});

0 commit comments

Comments
 (0)