Skip to content

Commit ffb8b57

Browse files
committed
fix: changed client stream caller signature
Related #501 [ci skip]
1 parent 632281c commit ffb8b57

4 files changed

Lines changed: 42 additions & 69 deletions

File tree

src/RPC/RPCClient.ts

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class RPCClient<M extends ClientManifest> {
7575
case 'SERVER':
7676
return (params) => this.serverStreamCaller(method, params);
7777
case 'CLIENT':
78-
return (f) => this.clientStreamCaller(method, f);
78+
return () => this.clientStreamCaller(method);
7979
case 'DUPLEX':
8080
return (f) => this.duplexStreamCaller(method, f);
8181
case 'RAW':
@@ -142,8 +142,6 @@ class RPCClient<M extends ClientManifest> {
142142
return this.rawMethodsProxy as MapRawCallers<M>;
143143
}
144144

145-
// Convenience methods
146-
147145
@ready(new rpcErrors.ErrorRpcDestroyed())
148146
public async unaryCaller<I extends JSONValue, O extends JSONValue>(
149147
method: string,
@@ -183,21 +181,22 @@ class RPCClient<M extends ClientManifest> {
183181
@ready(new rpcErrors.ErrorRpcDestroyed())
184182
public async clientStreamCaller<I extends JSONValue, O extends JSONValue>(
185183
method: string,
186-
f: (output: Promise<O>) => AsyncIterable<I | undefined>,
187-
): Promise<void> {
188-
const callerInterface = await this.rawClientStreamCaller<I, O>(method);
189-
const writer = callerInterface.writable.getWriter();
190-
let running = true;
191-
for await (const value of f(callerInterface.output)) {
192-
if (value === undefined) {
193-
await writer.close();
194-
running = false;
184+
): Promise<{
185+
output: Promise<O>;
186+
writable: WritableStream<I>;
187+
}> {
188+
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
189+
const reader = callerInterface.readable.getReader();
190+
const output = reader.read().then(({ value, done }) => {
191+
if (done) {
192+
throw new rpcErrors.ErrorRpcRemoteError('Stream ended before response');
195193
}
196-
// Write while running otherwise consume until ended
197-
if (running) await writer.write(value);
198-
}
199-
// If ended before finish running then close writer
200-
if (running) await writer.close();
194+
return value;
195+
});
196+
return {
197+
output,
198+
writable: callerInterface.writable,
199+
};
201200
}
202201

203202
@ready(new rpcErrors.ErrorRpcDestroyed())
@@ -263,29 +262,6 @@ class RPCClient<M extends ClientManifest> {
263262
tempWriter.releaseLock();
264263
return streamPair;
265264
}
266-
267-
protected async rawClientStreamCaller<
268-
I extends JSONValue,
269-
O extends JSONValue,
270-
>(
271-
method: string,
272-
): Promise<{
273-
output: Promise<O>;
274-
writable: WritableStream<I>;
275-
}> {
276-
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
277-
const reader = callerInterface.readable.getReader();
278-
const output = reader.read().then(({ value, done }) => {
279-
if (done) {
280-
throw new rpcErrors.ErrorRpcRemoteError('Stream ended before response');
281-
}
282-
return value;
283-
});
284-
return {
285-
output,
286-
writable: callerInterface.writable,
287-
};
288-
}
289265
}
290266

291267
export default RPCClient;

src/RPC/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ type ServerCallerImplementation<
161161
type ClientCallerImplementation<
162162
I extends JSONValue = JSONValue,
163163
O extends JSONValue = JSONValue,
164-
> = (f: (output: Promise<O>) => AsyncIterable<I | undefined>) => Promise<void>;
164+
> = () => Promise<{ output: Promise<O>; writable: WritableStream<I> }>;
165165

166166
type DuplexCallerImplementation<
167167
I extends JSONValue = JSONValue,

tests/RPC/RPC.test.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -208,17 +208,14 @@ describe('RPC', () => {
208208
logger,
209209
});
210210

211-
await rpcClient.methods.testMethod(async function* (output) {
212-
for (const value of values) {
213-
yield value;
214-
}
215-
// Ending writes
216-
yield undefined;
217-
// Checking output
218-
const expectedResult = values.reduce((p, c) => p + c);
219-
await expect(output).resolves.toEqual(expectedResult);
220-
});
221-
211+
const { output, writable } = await rpcClient.methods.testMethod();
212+
const writer = writable.getWriter();
213+
for (const value of values) {
214+
await writer.write(value);
215+
}
216+
await writer.close();
217+
const expectedResult = values.reduce((p, c) => p + c);
218+
await expect(output).resolves.toEqual(expectedResult);
222219
await rpcServer.destroy();
223220
await rpcClient.destroy();
224221
},

tests/RPC/RPCClient.test.ts

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -175,16 +175,16 @@ describe(`${RPCClient.name}`, () => {
175175
streamPairCreateCallback: async () => streamPair,
176176
logger,
177177
});
178-
await rpcClient.clientStreamCaller<JSONValue, JSONValue>(
179-
methodName,
180-
async function* (output) {
181-
for (const param of params) {
182-
yield param;
183-
}
184-
yield undefined;
185-
expect(await output).toStrictEqual(message.result);
186-
},
187-
);
178+
const { output, writable } = await rpcClient.clientStreamCaller<
179+
JSONValue,
180+
JSONValue
181+
>(methodName);
182+
const writer = writable.getWriter();
183+
for (const param of params) {
184+
await writer.write(param);
185+
}
186+
await writer.close();
187+
expect(await output).toStrictEqual(message.result);
188188
const expectedOutput = params.map((v) =>
189189
JSON.stringify({
190190
method: methodName,
@@ -518,13 +518,13 @@ describe(`${RPCClient.name}`, () => {
518518
streamPairCreateCallback: async () => streamPair,
519519
logger,
520520
});
521-
await rpcClient.methods.client(async function* (output) {
522-
for (const param of params) {
523-
yield param;
524-
}
525-
yield undefined;
526-
expect(await output).toStrictEqual(message.result);
527-
});
521+
const { output, writable } = await rpcClient.methods.client();
522+
const writer = writable.getWriter();
523+
for (const param of params) {
524+
await writer.write(param);
525+
}
526+
expect(await output).toStrictEqual(message.result);
527+
await writer.close();
528528
const expectedOutput = params.map((v) =>
529529
JSON.stringify({
530530
method: 'client',

0 commit comments

Comments
 (0)