Skip to content

Commit 52b3dc1

Browse files
Add socket waits for tables, multi column workflow support
1 parent 2130265 commit 52b3dc1

17 files changed

Lines changed: 1056 additions & 217 deletions

File tree

apps/realtime/src/handlers/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { setupConnectionHandlers } from '@/handlers/connection'
22
import { setupOperationsHandlers } from '@/handlers/operations'
33
import { setupPresenceHandlers } from '@/handlers/presence'
44
import { setupSubblocksHandlers } from '@/handlers/subblocks'
5+
import { setupTableHandlers } from '@/handlers/tables'
56
import { setupVariablesHandlers } from '@/handlers/variables'
67
import { setupWorkflowHandlers } from '@/handlers/workflow'
78
import type { AuthenticatedSocket } from '@/middleware/auth'
@@ -13,5 +14,6 @@ export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: IRoom
1314
setupSubblocksHandlers(socket, roomManager)
1415
setupVariablesHandlers(socket, roomManager)
1516
setupPresenceHandlers(socket, roomManager)
17+
setupTableHandlers(socket, roomManager)
1618
setupConnectionHandlers(socket, roomManager)
1719
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import { createLogger } from '@sim/logger'
2+
import type { AuthenticatedSocket } from '@/middleware/auth'
3+
import { verifyTableAccess } from '@/middleware/permissions'
4+
import { type IRoomManager, tableRoomName } from '@/rooms/types'
5+
6+
const logger = createLogger('TableHandlers')
7+
8+
/**
9+
* Wires `join-table` / `leave-table` socket events. Tables don't track presence
10+
* or last-modified state — joining is a thin wrapper around `socket.join` so the
11+
* Sim API → Realtime HTTP bridge can broadcast row updates back to subscribed clients.
12+
*/
13+
export function setupTableHandlers(socket: AuthenticatedSocket, _roomManager: IRoomManager) {
14+
socket.on('join-table', async ({ tableId }: { tableId?: string }) => {
15+
try {
16+
if (!tableId || typeof tableId !== 'string') {
17+
socket.emit('join-table-error', {
18+
tableId: tableId ?? null,
19+
error: 'tableId required',
20+
code: 'INVALID_TABLE_ID',
21+
retryable: false,
22+
})
23+
return
24+
}
25+
26+
const userId = socket.userId
27+
if (!userId) {
28+
socket.emit('join-table-error', {
29+
tableId,
30+
error: 'Authentication required',
31+
code: 'AUTHENTICATION_REQUIRED',
32+
retryable: false,
33+
})
34+
return
35+
}
36+
37+
const { hasAccess } = await verifyTableAccess(userId, tableId)
38+
if (!hasAccess) {
39+
socket.emit('join-table-error', {
40+
tableId,
41+
error: 'Access denied to table',
42+
code: 'ACCESS_DENIED',
43+
retryable: false,
44+
})
45+
return
46+
}
47+
48+
const room = tableRoomName(tableId)
49+
socket.join(room)
50+
socket.emit('join-table-success', { tableId, socketId: socket.id })
51+
logger.debug(`Socket ${socket.id} (user ${userId}) joined ${room}`)
52+
} catch (error) {
53+
logger.error(`Error joining table room:`, error)
54+
socket.emit('join-table-error', {
55+
tableId: null,
56+
error: 'Failed to join table',
57+
code: 'JOIN_TABLE_FAILED',
58+
retryable: true,
59+
})
60+
}
61+
})
62+
63+
socket.on('leave-table', async ({ tableId }: { tableId?: string }) => {
64+
try {
65+
if (!tableId || typeof tableId !== 'string') return
66+
const room = tableRoomName(tableId)
67+
socket.leave(room)
68+
logger.debug(`Socket ${socket.id} left ${room}`)
69+
} catch (error) {
70+
logger.error(`Error leaving table room:`, error)
71+
}
72+
})
73+
}

apps/realtime/src/middleware/permissions.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,49 @@ export async function verifyWorkflowAccess(
131131
return { hasAccess: false }
132132
}
133133
}
134+
135+
/**
136+
* Verify a user has read access to a table by virtue of workspace permission.
137+
* Mirrors `verifyWorkflowAccess` for the table-room socket join check.
138+
*/
139+
export async function verifyTableAccess(
140+
userId: string,
141+
tableId: string
142+
): Promise<{ hasAccess: boolean; workspaceId?: string }> {
143+
try {
144+
const { userTableDefinitions, permissions } = await import('@sim/db')
145+
const tableData = await db
146+
.select({ workspaceId: userTableDefinitions.workspaceId })
147+
.from(userTableDefinitions)
148+
.where(and(eq(userTableDefinitions.id, tableId), isNull(userTableDefinitions.archivedAt)))
149+
.limit(1)
150+
151+
if (!tableData.length) {
152+
logger.warn(`Table ${tableId} not found`)
153+
return { hasAccess: false }
154+
}
155+
const { workspaceId } = tableData[0]
156+
if (!workspaceId) return { hasAccess: false }
157+
158+
const [permissionRow] = await db
159+
.select({ permissionType: permissions.permissionType })
160+
.from(permissions)
161+
.where(
162+
and(
163+
eq(permissions.userId, userId),
164+
eq(permissions.entityType, 'workspace'),
165+
eq(permissions.entityId, workspaceId)
166+
)
167+
)
168+
.limit(1)
169+
170+
if (!permissionRow?.permissionType) {
171+
logger.warn(`User ${userId} has no permission for workspace ${workspaceId} (table ${tableId})`)
172+
return { hasAccess: false }
173+
}
174+
return { hasAccess: true, workspaceId }
175+
} catch (error) {
176+
logger.error(`Error verifying table access for user ${userId}, table ${tableId}:`, error)
177+
return { hasAccess: false }
178+
}
179+
}

apps/realtime/src/rooms/memory-manager.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
import { createLogger } from '@sim/logger'
22
import type { Server } from 'socket.io'
3-
import type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/rooms/types'
3+
import {
4+
type IRoomManager,
5+
tableRoomName,
6+
type TableRowUpdatedPayload,
7+
type UserPresence,
8+
type UserSession,
9+
type WorkflowRoom,
10+
} from '@/rooms/types'
411

512
const logger = createLogger('MemoryRoomManager')
613

@@ -255,4 +262,26 @@ export class MemoryRoomManager implements IRoomManager {
255262

256263
logger.info(`Notified ${room.users.size} users about workflow deployment change: ${workflowId}`)
257264
}
265+
266+
emitToTable<T = unknown>(tableId: string, event: string, payload: T): void {
267+
this._io.to(tableRoomName(tableId)).emit(event, payload)
268+
}
269+
270+
async handleTableRowUpdated(
271+
tableId: string,
272+
payload: TableRowUpdatedPayload
273+
): Promise<void> {
274+
this.emitToTable(tableId, 'table-row-updated', { tableId, ...payload })
275+
}
276+
277+
async handleTableRowDeleted(tableId: string, rowId: string): Promise<void> {
278+
this.emitToTable(tableId, 'table-row-deleted', { tableId, rowId })
279+
}
280+
281+
async handleTableDeleted(tableId: string): Promise<void> {
282+
logger.info(`Handling table deletion notification for ${tableId}`)
283+
this.emitToTable(tableId, 'table-deleted', { tableId, timestamp: Date.now() })
284+
// Eject sockets so they don't hold a stale room. Cross-pod safe via socket.io.
285+
await this._io.in(tableRoomName(tableId)).socketsLeave(tableRoomName(tableId))
286+
}
258287
}

apps/realtime/src/rooms/redis-manager.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
import { createLogger } from '@sim/logger'
22
import { createClient, type RedisClientType } from 'redis'
33
import type { Server } from 'socket.io'
4-
import type { IRoomManager, UserPresence, UserSession } from '@/rooms/types'
4+
import {
5+
type IRoomManager,
6+
tableRoomName,
7+
type TableRowUpdatedPayload,
8+
type UserPresence,
9+
type UserSession,
10+
} from '@/rooms/types'
511

612
const logger = createLogger('RedisRoomManager')
713

@@ -457,4 +463,26 @@ export class RedisRoomManager implements IRoomManager {
457463
const userCount = await this.getUniqueUserCount(workflowId)
458464
logger.info(`Notified ${userCount} users about workflow deployment change: ${workflowId}`)
459465
}
466+
467+
emitToTable<T = unknown>(tableId: string, event: string, payload: T): void {
468+
this._io.to(tableRoomName(tableId)).emit(event, payload)
469+
}
470+
471+
async handleTableRowUpdated(
472+
tableId: string,
473+
payload: TableRowUpdatedPayload
474+
): Promise<void> {
475+
this.emitToTable(tableId, 'table-row-updated', { tableId, ...payload })
476+
}
477+
478+
async handleTableRowDeleted(tableId: string, rowId: string): Promise<void> {
479+
this.emitToTable(tableId, 'table-row-deleted', { tableId, rowId })
480+
}
481+
482+
async handleTableDeleted(tableId: string): Promise<void> {
483+
logger.info(`Handling table deletion notification for ${tableId}`)
484+
this.emitToTable(tableId, 'table-deleted', { tableId, timestamp: Date.now() })
485+
// Eject sockets across all pods via socket.io's Redis adapter.
486+
await this._io.in(tableRoomName(tableId)).socketsLeave(tableRoomName(tableId))
487+
}
460488
}

apps/realtime/src/rooms/types.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,43 @@ export interface IRoomManager {
143143
* Handle workflow deployment change - notify users to refresh deployment state
144144
*/
145145
handleWorkflowDeployed(workflowId: string): Promise<void>
146+
147+
/**
148+
* Emit an event to all clients in a table room (`table:${tableId}`).
149+
* Tables don't track presence/last-modified state — just pub/sub.
150+
*/
151+
emitToTable<T = unknown>(tableId: string, event: string, payload: T): void
152+
153+
/**
154+
* Notify all clients in a table room of a row write (insert/update/cell-state-change).
155+
* Sim API calls this via the `/api/table-row-updated` HTTP bridge after every successful
156+
* row commit; the client merges the delta into its React Query cache.
157+
*/
158+
handleTableRowUpdated(tableId: string, payload: TableRowUpdatedPayload): Promise<void>
159+
160+
/**
161+
* Notify all clients in a table room that a row has been deleted.
162+
*/
163+
handleTableRowDeleted(tableId: string, rowId: string): Promise<void>
164+
165+
/**
166+
* Notify all clients in a table room that the table has been deleted; eject sockets.
167+
*/
168+
handleTableDeleted(tableId: string): Promise<void>
169+
}
170+
171+
/**
172+
* Payload broadcast on `table-row-updated`. Mirrors the shape of `TableRow.data` so
173+
* the client can merge directly into its React Query rows cache. `position` and
174+
* `updatedAt` are included for cache reconciliation; `data` is the full row data
175+
* (not a per-cell delta) — see plan Notes.
176+
*/
177+
export interface TableRowUpdatedPayload {
178+
rowId: string
179+
data: Record<string, unknown>
180+
position: number
181+
updatedAt: string | number
146182
}
183+
184+
/** Socket.IO room name for a table. Namespaced from workflow rooms. */
185+
export const tableRoomName = (tableId: string): string => `table:${tableId}`

apps/realtime/src/routes/http.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,46 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
150150
return
151151
}
152152

153+
// Handle table row write notifications from the Sim API
154+
if (req.method === 'POST' && req.url === '/api/table-row-updated') {
155+
try {
156+
const body = await readRequestBody(req)
157+
const { tableId, rowId, data, position, updatedAt } = JSON.parse(body)
158+
await roomManager.handleTableRowUpdated(tableId, { rowId, data, position, updatedAt })
159+
sendSuccess(res)
160+
} catch (error) {
161+
logger.error('Error handling table row update notification:', error)
162+
sendError(res, 'Failed to process table row update')
163+
}
164+
return
165+
}
166+
167+
if (req.method === 'POST' && req.url === '/api/table-row-deleted') {
168+
try {
169+
const body = await readRequestBody(req)
170+
const { tableId, rowId } = JSON.parse(body)
171+
await roomManager.handleTableRowDeleted(tableId, rowId)
172+
sendSuccess(res)
173+
} catch (error) {
174+
logger.error('Error handling table row deletion notification:', error)
175+
sendError(res, 'Failed to process table row deletion')
176+
}
177+
return
178+
}
179+
180+
if (req.method === 'POST' && req.url === '/api/table-deleted') {
181+
try {
182+
const body = await readRequestBody(req)
183+
const { tableId } = JSON.parse(body)
184+
await roomManager.handleTableDeleted(tableId)
185+
sendSuccess(res)
186+
} catch (error) {
187+
logger.error('Error handling table deletion notification:', error)
188+
sendError(res, 'Failed to process table deletion')
189+
}
190+
return
191+
}
192+
153193
res.writeHead(404, { 'Content-Type': 'application/json' })
154194
res.end(JSON.stringify({ error: 'Not found' }))
155195
}

apps/sim/app/api/table/utils.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,14 @@ export const CreateColumnSchema = z.object({
171171
.object({
172172
workflowId: z.string().min(1),
173173
dependencies: z.array(z.string()).optional(),
174-
outputPath: z.string().optional(),
174+
outputs: z
175+
.array(
176+
z.object({
177+
blockId: z.string().min(1),
178+
path: z.string().min(1),
179+
})
180+
)
181+
.optional(),
175182
})
176183
.optional(),
177184
}),
@@ -189,7 +196,14 @@ export const UpdateColumnSchema = z.object({
189196
.object({
190197
workflowId: z.string().min(1),
191198
dependencies: z.array(z.string()).optional(),
192-
outputPath: z.string().optional(),
199+
outputs: z
200+
.array(
201+
z.object({
202+
blockId: z.string().min(1),
203+
path: z.string().min(1),
204+
})
205+
)
206+
.optional(),
193207
})
194208
.optional(),
195209
}),

0 commit comments

Comments
 (0)