Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 139 additions & 2 deletions tests/unit/rpc.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, it } from 'node:test';
import { describe, it, afterEach } from 'node:test';
import assert from 'node:assert';
import { processProtocolMessage, buildMethodProxy, Transfer } from '../../src/core/rpc';
import { processProtocolMessage, buildMethodProxy, connectWorkerPort, WorkerPort, Transfer } from '../../src/core/rpc';

describe('RPC', () => {
describe('processProtocolMessage', () => {
Expand Down Expand Up @@ -149,4 +149,141 @@ describe('RPC', () => {
assert.strictEqual(transfer[0], buffer);
});
});

describe('connectWorkerPort', () => {
let originalWarn: any;

afterEach(() => {
if (originalWarn) {
console.warn = originalWarn;
originalWarn = undefined;
}
});

it('should route invocations and responses', async () => {
let messageHandler: ((data: any) => void) | undefined;
let postedMessage: any;

const port: WorkerPort = {
postMessage: (msg: any, transfer: any) => {
postedMessage = msg;
},
on: (event: string, handler: any) => {
if (event === 'message') messageHandler = handler;
}
};

const proxy = connectWorkerPort<{ add: (a: number, b: number) => Promise<number> }>(
port,
['add']
);

assert.ok(messageHandler);

// Call proxy
const promise = proxy.add(2, 3);

// Check envelope
assert.strictEqual(postedMessage.kind, 'invoke');
assert.strictEqual(postedMessage.methodName, 'add');
assert.deepStrictEqual(postedMessage.parameters, [2, 3]);

// Emulate reply
messageHandler({
kind: 'result',
correlationId: postedMessage.correlationId,
payload: 5
});

const result = await promise;
assert.strictEqual(result, 5);
});

it('should pass transfer list to postMessage', async () => {
let postedTransfer: any;

const port: WorkerPort = {
postMessage: (msg: any, transfer: any) => {
postedTransfer = transfer;
},
on: () => {}
};

const proxy = connectWorkerPort<{ send: (data: any) => Promise<void> }>(
port,
['send']
);

const buffer = new ArrayBuffer(8);
proxy.send(new Transfer({ buf: buffer }, [buffer])).catch(() => {});

assert.strictEqual(postedTransfer?.length, 1);
assert.strictEqual(postedTransfer?.[0], buffer);
});

it('should fallback to copy if transfer throws', async () => {
let callCount = 0;
let hadTransfer = false;
let fallbackMessage = null;

originalWarn = console.warn;
let warnCalled = false;
console.warn = () => { warnCalled = true; };

const port: WorkerPort = {
postMessage: (msg: any, transfer: any) => {
callCount++;
if (transfer && transfer.length > 0) {
hadTransfer = true;
throw new Error('Transfer not supported');
} else {
fallbackMessage = msg;
}
},
on: () => {}
};

const proxy = connectWorkerPort<{ send: (data: any) => Promise<void> }>(
port,
['send']
);

const buffer = new ArrayBuffer(8);
proxy.send(new Transfer({ buf: buffer }, [buffer])).catch(() => {});

assert.strictEqual(callCount, 2);
assert.strictEqual(hadTransfer, true);
assert.ok(fallbackMessage);
assert.strictEqual(warnCalled, true);
});

it('should route log messages to onLog', () => {
let messageHandler: ((data: any) => void) | undefined;
let logLevel: string | null = null;
let logArgs: any = null;

const port: WorkerPort = {
postMessage: () => {},
on: (event: string, handler: any) => {
if (event === 'message') messageHandler = handler;
}
};

connectWorkerPort(port, [], (level, args) => {
logLevel = level;
logArgs = args;
});

assert.ok(messageHandler);

messageHandler({
kind: 'log',
level: 'info',
args: ['hello world']
});

assert.strictEqual(logLevel, 'info');
assert.deepStrictEqual(logArgs, ['hello world']);
});
});
});