Skip to content

Commit c40fa93

Browse files
feat: use Sync Streams
# Sync Stream Migration Notes ## Motivation This commit moves the E2EE chat demo from bucket-based sync rules to Sync Streams. The old buckets made every membership-derived dataset fan out automatically, which over-fetched encrypted payloads and forced the client to juggle parameter state. Streams let us subscribe only to the data that matters, lean on edition 2 features such as `waitForStream`, and keep the SQL definitions closer to the row-level security policies already in place. ## High-Level Changes **PowerSync configuration.** `packages/e2ee-chat/infra/powersync/sync_rules.yaml` was replaced by `packages/e2ee-chat/infra/powersync/sync_streams.yaml`. Auto-subscribed streams now serve vault keys, identity keypairs, membership rows, room metadata, and wrapped room keys, while parameterized streams (`room_members`, `room_messages`) accept a `room_id` so the client only syncs an active room. **Frontend integration.** The frontend now depends on `@powersync/common@1.39.0`, `@powersync/react@1.8.0`, and `@powersync/web@1.27.0`. `useQuery` calls pass stream descriptors (for example `USER_VAULT_KEYS_STREAM` and `ROOM_DETAILS_STREAM`) together with `waitForStream: true`, ensuring the initial dataset arrives before the UI renders. `useStatus` tracks `hasSynced` to gate decryption work until a stream confirms its first batch. **Developer experience.** Stream SQL can use subqueries to double-check membership before delivering rows—something bucket parameters could not express—so the security posture matches our expectations while the client code becomes simpler. ## Opportunities Streams expose lifecycle control that we can surface in the product. We can preload priority rooms or defer low-traffic ones to trim bandwidth, bolt on new capabilities like presence or typing indicators as additional streams, and wire `useStatus().forStream(...)` into tooling to highlight sync health. Because the SQL now lives in a single stream file, evolving RLS filters or role-based access checks should stay straightforward.
1 parent 8cfcbec commit c40fa93

6 files changed

Lines changed: 197 additions & 83 deletions

File tree

packages/e2ee-chat/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ Run the frontend at the printed URL once the dev server starts.
5151
-- You can scope this to specific tables if needed, but the publication name must stay "powersync".
5252
CREATE PUBLICATION powersync FOR ALL TABLES;
5353
```
54-
4. Copy `infra/powersync/sync_rules.yaml` into your PowerSync dashboard so the client can sync the encrypted tables.
54+
4. Copy `infra/powersync/sync_streams.yaml` into your PowerSync dashboard so the client can sync the encrypted tables using sync streams.
5555
5. Populate `frontend/.env.local` with the Supabase URL and anon key from the dashboard (see Quickstart above).
5656

5757
If you ever need a fresh database during development, run `pnpm --filter @app/chat-e2ee migrate:reset` to drop and reapply the schema locally. To repair discrepancies between your migrations and the remote database, use the Supabase CLI’s `migration repair` command as documented by Supabase.
@@ -91,14 +91,14 @@ If you ever need a fresh database during development, run `pnpm --filter @app/ch
9191
- **Anonymous sessions** – Enable the Supabase Anonymous provider and the launch screen shows a "Continue as guest" button. Guest users still unlock a local vault, but their messages display the Supabase user UUID unless you add a dedicated `sender_id` column to your schema.
9292
- **Mirrors**`startChatMirrors` decrypts encrypted rows per-room, writing plaintext representations into `chat_rooms_plain` and `chat_messages_plain` so the UI can query unencrypted data locally.
9393

94-
## Schema & sync rules
94+
## Schema & sync streams
9595

9696
- Supabase schema lives at `packages/e2ee-chat/infra/schema.sql`.
97-
- PowerSync sync rules live at `packages/e2ee-chat/infra/powersync/sync_rules.yaml` and replicate:
98-
- The personal vault tables (`chat_e2ee_keys`, `chat_identity_*`).
99-
- Per-room buckets: encrypted rooms, messages, membership, and the wrapped room keys for the current user.
97+
- PowerSync sync streams live at `packages/e2ee-chat/infra/powersync/sync_streams.yaml` and configure:
98+
- Auto-subscribed streams for personal vault tables (`chat_e2ee_keys`, `chat_identity_*`), room metadata, membership records, and wrapped keys.
99+
- Parameterized streams (`room_members`, `room_messages`) that the client subscribes to on demand when a room becomes active.
100100

101-
Run the provided Supabase scripts (see `package.json` scripts) to push the schema to your project. Use the PowerSync dashboard to paste the sync rules.
101+
Run the provided Supabase scripts (see `package.json` scripts) to push the schema to your project. Use the PowerSync dashboard to paste the sync streams configuration.
102102

103103
After editing `infra/schema.sql`, generate and push a fresh migration:
104104

packages/e2ee-chat/frontend/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
"@crypto/webauthn": "workspace:*",
3030
"@heroicons/react": "^2.2.0",
3131
"@journeyapps/wa-sqlite": "^1.3.1",
32-
"@powersync/common": "^1.38.1",
33-
"@powersync/react": "^1.7.4",
34-
"@powersync/web": "^1.26.2",
32+
"@powersync/common": "^1.39.0",
33+
"@powersync/react": "^1.8.0",
34+
"@powersync/web": "^1.27.0",
3535
"@supabase/supabase-js": "^2.45.0",
3636
"libsodium-wrappers-sumo": "^0.7.13",
3737
"react": "19.1.1",

packages/e2ee-chat/frontend/src/App.tsx

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { useEffect, useMemo, useState } from "react";
2-
import { usePowerSync, useQuery } from "@powersync/react";
2+
import { usePowerSync, useQuery, useStatus } from "@powersync/react";
33
import { createPasswordCrypto } from "@crypto/password";
44
import {
55
createDEKCrypto,
@@ -74,21 +74,30 @@ function useSupabaseUser(): {
7474
}
7575

7676
function useVaultProviders(userId: string | null) {
77-
const { data } = useQuery(
77+
const { data, isLoading } = useQuery(
7878
"SELECT provider FROM chat_e2ee_keys WHERE user_id = ?",
7979
[userId ?? ""],
80-
{ throttleMs: 150 },
80+
{
81+
throttleMs: 150,
82+
streams: userId
83+
? [{ ...USER_VAULT_KEYS_STREAM, waitForStream: true }]
84+
: undefined,
85+
},
8186
);
8287
return useMemo(() => {
88+
if (isLoading) {
89+
return { haveAny: false, havePassword: false, loading: true };
90+
}
8391
const rows = Array.isArray(data)
8492
? (data as Array<{ provider: string }>)
8593
: [];
8694
const providers = new Set(rows.map((r) => r.provider));
8795
return {
8896
haveAny: providers.size > 0,
8997
havePassword: providers.has("password"),
98+
loading: false,
9099
};
91-
}, [data]);
100+
}, [data, isLoading]);
92101
}
93102

94103
type RoomPlain = {
@@ -127,10 +136,18 @@ type RoomKeyRow = {
127136
kdf_salt_b64: string;
128137
};
129138

139+
const USER_VAULT_KEYS_STREAM = {
140+
name: "user_vault_keys",
141+
parameters: undefined,
142+
} as const;
143+
const ROOM_KEYS_STREAM = { name: "room_keys", parameters: undefined } as const;
144+
const ROOM_DETAILS_STREAM = { name: "room_details", parameters: undefined } as const;
145+
130146
export default function App() {
131147
const db = usePowerSync();
132148
const [dbReady, setDbReady] = useState(false);
133149
const { userId, authEvent } = useSupabaseUser();
150+
const status = useStatus();
134151
const providers = useVaultProviders(userId);
135152

136153
if (typeof window !== "undefined" && import.meta.env.MODE !== "production") {
@@ -147,7 +164,6 @@ export default function App() {
147164
const [activeRoomId, setActiveRoomId] = useState<string | null>(null);
148165
const [pendingRoomSelection, setPendingRoomSelection] =
149166
useState<string | null>(null);
150-
const [mirrorsStarted, setMirrorsStarted] = useState(false);
151167
const [passwordResetPending, setPasswordResetPending] = useState(false);
152168
const [guestSupported, setGuestSupported] = useState(() =>
153169
isAnonymousSupported(),
@@ -212,10 +228,8 @@ export default function App() {
212228
},
213229
},
214230
);
215-
setMirrorsStarted(true);
216231
return () => {
217232
stop?.();
218-
setMirrorsStarted(false);
219233
};
220234
}, [db, userId, dataCrypto, roomProviders]);
221235

@@ -238,7 +252,12 @@ export default function App() {
238252
const { data: roomKeyRows } = useQuery(
239253
"SELECT * FROM chat_room_keys WHERE user_id = ?",
240254
[userId ?? ""],
241-
{ throttleMs: 250 },
255+
{
256+
throttleMs: 250,
257+
streams: userId
258+
? [{ ...ROOM_KEYS_STREAM, waitForStream: true }]
259+
: undefined,
260+
},
242261
);
243262

244263
useEffect(() => {
@@ -305,7 +324,12 @@ export default function App() {
305324
? `SELECT * FROM ${ROOMS_MIRROR_TABLE} ORDER BY updated_at DESC`
306325
: "SELECT NULL as id WHERE 1 = 0";
307326

308-
const roomsQuery = useQuery(roomsSql, [], { throttleMs: 200 });
327+
const roomsQuery = useQuery(roomsSql, [], {
328+
throttleMs: 200,
329+
streams: dbReady
330+
? [{ ...ROOM_DETAILS_STREAM, waitForStream: true }]
331+
: undefined,
332+
});
309333

310334
const roomsData = useMemo(() => {
311335
if (roomsQuery?.error) {
@@ -368,10 +392,31 @@ export default function App() {
368392
}
369393
}, [rooms, activeRoomId, roomKeys, pendingRoomSelection]);
370394

395+
const roomMessagesStreamDescriptor = useMemo(
396+
() =>
397+
activeRoomId
398+
? { name: "room_messages", parameters: { room_id: activeRoomId } }
399+
: null,
400+
[activeRoomId],
401+
);
402+
403+
const roomMembersStreamDescriptor = useMemo(
404+
() =>
405+
activeRoomId
406+
? { name: "room_members", parameters: { room_id: activeRoomId } }
407+
: null,
408+
[activeRoomId],
409+
);
410+
371411
const { data: messagesData } = useQuery(
372412
`SELECT * FROM ${MESSAGES_MIRROR_TABLE} WHERE room_id = ? ORDER BY sent_at ASC`,
373413
[activeRoomId ?? ""],
374-
{ throttleMs: 120 },
414+
{
415+
throttleMs: 120,
416+
streams: roomMessagesStreamDescriptor
417+
? [{ ...roomMessagesStreamDescriptor, waitForStream: true }]
418+
: undefined,
419+
},
375420
);
376421
const remoteMessages: MessagePlain[] = useMemo(() => {
377422
if (!Array.isArray(messagesData)) return [];
@@ -407,7 +452,12 @@ export default function App() {
407452
const { data: membershipData } = useQuery(
408453
"SELECT * FROM chat_room_members WHERE room_id = ? ORDER BY joined_at ASC",
409454
[activeRoomId ?? ""],
410-
{ throttleMs: 300 },
455+
{
456+
throttleMs: 300,
457+
streams: roomMembersStreamDescriptor
458+
? [{ ...roomMembersStreamDescriptor, waitForStream: true }]
459+
: undefined,
460+
},
411461
);
412462
const members: MemberPlain[] = useMemo(() => {
413463
if (!Array.isArray(membershipData)) return [];
@@ -420,6 +470,16 @@ export default function App() {
420470
}));
421471
}, [membershipData]);
422472

473+
const roomsStreamStatus = status.forStream(ROOM_DETAILS_STREAM);
474+
const activeRoomStreamStatus = roomMessagesStreamDescriptor
475+
? status.forStream(roomMessagesStreamDescriptor)
476+
: null;
477+
const mirrorsStarted = dataCrypto
478+
? (roomsStreamStatus?.subscription?.hasSynced ?? false) &&
479+
(!roomMessagesStreamDescriptor ||
480+
(activeRoomStreamStatus?.subscription?.hasSynced ?? false))
481+
: false;
482+
423483
const canSendToActiveRoom = activeRoomId ? roomKeys.has(activeRoomId) : false;
424484

425485
const handleSignIn = async (email: string, password: string) => {
@@ -715,6 +775,13 @@ export default function App() {
715775
}
716776

717777
if (!dataCrypto) {
778+
if (providers.loading) {
779+
return (
780+
<div className="min-h-screen bg-slate-950/5 flex items-center justify-center">
781+
<div className="text-sm text-slate-500">Preparing encrypted vault…</div>
782+
</div>
783+
);
784+
}
718785
return (
719786
<VaultScreen
720787
hasVault={providers.haveAny}

packages/e2ee-chat/infra/powersync/sync_rules.yaml

Lines changed: 0 additions & 46 deletions
This file was deleted.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# PowerSync Sync Streams for the E2EE Chat demo.
2+
#
3+
# The configuration combines auto-subscribed streams for user-specific metadata
4+
# with parameterized streams that are activated on demand by the client when a
5+
# room is opened. This keeps sensitive encrypted payloads scoped to rooms the
6+
# user is actively viewing while still syncing key material needed for cryptography.
7+
8+
streams:
9+
user_vault_keys:
10+
query: |
11+
SELECT *
12+
FROM public.chat_e2ee_keys
13+
WHERE user_id = auth.user_id()
14+
auto_subscribe: true
15+
16+
identity_private_keys:
17+
query: |
18+
SELECT *
19+
FROM public.chat_identity_private_keys
20+
WHERE user_id = auth.user_id()
21+
auto_subscribe: true
22+
23+
identity_public_keys:
24+
query: |
25+
SELECT *
26+
FROM public.chat_identity_public_keys
27+
auto_subscribe: true
28+
29+
membership_records:
30+
query: |
31+
SELECT *
32+
FROM public.chat_room_members
33+
WHERE user_id = auth.user_id()
34+
auto_subscribe: true
35+
36+
room_details:
37+
query: |
38+
SELECT *
39+
FROM public.chat_rooms
40+
WHERE id IN (
41+
SELECT room_id
42+
FROM public.chat_room_members
43+
WHERE user_id = auth.user_id()
44+
)
45+
auto_subscribe: true
46+
47+
room_keys:
48+
query: |
49+
SELECT *
50+
FROM public.chat_room_keys
51+
WHERE user_id = auth.user_id()
52+
auto_subscribe: true
53+
54+
room_members:
55+
query: |
56+
SELECT *
57+
FROM public.chat_room_members
58+
WHERE room_id = subscription.parameter('room_id')
59+
AND room_id IN (
60+
SELECT room_id
61+
FROM public.chat_room_members
62+
WHERE user_id = auth.user_id()
63+
)
64+
65+
room_messages:
66+
query: |
67+
SELECT *
68+
FROM public.chat_messages
69+
WHERE bucket_id = subscription.parameter('room_id')
70+
AND bucket_id IN (
71+
SELECT room_id
72+
FROM public.chat_room_members
73+
WHERE user_id = auth.user_id()
74+
)
75+
76+
config:
77+
edition: 2

0 commit comments

Comments
 (0)