Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 159 additions & 3 deletions realtime/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ export const wss = new WebSocketServer({
const ipConnections = new Map<string, number>();
const ipConnectionTimestamps = new Map<string, number[]>();
const unauthorizedAccessCooldown = new Map<string, UnauthorizedCooldownState>();
const unauthorizedCooldownExpirations: UnauthorizedCooldownExpiryEntry[] = [];
const MAX_UNAUTHORIZED_COOLDOWN_EXPIRATIONS_PER_CLEANUP = 512;
const UNAUTHORIZED_COOLDOWN_HEAP_REBUILD_MIN_SIZE = 1024;
const UNAUTHORIZED_COOLDOWN_HEAP_REBUILD_RATIO = 4;
const ROOM_ID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

interface UnauthorizedCooldownState {
Expand All @@ -102,6 +106,11 @@ interface UnauthorizedCooldownState {
suppressedAttempts: number;
}

interface UnauthorizedCooldownExpiryEntry {
key: string;
denyUntil: number;
}

interface AccessCheckData {
allowed: boolean;
accessLevel: RealtimeAccessLevel | null;
Expand Down Expand Up @@ -155,6 +164,122 @@ function buildUnauthorizedCooldownKey(roomId: string, clientIp: string, token: s
return `${roomId}:${clientIp}:${getTokenFingerprint(token)}`;
}

function swapUnauthorizedCooldownHeapEntries(a: number, b: number): void {
const tmp = unauthorizedCooldownExpirations[a];
unauthorizedCooldownExpirations[a] = unauthorizedCooldownExpirations[b];
unauthorizedCooldownExpirations[b] = tmp;
}

function siftUnauthorizedCooldownExpirationUp(index: number): void {
let current = index;

while (current > 0) {
const parent = Math.floor((current - 1) / 2);
if (
unauthorizedCooldownExpirations[parent].denyUntil <=
unauthorizedCooldownExpirations[current].denyUntil
) {
break;
}

swapUnauthorizedCooldownHeapEntries(parent, current);
current = parent;
}
}

function siftUnauthorizedCooldownExpirationDown(index: number): void {
let current = index;
const size = unauthorizedCooldownExpirations.length;

while (true) {
const left = current * 2 + 1;
const right = left + 1;
let smallest = current;

if (
left < size &&
unauthorizedCooldownExpirations[left].denyUntil <
unauthorizedCooldownExpirations[smallest].denyUntil
) {
smallest = left;
}

if (
right < size &&
unauthorizedCooldownExpirations[right].denyUntil <
unauthorizedCooldownExpirations[smallest].denyUntil
) {
smallest = right;
}

if (smallest === current) {
break;
}

swapUnauthorizedCooldownHeapEntries(current, smallest);
current = smallest;
}
}

function pushUnauthorizedCooldownExpiration(entry: UnauthorizedCooldownExpiryEntry): void {
unauthorizedCooldownExpirations.push(entry);
siftUnauthorizedCooldownExpirationUp(unauthorizedCooldownExpirations.length - 1);
}

function peekUnauthorizedCooldownExpiration(): UnauthorizedCooldownExpiryEntry | null {
return unauthorizedCooldownExpirations[0] ?? null;
}

function popUnauthorizedCooldownExpiration(): UnauthorizedCooldownExpiryEntry | null {
if (unauthorizedCooldownExpirations.length === 0) {
return null;
}

const root = unauthorizedCooldownExpirations[0];
const last = unauthorizedCooldownExpirations.pop();
if (unauthorizedCooldownExpirations.length > 0 && last) {
unauthorizedCooldownExpirations[0] = last;
siftUnauthorizedCooldownExpirationDown(0);
}

return root;
}

function rebuildUnauthorizedCooldownExpirationHeap(): void {
unauthorizedCooldownExpirations.length = 0;

for (const [key, state] of unauthorizedAccessCooldown.entries()) {
unauthorizedCooldownExpirations.push({
key,
denyUntil: state.denyUntil,
});
}

for (let i = Math.floor(unauthorizedCooldownExpirations.length / 2) - 1; i >= 0; i -= 1) {
siftUnauthorizedCooldownExpirationDown(i);
}
}

function maybeRebuildUnauthorizedCooldownExpirationHeap(): void {
if (unauthorizedCooldownExpirations.length < UNAUTHORIZED_COOLDOWN_HEAP_REBUILD_MIN_SIZE) {
return;
}

if (unauthorizedAccessCooldown.size === 0) {
unauthorizedCooldownExpirations.length = 0;
return;
}

if (
unauthorizedCooldownExpirations.length <=
unauthorizedAccessCooldown.size * UNAUTHORIZED_COOLDOWN_HEAP_REBUILD_RATIO
) {
return;
}

rebuildUnauthorizedCooldownExpirationHeap();
}

function getUnauthorizedCooldownState(key: string, now: number): UnauthorizedCooldownState | null {
const existingState = unauthorizedAccessCooldown.get(key);
if (!existingState) {
Expand All @@ -173,6 +298,7 @@ function trackUnauthorizedAccess(key: string, now: number): UnauthorizedCooldown
const existingState = getUnauthorizedCooldownState(key, now);
if (existingState) {
existingState.denyUntil = now + config.unauthorizedAccessCooldownMs;
pushUnauthorizedCooldownExpiration({ key, denyUntil: existingState.denyUntil });
return existingState;
Comment thread
santhoshh-kumar marked this conversation as resolved.
}

Expand All @@ -183,6 +309,7 @@ function trackUnauthorizedAccess(key: string, now: number): UnauthorizedCooldown
};

unauthorizedAccessCooldown.set(key, nextState);
pushUnauthorizedCooldownExpiration({ key, denyUntil: nextState.denyUntil });
return nextState;
Comment thread
santhoshh-kumar marked this conversation as resolved.
}

Expand Down Expand Up @@ -222,11 +349,40 @@ function logUnauthorizedRejection(
}

function cleanupExpiredUnauthorizedCooldown(now = Date.now()): void {
for (const [key, state] of unauthorizedAccessCooldown.entries()) {
if (state.denyUntil <= now) {
unauthorizedAccessCooldown.delete(key);
let processedEntries = 0;

while (processedEntries < MAX_UNAUTHORIZED_COOLDOWN_EXPIRATIONS_PER_CLEANUP) {
const nextExpiration = peekUnauthorizedCooldownExpiration();
if (!nextExpiration || nextExpiration.denyUntil > now) {
break;
}

const expiredEntry = popUnauthorizedCooldownExpiration();
if (!expiredEntry) {
break;
}

processedEntries += 1;

const existingState = unauthorizedAccessCooldown.get(expiredEntry.key);
if (!existingState) {
continue;
}

// The key may have been renewed after this heap entry was pushed.
if (existingState.denyUntil !== expiredEntry.denyUntil) {
continue;
}

unauthorizedAccessCooldown.delete(expiredEntry.key);
}

if (unauthorizedAccessCooldown.size === 0) {
unauthorizedCooldownExpirations.length = 0;
return;
}

maybeRebuildUnauthorizedCooldownExpirationHeap();
}

async function fetchAccess(token: string, roomId: string): Promise<AccessCheckData | null> {
Expand Down
55 changes: 55 additions & 0 deletions realtime/tests/unit/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const waitForConnectionProcessing = async () => {
describe('Server', () => {
let server: any;
let wss: any;
let cleanupInactiveRooms: any;
let setupWSConnectionMock: any;
let memoryUsageSpy: any;
let fetchMock: jest.MockedFunction<typeof fetch>;
Expand Down Expand Up @@ -103,6 +104,7 @@ describe('Server', () => {
const serverModule = await import('../../src/server');
server = serverModule.server;
wss = serverModule.wss;
cleanupInactiveRooms = serverModule.cleanupInactiveRooms;

const yjsUtilsModule = await import('../../src/yjs-utils');
setupWSConnectionMock = yjsUtilsModule.setupWSConnection;
Expand Down Expand Up @@ -263,6 +265,59 @@ describe('Server', () => {
}
});

it('should not clear renewed cooldown state when stale expirations are cleaned', async () => {
jest.useFakeTimers();

try {
fetchMock.mockResolvedValue({
ok: true,
json: async () => ({
success: true,
data: {
allowed: false,
accessLevel: null,
owner: false,
},
error: null,
}),
} as Response);

const firstConn: any = new EventEmitter();
firstConn.close = jest.fn();
firstConn.readyState = WebSocket.OPEN;
wss.emit('connection', firstConn, mockReq);
await waitForConnectionProcessing();

expect(firstConn.close).toHaveBeenCalledWith(1008, 'Access denied');
expect(fetchMock).toHaveBeenCalledTimes(1);

await jest.advanceTimersByTimeAsync(15001);

const secondConn: any = new EventEmitter();
secondConn.close = jest.fn();
secondConn.readyState = WebSocket.OPEN;
wss.emit('connection', secondConn, mockReq);
await waitForConnectionProcessing();

expect(secondConn.close).toHaveBeenCalledWith(1008, 'Access denied');
expect(fetchMock).toHaveBeenCalledTimes(2);

cleanupInactiveRooms();

const thirdConn: any = new EventEmitter();
thirdConn.close = jest.fn();
thirdConn.readyState = WebSocket.OPEN;
wss.emit('connection', thirdConn, mockReq);
await waitForConnectionProcessing();

// Third attempt is still blocked by the renewed cooldown, so no new access check runs.
expect(thirdConn.close).toHaveBeenCalledWith(1008, 'Access denied');
expect(fetchMock).toHaveBeenCalledTimes(2);
} finally {
jest.useRealTimers();
}
});

it('should handle synchronous error in setupWSConnection', async () => {
// Mock setupWSConnection to throw synchronously
setupWSConnectionMock.mockImplementationOnce(() => {
Expand Down
63 changes: 39 additions & 24 deletions web/services/indexed-db.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const DOCUMENTS_STORE = 'documents';
class IndexedDBService {
private static instance: IndexedDBService;
private dbPromise: Promise<IDBPDatabase> | null = null;
private dbClosingPromise: Promise<void> | null = null;
private dbSwitchPromise: Promise<void> = Promise.resolve();
private isSupported: boolean = true;
private currentUserId: string | null = null;

Expand Down Expand Up @@ -35,22 +35,42 @@ class IndexedDBService {

this.currentUserId = userId;

if (this.dbPromise && !this.dbClosingPromise) {
const dbPromiseToClose = this.dbPromise;
this.dbSwitchPromise = this.dbSwitchPromise
.then(async () => {
await this.closeCurrentConnection();
})
.catch((error) => {
console.warn('Failed to process IndexedDB user switch:', error);
});
}

private async awaitStableUserSwitch(): Promise<void> {
while (true) {
const inFlightSwitch = this.dbSwitchPromise;
await inFlightSwitch;

this.dbClosingPromise = dbPromiseToClose
.then((db) => {
db.close();
})
.catch((error) => {
console.warn('Failed to close IndexedDB connection during user switch:', error);
})
.finally(() => {
if (this.dbPromise === dbPromiseToClose) {
this.dbPromise = null;
}
this.dbClosingPromise = null;
});
// A newer switch may have been queued while awaiting this one.
if (inFlightSwitch === this.dbSwitchPromise) {
return;
}
}
}

private async closeCurrentConnection(): Promise<void> {
const dbPromiseToClose = this.dbPromise;
if (!dbPromiseToClose) {
return;
}

try {
const db = await dbPromiseToClose;
db.close();
} catch (error) {
console.warn('Failed to close IndexedDB connection during user switch:', error);
} finally {
if (this.dbPromise === dbPromiseToClose) {
this.dbPromise = null;
}
}
}

Expand All @@ -59,9 +79,7 @@ class IndexedDBService {
throw new Error('IndexedDB not supported');
}

if (this.dbClosingPromise) {
await this.dbClosingPromise;
}
await this.awaitStableUserSwitch();

if (!this.dbPromise) {
this.dbPromise = this.initDB();
Expand Down Expand Up @@ -208,11 +226,8 @@ class IndexedDBService {

public async wipeDatabase(): Promise<void> {
try {
if (this.dbPromise) {
const db = await this.dbPromise;
db.close();
this.dbPromise = null;
}
await this.awaitStableUserSwitch();
await this.closeCurrentConnection();
await deleteDB(this.dbName);
} catch (error) {
console.error('Failed to wipe database:', error);
Expand Down
Loading