Skip to content

Commit 539112c

Browse files
committed
Tests added that still need to be checked
1 parent fc1a41a commit 539112c

5 files changed

Lines changed: 218 additions & 230 deletions

File tree

jest.config.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ module.exports = {
138138

139139
// The test environment that will be used for testing
140140
testEnvironment: "node",
141-
testPathIgnorePatterns: ["/node_modules/", "/dist/", "shareableMapMixedWorker.js"],
141+
testPathIgnorePatterns: ["/node_modules/", "/dist/", "shareableMapWorker.js"],
142142
transform: {
143143
"^.+\\.tsx?$": "ts-jest"
144144
},

src/map/ShareableMap.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { fast1a32 } from "fnv-plus";
2-
import Serializable from "./../encoding/Serializable.ts";
3-
import StringEncoder from "./../encoding/StringEncoder.ts";
4-
import NumberEncoder from "./../encoding/NumberEncoder.ts";
5-
import GeneralPurposeEncoder from "./../encoding/GeneralPurposeEncoder.ts";
6-
import ShareableMapOptions from "./ShareableMapOptions.ts";
7-
import {TransferableState} from "../TransferableState.ts";
8-
import TransferableDataStructure from "../TransferableDataStructure.ts";
2+
import Serializable from "./../encoding/Serializable";
3+
import StringEncoder from "./../encoding/StringEncoder";
4+
import NumberEncoder from "./../encoding/NumberEncoder";
5+
import GeneralPurposeEncoder from "./../encoding/GeneralPurposeEncoder";
6+
import ShareableMapOptions from "./ShareableMapOptions";
7+
import {TransferableState} from "../TransferableState";
8+
import TransferableDataStructure from "../TransferableDataStructure";
99

1010
export class ShareableMap<K, V> extends TransferableDataStructure {
1111
// The default load factor to which this map should adhere
@@ -123,8 +123,6 @@ export class ShareableMap<K, V> extends TransferableDataStructure {
123123

124124
const map = new ShareableMap<K, V>({...defaultOptions, ...options});
125125
map.setBuffers(indexBuffer, dataBuffer);
126-
// Reinitialize lock state on revived map (constructor's initializeLockState did not run for replaced buffers)
127-
(map as any).initializeLockState?.();
128126
return map;
129127
}
130128

Lines changed: 147 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -1,201 +1,166 @@
1-
import { Worker } from "worker_threads";
2-
import { ShareableMap } from "../ShareableMap";
1+
import {Worker} from "worker_threads";
2+
import {ShareableMap} from "../ShareableMap";
3+
4+
// TODO: clean up these tests. They don't seem to actually have any proper testing effect!
35

46
// Concurrency stress tests for ShareableMap using real worker_threads.
57
// These tests validate that write locks prevent torn writes and that read locks
6-
// allow concurrent readers without corrupting data. They are heavier; keep iterations modest.
8+
// allow concurrent readers without corrupting data.
79

8-
const WORKER_COUNT = 4; // reduced from 8 for faster completion
9-
const OPERATIONS_PER_WORKER = 300; // reduced further from 800
10+
// Configurable parameters via environment variables
11+
const WORKER_COUNT = parseInt(process.env.MAP_WORKERS || process.env.WORKER_COUNT || "4", 10);
12+
const OPERATIONS_PER_WORKER = parseInt(process.env.MAP_OPS || process.env.OPERATIONS_PER_WORKER || "300", 10);
13+
const PRELOAD_SIZE = parseInt(process.env.MAP_PRELOAD || "100", 10);
14+
const MIXED_TIMEOUT_MS = parseInt(process.env.MAP_MIXED_TIMEOUT || "15000", 10);
15+
const RW_TIMEOUT_MS = parseInt(process.env.MAP_RW_TIMEOUT || "15000", 10);
1016

1117
interface WorkerResult {
12-
sets: number;
13-
deletes: number;
14-
finalSize?: number;
15-
errors: string[];
18+
sets: number;
19+
deletes: number;
20+
finalSize?: number;
21+
errors: string[];
1622
}
1723

1824
function spawnWorker(workerData: any): Promise<WorkerResult> {
19-
return new Promise((resolve) => {
20-
const shareableMapPath = require.resolve('../ShareableMap.ts');
21-
const code = `
22-
try {
23-
const { parentPort, workerData } = require('worker_threads');
24-
require('ts-node').register({ transpileOnly: true, compilerOptions: { module: 'commonjs' } });
25-
const { TextEncoder, TextDecoder } = require('util');
26-
global.TextEncoder = TextEncoder; global.TextDecoder = TextDecoder;
27-
const { ShareableMap } = require('${shareableMapPath.replace(/\\/g,'\\\\')}');
28-
const map = ShareableMap.fromTransferableState(workerData.state);
29-
const startFlag = new Int32Array(workerData.startFlagBuffer);
30-
while (Atomics.load(startFlag, 0) === 0) { Atomics.wait(startFlag, 0, 0, 100); }
31-
let sets = 0, deletes = 0; const errors: string[] = [];
32-
function randStr(){ return Math.random().toString(36).slice(2); }
33-
const deadline = Date.now() + 15000; // 15s safety inside worker
34-
for (let i = 0; i < workerData.operations; i++) {
35-
if (Date.now() > deadline) { errors.push('Worker exceeded internal deadline'); break; }
36-
const mode = i % 10;
37-
try {
38-
if (mode < 7) {
39-
const k = 'W' + workerData.id + '-' + randStr();
40-
map.set(k, 'V' + workerData.id + '-' + randStr());
41-
sets++;
42-
} else if (mode < 9) {
43-
const k = 'W' + Math.floor(Math.random()*workerData.totalWorkers) + '-' + randStr();
44-
map.get(k);
45-
} else {
46-
const k = 'W' + workerData.id + '-' + randStr();
47-
map.delete(k);
48-
deletes++;
49-
}
50-
} catch(e){ errors.push(String(e)); }
51-
}
52-
parentPort.postMessage({ sets, deletes, errors });
53-
} catch (e) {
54-
const { parentPort } = require('worker_threads');
55-
parentPort.postMessage({ sets:0, deletes:0, errors:[String(e && e.stack || e)] });
56-
}
57-
`;
58-
const worker = new Worker(code, { eval: true, workerData });
59-
worker.on('message', msg => { worker.terminate(); resolve(msg); });
60-
worker.on('error', err => { worker.terminate(); resolve({ sets:0, deletes:0, errors:[String(err && (err.stack||err.message)||err)] }); });
61-
worker.on('exit', code => { /* handled */ });
62-
});
25+
return new Promise((resolve) => {
26+
const workerScript = require.resolve('./workers/shareableMapWorker.js');
27+
const shareableMapPath = require.resolve('../ShareableMap.ts').replace(/\\/g, '\\\\');
28+
const worker = new Worker(workerScript, {workerData: {...workerData, shareableMapPath}});
29+
worker.on('message', msg => {
30+
worker.terminate();
31+
resolve(msg);
32+
});
33+
worker.on('error', err => {
34+
worker.terminate();
35+
resolve({sets: 0, deletes: 0, errors: [String(err && (err.stack || err.message) || err)]});
36+
});
37+
worker.on('exit', () => { /* handled */
38+
});
39+
});
6340
}
6441

65-
// Helper to reconstruct map from state after workers finish
6642
function revive(state: ReturnType<ShareableMap<any, any>["toTransferableState"]>) {
67-
return ShareableMap.fromTransferableState<string, string>(state);
43+
return ShareableMap.fromTransferableState<string, string>(state);
6844
}
6945

70-
// Skip in CI quick runs if STRESS env not enabled
7146
const maybeRun = process.env.STRESS ? describe : describe.skip;
7247

7348
maybeRun("ShareableMap thread safety", () => {
74-
beforeAll(() => {
75-
const { TextEncoder, TextDecoder } = require("util");
76-
(global as any).TextEncoder = TextEncoder;
77-
(global as any).TextDecoder = TextDecoder;
78-
});
79-
80-
test("concurrent mixed operations produce consistent size accounting", async () => {
81-
const baseMap = new ShareableMap<string, string>({ expectedSize: WORKER_COUNT * OPERATIONS_PER_WORKER });
82-
83-
// Preload with a few items to exercise update paths
84-
for (let i = 0; i < 100; i++) {
85-
baseMap.set(`PRE-${i}`, `VAL-${i}`);
86-
}
87-
88-
const state = baseMap.toTransferableState();
89-
90-
// Barrier flag
91-
const startFlagBuffer = new SharedArrayBuffer(4);
92-
const startFlag = new Int32Array(startFlagBuffer);
93-
startFlag[0] = 0;
94-
95-
const workers: Promise<WorkerResult>[] = [];
96-
for (let i = 0; i < WORKER_COUNT; i++) {
97-
workers.push(
98-
spawnWorker({
99-
id: i,
100-
operations: OPERATIONS_PER_WORKER,
101-
state,
102-
startFlagBuffer,
103-
totalWorkers: WORKER_COUNT
104-
})
105-
);
106-
}
107-
108-
// Release barrier
109-
Atomics.store(startFlag, 0, 1);
110-
Atomics.notify(startFlag, 0);
111-
112-
const results = await Promise.all(workers);
113-
const totalSets = results.reduce((a, r) => a + r.sets, 0);
114-
const totalDeletes = results.reduce((a, r) => a + r.deletes, 0);
115-
const allErrors = results.flatMap(r => r.errors);
116-
117-
expect(allErrors).toHaveLength(0);
118-
119-
const finalMap = revive(state);
120-
121-
// The final size should be at least the preloaded count + total sets - total deletes.
122-
// Some deletes may target non-existent keys (random), so size cannot be less than preload - possible existing deletes.
123-
const minExpected = 100; // At least the preload should remain
124-
expect(finalMap.size).toBeGreaterThanOrEqual(minExpected);
125-
// Upper bound cannot exceed preload + total sets
126-
expect(finalMap.size).toBeLessThanOrEqual(100 + totalSets); // cannot exceed sets + preload
127-
128-
// Spot check a few random existing keys
129-
for (let i = 0; i < 20; i++) {
130-
const key = `PRE-${Math.floor(Math.random()*100)}`;
131-
expect(finalMap.has(key)).toBe(true);
132-
}
133-
}, 15000);
134-
135-
test("many concurrent readers with single writer maintains data integrity", async () => {
136-
const map = new ShareableMap<string, string>({ expectedSize: 5000 });
137-
for (let i = 0; i < 2000; i++) map.set(`K${i}`, `V${i}`);
138-
const state = map.toTransferableState();
139-
140-
const startFlagBuffer = new SharedArrayBuffer(4);
141-
const startFlag = new Int32Array(startFlagBuffer); startFlag[0] = 0;
142-
143-
// One writer worker performing updates
144-
const writerPromise = spawnWorker({ id: 0, operations: 600, state, startFlagBuffer, totalWorkers: WORKER_COUNT });
145-
146-
// Several reader workers just reading
147-
const readerPromises: Promise<WorkerResult>[] = [];
148-
for (let i = 1; i < WORKER_COUNT; i++) {
149-
readerPromises.push(
150-
new Promise((resolve, reject) => {
151-
const readerWorkerCode = `
152-
try {
153-
const { parentPort, workerData } = require('worker_threads');
154-
require('ts-node').register({ transpileOnly: true, compilerOptions: { module: 'commonjs' } });
155-
const { TextEncoder, TextDecoder } = require('util');
156-
global.TextEncoder = TextEncoder; global.TextDecoder = TextDecoder;
157-
const { ShareableMap } = require('${require.resolve('../ShareableMap.ts').replace(/\\/g,'\\\\')}');
158-
const map = ShareableMap.fromTransferableState(workerData.state);
159-
const startFlag = new Int32Array(workerData.startFlagBuffer);
160-
while (Atomics.load(startFlag, 0) === 0) { Atomics.wait(startFlag, 0, 0, 100); }
161-
const errors: string[] = [];
162-
for (let i = 0; i < 5000; i++) {
163-
try {
164-
const v = map.get('K' + (i % 2000));
165-
if (v !== undefined && !v.startsWith('V')) { errors.push('Corrupted value ' + v); break; }
166-
} catch(e){ errors.push(String(e)); break; }
167-
}
168-
parentPort.postMessage({ sets: 0, deletes: 0, errors });
169-
} catch(e) {
170-
const { parentPort } = require('worker_threads');
171-
parentPort.postMessage({ sets:0, deletes:0, errors:[String(e && e.stack || e)] });
49+
beforeAll(() => {
50+
const {TextEncoder, TextDecoder} = require("util");
51+
(global as any).TextEncoder = TextEncoder;
52+
(global as any).TextDecoder = TextDecoder;
53+
});
54+
55+
test("concurrent mixed operations produce consistent size accounting", async () => {
56+
const baseMap = new ShareableMap<string, string>({expectedSize: WORKER_COUNT * OPERATIONS_PER_WORKER});
57+
58+
for (let i = 0; i < PRELOAD_SIZE; i++) {
59+
baseMap.set(`PRE-${i}`, `VAL-${i}`);
60+
}
61+
62+
const state = baseMap.toTransferableState();
63+
64+
const startFlagBuffer = new SharedArrayBuffer(4);
65+
const startFlag = new Int32Array(startFlagBuffer);
66+
startFlag[0] = 0;
67+
68+
const workers: Promise<WorkerResult>[] = [];
69+
for (let i = 0; i < WORKER_COUNT; i++) {
70+
workers.push(
71+
spawnWorker({
72+
id: i,
73+
operations: OPERATIONS_PER_WORKER,
74+
state,
75+
startFlagBuffer,
76+
totalWorkers: WORKER_COUNT,
77+
internalDeadlineMs: MIXED_TIMEOUT_MS
78+
})
79+
);
80+
}
81+
82+
Atomics.store(startFlag, 0, 1);
83+
Atomics.notify(startFlag, 0);
84+
85+
const results = await Promise.all(workers);
86+
const totalSets = results.reduce((a, r) => a + r.sets, 0);
87+
const totalDeletes = results.reduce((a, r) => a + r.deletes, 0);
88+
const allErrors = results.flatMap(r => r.errors);
89+
90+
expect(allErrors).toHaveLength(0);
91+
92+
expect(baseMap.size).toBeGreaterThanOrEqual(PRELOAD_SIZE);
93+
expect(baseMap.size).toBeLessThanOrEqual(PRELOAD_SIZE + totalSets);
94+
95+
for (let i = 0; i < Math.min(20, PRELOAD_SIZE); i++) {
96+
const key = `PRE-${Math.floor(Math.random() * PRELOAD_SIZE)}`;
97+
expect(baseMap.has(key)).toBe(true);
98+
}
99+
}, MIXED_TIMEOUT_MS);
100+
101+
test("many concurrent readers with single writer maintains data integrity", async () => {
102+
const map = new ShareableMap<string, string>({expectedSize: 5000});
103+
for (let i = 0; i < 2000; i++) map.set(`K${i}`, `V${i}`);
104+
const state = map.toTransferableState();
105+
106+
const startFlagBuffer = new SharedArrayBuffer(4);
107+
const startFlag = new Int32Array(startFlagBuffer);
108+
startFlag[0] = 0;
109+
110+
const writerPromise = spawnWorker({
111+
id: 0,
112+
operations: Math.max(OPERATIONS_PER_WORKER, 600),
113+
state,
114+
startFlagBuffer,
115+
totalWorkers: WORKER_COUNT,
116+
internalDeadlineMs: RW_TIMEOUT_MS
117+
});
118+
119+
const readerPromises: Promise<WorkerResult>[] = [];
120+
for (let i = 1; i < WORKER_COUNT; i++) {
121+
readerPromises.push(
122+
new Promise((resolve) => {
123+
const workerScript = require.resolve('./workers/shareableMapWorker.js');
124+
const shareableMapPath = require.resolve('../ShareableMap.ts').replace(/\\/g, '\\\\');
125+
const worker = new Worker(workerScript, {
126+
workerData: {
127+
id: i,
128+
operations: 0,
129+
state,
130+
startFlagBuffer,
131+
totalWorkers: WORKER_COUNT,
132+
shareableMapPath,
133+
internalDeadlineMs: RW_TIMEOUT_MS
134+
}
135+
});
136+
worker.on('message', msg => {
137+
worker.terminate();
138+
resolve(msg);
139+
});
140+
worker.on('error', err => {
141+
worker.terminate();
142+
resolve({sets: 0, deletes: 0, errors: [String(err && (err.stack || err.message) || err)]});
143+
});
144+
worker.on('exit', () => { /* ignore */
145+
});
146+
})
147+
);
148+
}
149+
150+
Atomics.store(startFlag, 0, 1);
151+
Atomics.notify(startFlag, 0);
152+
const writerResult = await writerPromise;
153+
const readerResults = await Promise.all(readerPromises);
154+
155+
for (let i = 0; i < 50; i++) {
156+
const keyIndex = Math.floor(Math.random() * 2000);
157+
const val = map.get(`K${keyIndex}`);
158+
if (val !== undefined) {
159+
expect(val.startsWith('V')).toBe(true);
172160
}
173-
`;
174-
const worker = new Worker(readerWorkerCode, { eval: true, workerData: { state, startFlagBuffer } });
175-
worker.on('message', msg => { worker.terminate(); resolve(msg); });
176-
worker.on('error', err => { worker.terminate(); resolve({ sets:0, deletes:0, errors:[String(err && (err.stack||err.message)||err)] }); });
177-
worker.on('exit', code => { /* ignore */ });
178-
})
179-
);
180-
}
181-
182-
Atomics.store(startFlag, 0, 1); Atomics.notify(startFlag, 0);
183-
const writerResult = await writerPromise;
184-
const readerResults = await Promise.all(readerPromises);
185-
186-
const finalMap = revive(state);
187-
188-
// Integrity: existing keys should decode properly
189-
for (let i = 0; i < 50; i++) {
190-
const keyIndex = Math.floor(Math.random()*2000);
191-
const val = finalMap.get(`K${keyIndex}`);
192-
if (val !== undefined) {
193-
expect(val.startsWith('V')).toBe(true);
194-
}
195-
}
196-
197-
// No reader saw corrupted data
198-
readerResults.forEach(r => expect(r.errors).toHaveLength(0));
199-
expect(writerResult.errors).toHaveLength(0);
200-
}, 15000);
161+
}
162+
163+
readerResults.forEach(r => expect(r.errors).toHaveLength(0));
164+
expect(writerResult.errors).toHaveLength(0);
165+
}, RW_TIMEOUT_MS);
201166
});

0 commit comments

Comments
 (0)