Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions backend/selective-entry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,7 @@ export default {
request.headers.get("upgrade") === "websocket"
) {
console.log("🔄 Routing to LiveStore sync handler on", request.url);
return syncHandler.fetch(
request as unknown as Request,
env,
ctx
) as unknown as WorkerResponse;
return syncHandler.fetch(request, env, ctx) as unknown as WorkerResponse;
}

// Route 3: API routes → Hono app
Expand Down
150 changes: 87 additions & 63 deletions backend/sync.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,65 @@
import {
makeDurableObject,
handleWebSocket,
} from "@livestore/sync-cf/cf-worker";
import { type Env, type ExecutionContext } from "./types";
import { makeDurableObject, makeWorker } from "@livestore/sync-cf/cf-worker";

import { getValidatedUser } from "./auth";
import { Schema } from "@runtimed/schema";

export class WebSocketServer extends makeDurableObject({
onPush: async (message) => {
console.log("onPush", message.batch);
storage: {
_tag: "d1",
binding: "DB",
},
onPull: async (message) => {
console.log("onPull", message);
onPush: async (message, { payload, storeId }) => {
try {
const decodedPayload = decodePayload(payload);
// Note: env is not available in onPush context, so we skip full auth validation here
// This is a limitation of the current LiveStore sync-cf API
console.log("📝 Push received:", {
storeId,
eventCount: message.batch.length,
hasPayload: !!payload,
});

// For now, we'll do basic payload validation without full auth
if (!decodedPayload.authToken) {
throw new Error("AuthToken is required");
}

// Log the type of client
if (decodedPayload?.runtime === true) {
console.log("📝 Runtime agent push:", {
runtimeId: decodedPayload.runtimeId,
storeId,
eventCount: message.batch.length,
});
} else {
console.log("📝 User push:", {
storeId,
eventCount: message.batch.length,
});
}
} catch (error: any) {
console.error("🚫 Push authentication failed:", error.message);
throw error;
}
},
onPull: async (_message, { payload, storeId }) => {
try {
const decodedPayload = decodePayload(payload);

// Note: env is not available in onPull context, so we skip full auth validation here
console.log("📝 Pull request:", {
storeId,
isRuntime: decodedPayload?.runtime === true,
hasPayload: !!payload,
});

// Basic payload validation
if (!decodedPayload.authToken) {
throw new Error("AuthToken is required");
}
} catch (error: any) {
console.error("🚫 Pull validation failed:", error.message);
throw error;
}
},
}) {}

Expand All @@ -39,60 +86,37 @@ const SyncPayloadSchema = Schema.Union(

const decodePayload = Schema.decodeUnknownSync(SyncPayloadSchema);

export default {
fetch: async (request: Request, env: Env, ctx: ExecutionContext) => {
const url = new URL(request.url);

const pathname = url.pathname;
export default makeWorker({
syncBackendBinding: "SYNC_BACKEND_DO",
validatePayload: async (rawPayload: unknown, { storeId }) => {
try {
const payload = decodePayload(rawPayload);

if (!pathname.startsWith("/livestore")) {
return new Response("Invalid request", { status: 400 });
}

return handleWebSocket(request, env, ctx, {
validatePayload: async (rawPayload) => {
try {
const payload = decodePayload(rawPayload);
let validatedUser = await getValidatedUser(payload.authToken, env);

if (!validatedUser) {
throw new Error("User must be authenticated");
}

// User identity is validated via JWT token
// LiveStore will manage clientId for device/app instance identification
if (payload?.runtime === true) {
// For runtime agents with full payload
console.log("✅ Runtime agent authenticated:", {
runtimeId: payload.runtimeId,
sessionId: payload.sessionId,
userId: payload.userId,
validatedUserId: validatedUser.id,
});
// Basic payload structure validation - detailed auth happens in Durable Object
if (!payload.authToken || typeof payload.authToken !== "string") {
throw new Error("Valid authToken is required");
}

// Verify that the runtime's claimed userId matches the authenticated user
if (payload.userId !== validatedUser.id) {
throw new Error(
`Runtime userId mismatch: payload claims ${payload.userId}, but token is for ${validatedUser.id}`
);
}
} else {
// For regular users
console.log("✅ Authenticated user:", {
userId: validatedUser.id,
});
}

// SECURITY NOTE: This validation only occurs at connection time.
// The current version of `@livestore/sync-cf` does not provide a mechanism
// to verify that the `clientId` on incoming events matches the `clientId`
// that was validated with this initial connection payload. A malicious
// client could pass this check and then send events with a different clientId.
} catch (error: any) {
console.error("🚫 Authentication failed:", error.message);
throw error; // Reject the WebSocket connection
if (payload?.runtime === true) {
// For runtime agents, require additional fields
if (!payload.runtimeId || !payload.sessionId || !payload.userId) {
throw new Error(
"Runtime agents require runtimeId, sessionId, and userId"
);
}
},
});
console.log(
"📝 Runtime agent payload structure valid for store:",
storeId
);
} else {
console.log("📝 User payload structure valid for store:", storeId);
}

// Full authentication happens in onPush/onPull handlers where Env is available
} catch (error: any) {
console.error("🚫 Payload validation failed:", error.message);
throw error;
}
},
};
enableCORS: true,
});
2 changes: 1 addition & 1 deletion backend/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export type Env = {
SERVICE_PROVIDER?: string; // "local" | "anaconda"

// Bindings from the original sync worker configuration
WEBSOCKET_SERVER: DurableObjectNamespace;
SYNC_BACKEND_DO: DurableObjectNamespace;
DB: D1Database;

// Secrets
Expand Down
27 changes: 10 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@
"@japikey/cloudflare": "^0.4.0",
"@japikey/japikey": "^0.4.0",
"@japikey/shared": "^0.4.0",
"@livestore/adapter-web": "^0.3.1",
"@livestore/livestore": "^0.3.1",
"@livestore/react": "^0.3.1",
"@livestore/sync-cf": "^0.3.1",
"@livestore/wa-sqlite": "1.0.5-dev.2",
"@livestore/webmesh": "^0.3.1",
"@livestore/adapter-web": "v0.4.0-dev.10",
"@livestore/livestore": "v0.4.0-dev.10",
"@livestore/react": "v0.4.0-dev.10",
"@livestore/sync-cf": "v0.4.0-dev.10",
"@livestore/wa-sqlite": "v0.4.0-dev.10",
"@livestore/webmesh": "v0.4.0-dev.10",
"@microlink/react-json-view": "^1.26.2",
"@overengineering/fps-meter": "^0.1.2",
"@radix-ui/react-avatar": "^1.1.10",
Expand Down Expand Up @@ -99,7 +99,7 @@
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"date-fns": "^4.1.0",
"effect": "3.15.5",
"effect": "^3.18.4",
"fractional-indexing": "^3.2.0",
"geojson-map-fit-mercator": "^1.1.0",
"hono": "^4.9.1",
Expand Down Expand Up @@ -139,8 +139,8 @@
"@cloudflare/workers-types": "^4.20250813.0",
"@effect/vitest": "0.23.7",
"@eslint/js": "^9.30.1",
"@livestore/adapter-node": "^0.3.1",
"@livestore/devtools-vite": "^0.3.1",
"@livestore/adapter-node": "v0.4.0-dev.10",
"@livestore/devtools-vite": "v0.4.0-dev.10",
"@tailwindcss/cli": "^4.1.10",
"@tailwindcss/postcss": "^4.1.10",
"@tailwindcss/typography": "^0.5.16",
Expand Down Expand Up @@ -178,15 +178,8 @@
},
"pnpm": {
"overrides": {
"effect": "3.15.5",
"react": "19.0.0",
"react-dom": "19.0.0",
"@effect/platform": "0.82.4",
"@effect/typeclass": "0.34.2",
"@effect/cluster": "0.34.2",
"@effect/experimental": "0.46.8",
"@effect/sql": "0.35.8",
"@effect/rpc": "0.59.9"
"react-dom": "19.0.0"
},
"onlyBuiltDependencies": [
"@parcel/watcher",
Expand Down
3 changes: 1 addition & 2 deletions packages/agent-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
"format:check": "prettier --check ."
},
"dependencies": {
"@livestore/livestore": "^0.3.1",
"effect": "3.15.5",
"@livestore/livestore": "v0.4.0-dev.10",
"zod": "^4.0.17",
"@opentelemetry/api": "^1.9.0"
},
Expand Down
3 changes: 1 addition & 2 deletions packages/ai-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
"format:check": "prettier --check ."
},
"dependencies": {
"@livestore/livestore": "^0.3.1",
"@livestore/livestore": "v0.4.0-dev.10",
"@opentelemetry/api": "^1.9.0",
"effect": "3.15.5",
"ollama": "^0.5.18",
"openai": "^5.22.0",
"zod": "^4.0.17"
Expand Down
5 changes: 3 additions & 2 deletions packages/pyodide-runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,9 @@ export class PyodideRuntimeAgent extends LocalRuntimeAgent {
currentCellRef
);

const maxAiIterations: number =
parseInt(this.agent.store.query(maxAiIterations$)) || 10;
const maxAiIterations: number = parseInt(
this.agent.store.query(maxAiIterations$) || "10"
);

// Track AI execution for cancellation
const aiAbortController = new AbortController();
Expand Down
3 changes: 1 addition & 2 deletions packages/schema/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
"format:check": "prettier --check ."
},
"dependencies": {
"@livestore/livestore": "^0.3.1",
"effect": "3.15.5",
"@livestore/livestore": "v0.4.0-dev.10",
"zod": "^4.0.17"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/schema/src/queries/ai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ export const maxAiIterations$ = queryDb(
tables.notebookMetadata
.select("value")
.where("key", "=", "max_ai_iterations")
.first({ fallback: () => "10" })
.first({ behaviour: "fallback", fallback: () => "10" })
);
10 changes: 5 additions & 5 deletions packages/schema/src/queries/cellOrdering.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const firstCell$ = queryDb(
tables.cells
.select("id", "fractionalIndex")
.orderBy("fractionalIndex", "asc")
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{ label: "cells.first" }
);

Expand All @@ -37,7 +37,7 @@ export const lastCell$ = queryDb(
tables.cells
.select("id", "fractionalIndex")
.orderBy("fractionalIndex", "desc")
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{ label: "cells.last" }
);

Expand Down Expand Up @@ -88,7 +88,7 @@ export const getAdjacentCells = (cellId: string, fractionalIndex: string) => {
.select("id", "fractionalIndex")
.where("fractionalIndex", "<", fractionalIndex)
.orderBy("fractionalIndex", "desc")
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{
deps: [cellId, fractionalIndex],
label: `cells.previous.${cellId}`,
Expand All @@ -100,7 +100,7 @@ export const getAdjacentCells = (cellId: string, fractionalIndex: string) => {
.select("id", "fractionalIndex")
.where("fractionalIndex", ">", fractionalIndex)
.orderBy("fractionalIndex", "asc")
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{
deps: [cellId, fractionalIndex],
label: `cells.next.${cellId}`,
Expand All @@ -116,7 +116,7 @@ export const cellPositionInfo = (cellId: string) =>
tables.cells
.select("id", "fractionalIndex")
.where({ id: cellId })
.first({ fallback: () => null }),
.first({ behaviour: "fallback", fallback: () => null }),
{
deps: [cellId],
label: `cells.positionInfo.${cellId}`,
Expand Down
2 changes: 2 additions & 0 deletions packages/schema/src/queries/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export const cellFractionalIndex = (cellId: string) =>
.select("fractionalIndex")
.where({ id: cellId })
.first({
behaviour: "fallback",
fallback: () => null,
}),
{
Expand All @@ -50,6 +51,7 @@ export const cellQuery = {
.select()
.where({ id: cellId })
.first({
behaviour: "fallback",
fallback: () => null,
}),
{
Expand Down
Loading