Skip to content

Commit 4e1cb79

Browse files
committed
Fixed issue where workers could compete over locks and break
1 parent ba797cc commit 4e1cb79

1 file changed

Lines changed: 80 additions & 79 deletions

File tree

src/map/ShareableMap.ts

Lines changed: 80 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,21 @@ export class ShareableMap<K, V> extends TransferableDataStructure {
3131
// Which position in the index array is used to check if the map is locked by other threads (=> UInt32)
3232
private static readonly INDEX_LOCK_OFFSET = 12;
3333
private static readonly INDEX_TOTAL_USED_SPACE_OFFSET = 16;
34-
// Which position in the index array is used to count the amount of held read locks (=> UInt32)
35-
private static readonly INDEX_READ_COUNT_OFFSET = 20;
34+
// Bytes 20-23 (offset 20) were previously used for a separate read-count field.
35+
// The combined lock word at INDEX_LOCK_OFFSET now encodes both the writer flag and the
36+
// active reader count in a single Int32, so this slot is reserved / unused.
3637

3738
/**
38-
* Lock states for the ShareableMap
39+
* The single lock word encoding:
40+
* -1 → exclusive write lock held
41+
* 0 → unlocked
42+
* N>0 → N active concurrent readers
43+
*
44+
* Encoding both state and reader count in one Int32 makes "check no writer +
45+
* register as reader" a single compareExchange — eliminating the TOCTOU window
46+
* that existed when a separate INDEX_READ_COUNT_OFFSET field was used.
3947
*/
40-
private static readonly LOCK_STATE = {
41-
UNLOCKED: 0, // No locks held
42-
WRITE_LOCKED: 1, // Exclusive write lock
43-
READ_LOCKED: 2 // One or more read locks
44-
};
48+
private static readonly WRITE_LOCK_VALUE = -1;
4549

4650

4751
private indexMem!: SharedArrayBuffer | ArrayBuffer;
@@ -812,45 +816,51 @@ export class ShareableMap<K, V> extends TransferableDataStructure {
812816
* Acquires a read lock on the map. Multiple readers can hold read locks simultaneously,
813817
* but no writers can access the map while any read locks are held.
814818
*
815-
* @param timeout Optional timeout in milliseconds. If not provided, will wait indefinitely.
816-
* @returns true if the lock was acquired, false if it timed out
819+
* The lock word encodes both state and reader count in a single Int32:
820+
* -1 → write lock held (readers must wait)
821+
* 0 → unlocked
822+
* N>0 → N active readers
823+
*
824+
* A CAS loop is used so that "verify no writer is present" and "register as a reader"
825+
* are a single atomic operation, eliminating any TOCTOU window.
826+
*
827+
* @param timeout Optional timeout in milliseconds. Defaults to 500ms.
828+
* @returns true if the lock was acquired
829+
* @throws if the timeout expires before the lock is acquired
817830
*/
818831
private acquireReadLock(timeout: number = 500): boolean {
819832
if (!(this.indexMem instanceof SharedArrayBuffer)) {
820-
// Locking only works with SharedArrayBuffer
821833
return true;
822834
}
823835

824836
const int32Array = new Int32Array(this.indexMem);
825-
826-
// Wait until there are no write locks
837+
const lockIdx = ShareableMap.INDEX_LOCK_OFFSET / 4;
827838
const startTime = Date.now();
828-
while (true) {
829-
// Check if there's a write lock
830-
const currentState = Atomics.load(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4);
831839

832-
if (currentState !== ShareableMap.LOCK_STATE.WRITE_LOCKED) {
833-
// No write lock, try to update the state and increment read count
834-
const readCount = Atomics.add(int32Array, ShareableMap.INDEX_READ_COUNT_OFFSET / 4, 1) + 1;
835-
836-
// If this is the first read lock, update the state
837-
if (readCount === 1) {
838-
Atomics.store(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4, ShareableMap.LOCK_STATE.READ_LOCKED);
840+
while (true) {
841+
const current = Atomics.load(int32Array, lockIdx);
842+
843+
if (current >= 0) {
844+
// No writer present. Atomically register as a reader by incrementing the
845+
// lock word from `current` to `current + 1`. If another thread changed the
846+
// value between the load and the CAS (e.g. a writer arrived, or another
847+
// reader incremented first), the CAS will fail and we retry — no window
848+
// exists for a writer to hold its lock while we incorrectly proceed.
849+
const observed = Atomics.compareExchange(int32Array, lockIdx, current, current + 1);
850+
if (observed === current) {
851+
return true; // CAS succeeded — we are now registered as a reader
839852
}
840-
841-
return true;
853+
// CAS failed: another thread raced us; spin immediately without sleeping
854+
continue;
842855
}
843856

844-
// If we have a timeout and it's expired, return false
845-
if (timeout !== undefined && (Date.now() - startTime) >= timeout) {
857+
// Write lock is held (current === WRITE_LOCK_VALUE); park until it is released
858+
const elapsed = Date.now() - startTime;
859+
if (elapsed >= timeout) {
846860
throw new Error("ShareableMap: timeout expired while waiting for read lock.");
847861
}
848862

849-
// Wait for a notification that the write lock might be released
850-
// We use "not-equal" because we want to wake up when the state changes from WRITE_LOCKED
851-
Atomics.wait(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4,
852-
ShareableMap.LOCK_STATE.WRITE_LOCKED,
853-
timeout === undefined ? Infinity : timeout - (Date.now() - startTime));
863+
Atomics.wait(int32Array, lockIdx, ShareableMap.WRITE_LOCK_VALUE, timeout - elapsed);
854864
}
855865
}
856866

@@ -863,65 +873,60 @@ export class ShareableMap<K, V> extends TransferableDataStructure {
863873
}
864874

865875
const int32Array = new Int32Array(this.indexMem);
866-
867-
// Decrement the read count
868-
const readCount = Atomics.sub(int32Array, ShareableMap.INDEX_READ_COUNT_OFFSET / 4, 1) - 1;
869-
870-
// If this was the last read lock, update the state and notify waiters
871-
if (readCount === 0) {
872-
Atomics.store(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4, ShareableMap.LOCK_STATE.UNLOCKED);
873-
// Notify all waiters that the state has changed
874-
Atomics.notify(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4, Infinity);
876+
const lockIdx = ShareableMap.INDEX_LOCK_OFFSET / 4;
877+
878+
// Decrement the reader count. Atomics.sub returns the value *before* subtraction,
879+
// so a result of 1 means the count just reached 0 — we were the last reader.
880+
const previous = Atomics.sub(int32Array, lockIdx, 1);
881+
if (previous === 1) {
882+
// No more active readers; wake any writers that are parked waiting for 0
883+
Atomics.notify(int32Array, lockIdx, Infinity);
875884
}
876885
}
877886

878887
/**
879-
* Acquires an exclusive write lock on the map. No other readers or writers can access
888+
* Acquires an exclusive write lock on the map. No readers or other writers may access
880889
* the map while a write lock is held.
881890
*
882-
* @param timeout Optional timeout in milliseconds. If not provided, will wait indefinitely.
883-
* @returns true if the lock was acquired, false if it timed out
891+
* The CAS only succeeds when the lock word is exactly 0 (fully unlocked), ensuring
892+
* that neither active readers (N > 0) nor another writer (-1) can be present.
893+
*
894+
* @param timeout Optional timeout in milliseconds. Defaults to 500ms.
895+
* @returns true if the lock was acquired
896+
* @throws if the timeout expires before the lock is acquired
884897
*/
885898
public acquireWriteLock(timeout: number = 500): boolean {
886899
if (!(this.indexMem instanceof SharedArrayBuffer)) {
887-
// Locking only works with SharedArrayBuffer
888900
return true;
889901
}
890902

891903
const int32Array = new Int32Array(this.indexMem);
892-
904+
const lockIdx = ShareableMap.INDEX_LOCK_OFFSET / 4;
893905
const startTime = Date.now();
906+
894907
while (true) {
895-
// Check if the map is currently unlocked
896-
const currentState = Atomics.load(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4);
897-
898-
if (currentState === ShareableMap.LOCK_STATE.UNLOCKED) {
899-
// Try to atomically change state from UNLOCKED to WRITE_LOCKED
900-
const exchangedValue = Atomics.compareExchange(
901-
int32Array,
902-
ShareableMap.INDEX_LOCK_OFFSET / 4,
903-
ShareableMap.LOCK_STATE.UNLOCKED,
904-
ShareableMap.LOCK_STATE.WRITE_LOCKED
905-
);
908+
// Attempt to transition from fully-unlocked (0) to write-locked (-1).
909+
// If active readers or another writer are present the CAS will fail.
910+
const observed = Atomics.compareExchange(
911+
int32Array,
912+
lockIdx,
913+
0,
914+
ShareableMap.WRITE_LOCK_VALUE
915+
);
906916

907-
// If exchangedValue is UNLOCKED, we got the lock
908-
if (exchangedValue === ShareableMap.LOCK_STATE.UNLOCKED) {
909-
return true;
910-
}
917+
if (observed === 0) {
918+
return true; // CAS succeeded — we hold the write lock
911919
}
912920

913-
// If we have a timeout and it's expired, return false
914-
if (timeout !== undefined && (Date.now() - startTime) >= timeout) {
921+
const elapsed = Date.now() - startTime;
922+
if (elapsed >= timeout) {
915923
throw new Error("ShareableMap: timeout expired while waiting for write lock.");
916924
}
917925

918-
// Wait for a notification that the lock state has changed
919-
Atomics.wait(
920-
int32Array,
921-
ShareableMap.INDEX_LOCK_OFFSET / 4,
922-
currentState,
923-
timeout === undefined ? Infinity : timeout - (Date.now() - startTime)
924-
);
926+
// Park until the lock word changes (reader leaves or writer releases).
927+
// We pass `observed` so we wake as soon as the value changes from whatever
928+
// was blocking us.
929+
Atomics.wait(int32Array, lockIdx, observed, timeout - elapsed);
925930
}
926931
}
927932

@@ -934,23 +939,19 @@ export class ShareableMap<K, V> extends TransferableDataStructure {
934939
}
935940

936941
const int32Array = new Int32Array(this.indexMem);
942+
const lockIdx = ShareableMap.INDEX_LOCK_OFFSET / 4;
937943

938-
// Set the state to UNLOCKED
939-
Atomics.store(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4, ShareableMap.LOCK_STATE.UNLOCKED);
940-
941-
// Notify all waiters that the lock has been released
942-
Atomics.notify(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4, Infinity);
944+
Atomics.store(int32Array, lockIdx, 0);
945+
Atomics.notify(int32Array, lockIdx, Infinity);
943946
}
944947

945948
/**
946-
* Initialize lock state in the reset method
947-
* Add this to your reset() method
949+
* Initialise the lock word to the unlocked state (0).
948950
*/
949951
private initializeLockState(): void {
950952
if (this.indexMem instanceof SharedArrayBuffer) {
951953
const int32Array = new Int32Array(this.indexMem);
952-
Atomics.store(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4, ShareableMap.LOCK_STATE.UNLOCKED);
953-
Atomics.store(int32Array, ShareableMap.INDEX_READ_COUNT_OFFSET / 4, 0);
954+
Atomics.store(int32Array, ShareableMap.INDEX_LOCK_OFFSET / 4, 0);
954955
}
955956
}
956957
}

0 commit comments

Comments
 (0)