diff --git a/.github/workflows/fly-deploy.yml b/.github/workflows/fly-deploy.yml index 7d59ce8..75832b6 100644 --- a/.github/workflows/fly-deploy.yml +++ b/.github/workflows/fly-deploy.yml @@ -13,7 +13,9 @@ jobs: deploy: name: Deploy relay app runs-on: ubuntu-latest - concurrency: deploy-relay-group + concurrency: + group: deploy-relay-group + cancel-in-progress: false env: FLY_APP_NAME: wrapper-dry-pathway-1935 steps: diff --git a/apps/cli/client/attach-client.ts b/apps/cli/client/attach-client.ts index edd27f9..751e5b1 100644 --- a/apps/cli/client/attach-client.ts +++ b/apps/cli/client/attach-client.ts @@ -26,6 +26,8 @@ export interface AttachResult { export interface AttachClientHandle { done: Promise; detach: () => Promise; + /** Send raw bytes to the session as input (used to forward stray keystrokes). */ + forwardInput: (data: string) => void; } const ABORTED_REASON = "user_aborted" as const; @@ -206,6 +208,10 @@ export function startAttachClient(opts: AttachClientOptions): AttachClientHandle exitCode = msg.exitCode; reason = "session_closed"; log.debug("session ended remotely", { exitCode }); + // Complete the attach flow now rather than waiting for the socket to + // close. Over the relay the viewer socket may linger after the host + // session ends, which would otherwise keep the attach open forever. + finalize(); break; case "error": log.warn("host error", { code: msg.code, message: msg.message }); @@ -221,6 +227,11 @@ export function startAttachClient(opts: AttachClientOptions): AttachClientHandle if (!finalized) reason = ABORTED_REASON; return finalize(); }, + forwardInput: (data: string): void => { + if (!sessionId) return; + if (ws.readyState !== WebSocket.OPEN) return; + safeSend(ws, { type: "input", sessionId, data }); + }, }; } diff --git a/apps/cli/commands/attach.ts b/apps/cli/commands/attach.ts index 75676cd..7ce7995 100644 --- a/apps/cli/commands/attach.ts +++ b/apps/cli/commands/attach.ts @@ -2,7 +2,13 @@ import * as p from "@clack/prompts"; import { createLogger, trackError, trackEvent } from "@repo/logger"; import { makeFunctionReference } from "convex/server"; import { startAttachClient } from "../client/attach-client"; -import { findSession, latestSession, listSessions, type SessionRecord } from "../registry/sessions"; +import { + findSession, + findSessionByPort, + latestSession, + listSessions, + type SessionRecord, +} from "../registry/sessions"; import { PrefixFilter, type PrefixCommand } from "../shell/prefix"; import { resolveAuthedConvexClient } from "../util/convex-client"; import { env } from "../util/env"; @@ -124,6 +130,7 @@ export async function runAttach(opts: AttachOptions): Promise { const prefixFilter = new PrefixFilter({ onCommand: handlePrefixCommand, + onForward: (data) => handle.forwardInput(data), onArmedChange: (armed) => { if (armed) { setTitle(`● wrapper armed • viewer • ${sessionTag}`); @@ -190,6 +197,11 @@ async function resolveTarget(opts: AttachOptions): Promise } if (opts.port) { + // Resolve the real session id from the registry so the attach can be + // authorized. If the port isn't a known local session, keep it unknown — + // authorization will then refuse (when a backend is configured). + const byPort = findSessionByPort(opts.port); + if (byPort) return { id: byPort.id, port: byPort.port, local: true }; return { id: "", port: opts.port, local: true }; } @@ -244,15 +256,25 @@ function shortenHome(path: string): string { async function ensureAttachAllowed(target: TargetSession): Promise { if (!target.local || target.port === undefined) return false; - if (target.id === "") return true; const backend = resolveAuthedConvexClient(); + // No backend configured: nothing to authorize against (pure local dev). if (backend.status === "unconfigured") return true; if (backend.status === "missing_auth") { process.stderr.write("[wrapper] backend auth required. Run `wrapper auth login` first.\n"); return false; } + // A backend is configured but we couldn't resolve a session id (e.g. attach + // by an unknown port). We cannot verify ownership/sharing, so refuse rather + // than silently granting access. + if (target.id === "") { + process.stderr.write( + "[wrapper] cannot authorize attach by port alone. Re-run with `--id `.\n", + ); + return false; + } + try { await backend.client.query(authorizeAttachRef, { sessionId: target.id }); return true; diff --git a/apps/cli/commands/shell-host.ts b/apps/cli/commands/shell-host.ts index 301e067..ac8b2a4 100644 --- a/apps/cli/commands/shell-host.ts +++ b/apps/cli/commands/shell-host.ts @@ -225,6 +225,14 @@ export async function runShellHost(opts: ShellHostOptions = {}): Promise { } try { + // Persist the shared flag before issuing the relay ticket so backend + // state (e.g. viewer authorization checks) is synchronized rather than + // racing the periodic fire-and-forget heartbeat. + await backend.client.mutation(sessionHeartbeatRef, { + sessionId, + shared: true, + port: server.port, + }); await backend.client.mutation(setRelayStateRef, { sessionId, relayState: "connecting" }); const issued = await backend.client.action(issueHostRelayTicketRef, { sessionId }); relayBridge = startRelayHostBridge({ @@ -294,11 +302,8 @@ export async function runShellHost(opts: ShellHostOptions = {}): Promise { shared = true; setSessionShared(sessionId, true); trackEvent("session_shared"); - if (backend.status === "ready") { - void backend.client - .mutation(sessionHeartbeatRef, { sessionId, shared: true, port: server.port }) - .catch(() => {}); - } + // Backend `shared` state is persisted (awaited) inside startRelayBridge + // before the relay ticket is issued, avoiding a sync race. void startRelayBridge(); break; case "unshare": @@ -331,6 +336,7 @@ export async function runShellHost(opts: ShellHostOptions = {}): Promise { const prefixFilter = new PrefixFilter({ onCommand: handlePrefixCommand, + onForward: (data) => session.write(data), onArmedChange: (armed) => { if (!env.hudEnabled) return; if (armed) { diff --git a/apps/cli/index.ts b/apps/cli/index.ts index aef45b6..240263a 100644 --- a/apps/cli/index.ts +++ b/apps/cli/index.ts @@ -12,8 +12,11 @@ import { runStatus } from "./commands/status"; import { telemetryDisable, telemetryEnable, telemetryStatus } from "./commands/telemetry"; import { runUninstall } from "./commands/uninstall"; import type { SupportedShell } from "./shell/detect"; +import pkg from "./package.json"; -const VERSION = "0.0.0"; +// Single source of truth: the published package version. Keeping this in sync +// with package.json ensures `wrapper --version` matches release/Homebrew. +const VERSION = pkg.version; const SUPPORTED_SHELLS: SupportedShell[] = ["zsh", "bash", "fish"]; const subcommand = process.argv[2]; diff --git a/apps/cli/registry/sessions.ts b/apps/cli/registry/sessions.ts index 699dcb4..7b612c7 100644 --- a/apps/cli/registry/sessions.ts +++ b/apps/cli/registry/sessions.ts @@ -6,6 +6,7 @@ import { unlinkSync, openSync, closeSync, + statSync, } from "node:fs"; import { dirname, join } from "node:path"; import { createLogger } from "@repo/logger"; @@ -102,6 +103,58 @@ function clone(value: T): T { return JSON.parse(JSON.stringify(value)) as T; } +const LOCK_ACQUIRE_TIMEOUT_MS = 2000; +const LOCK_STALE_MS = 10_000; +const LOCK_RETRY_MS = 15; + +/** + * Serialize read-modify-write on the registry with an exclusive lock file so + * two `shell-host` processes starting together can't clobber each other's + * record. Best-effort: if the lock can't be acquired within the timeout (or a + * stale lock is detected), we break/proceed so the CLI never deadlocks. + */ +function withRegistryLock(fn: () => T): T { + const lockFile = `${paths.sessionsRegistry()}.lock`; + const start = Date.now(); + let fd: number | null = null; + + while (fd === null) { + try { + fd = openSync(lockFile, "wx", 0o600); + } catch { + try { + const age = Date.now() - statSync(lockFile).mtimeMs; + if (age > LOCK_STALE_MS) { + unlinkSync(lockFile); + continue; + } + } catch { + // Lock vanished between open and stat; retry immediately. + continue; + } + if (Date.now() - start > LOCK_ACQUIRE_TIMEOUT_MS) break; + Bun.sleepSync(LOCK_RETRY_MS); + } + } + + try { + return fn(); + } finally { + if (fd !== null) { + try { + closeSync(fd); + } catch { + // ignore + } + try { + unlinkSync(lockFile); + } catch { + // ignore + } + } + } +} + function isPidAlive(pid: number): boolean { try { // Signal 0 is the standard "is this process reachable?" probe. @@ -141,6 +194,11 @@ export function findSession(id: SessionId): SessionRecord | null { return listSessions().find((s) => s.id === id) ?? null; } +/** Find a live session by its local port, or `null` if none matches. */ +export function findSessionByPort(port: number): SessionRecord | null { + return listSessions().find((s) => s.port === port) ?? null; +} + /** Most recently created live session (helper for `wrapper attach` w/o args). */ export function latestSession(): SessionRecord | null { const sessions = listSessions(); @@ -150,27 +208,33 @@ export function latestSession(): SessionRecord | null { /** Insert a record. Replaces any existing entry with the same id. */ export function registerSession(record: SessionRecord): void { - const raw = readRaw(); - const others = raw.sessions.filter((s) => s.id !== record.id); - others.push(record); - writeRaw({ version: SCHEMA_VERSION, sessions: others }); + withRegistryLock(() => { + const raw = readRaw(); + const others = raw.sessions.filter((s) => s.id !== record.id); + others.push(record); + writeRaw({ version: SCHEMA_VERSION, sessions: others }); + }); log.debug("registry registered session", { id: record.id, pid: record.pid, port: record.port }); } /** Remove a record by id. No-op if the record is already gone. */ export function unregisterSession(id: SessionId): void { - const raw = readRaw(); - const next = raw.sessions.filter((s) => s.id !== id); - if (next.length === raw.sessions.length) return; - writeRaw({ version: SCHEMA_VERSION, sessions: next }); + withRegistryLock(() => { + const raw = readRaw(); + const next = raw.sessions.filter((s) => s.id !== id); + if (next.length === raw.sessions.length) return; + writeRaw({ version: SCHEMA_VERSION, sessions: next }); + }); log.debug("registry unregistered session", { id }); } /** Mutate the `shared` flag of a record in place. */ export function setSessionShared(id: SessionId, shared: boolean): void { - const raw = readRaw(); - const next = raw.sessions.map((s) => (s.id === id ? { ...s, shared } : s)); - writeRaw({ version: SCHEMA_VERSION, sessions: next }); + withRegistryLock(() => { + const raw = readRaw(); + const next = raw.sessions.map((s) => (s.id === id ? { ...s, shared } : s)); + writeRaw({ version: SCHEMA_VERSION, sessions: next }); + }); } /** diff --git a/apps/cli/scripts/install.sh b/apps/cli/scripts/install.sh index 7dd4bbd..adf2cef 100644 --- a/apps/cli/scripts/install.sh +++ b/apps/cli/scripts/install.sh @@ -3,7 +3,7 @@ set -euo pipefail REPO="${WRAPPER_RELEASE_REPO:-heycupola/wrapper}" VERSION="${WRAPPER_VERSION:-latest}" -INSTALL_DIR="${WRAPPER_INSTALL_DIR:-$HOME/.wrapper/bin}" +INSTALL_DIR="${WRAPPER_INSTALL_DIR:-$HOME/.wrapper}" detect_platform() { local os arch diff --git a/apps/cli/shell/prefix.ts b/apps/cli/shell/prefix.ts index 9851c4d..37b3870 100644 --- a/apps/cli/shell/prefix.ts +++ b/apps/cli/shell/prefix.ts @@ -44,6 +44,10 @@ export interface PrefixFilterOptions { /** Called when the filter enters or leaves "armed" state — useful for * rendering an overlay hint. Defaults to a no-op. */ onArmedChange?: (armed: boolean) => void; + /** Called to forward bytes to the shell out-of-band — used to re-emit the + * prefix byte when the armed state times out without a follow-up command, + * so the keystroke is never silently dropped. Defaults to a no-op. */ + onForward?: (data: string) => void; /** Auto-reset the armed state after this many milliseconds. Defaults to * 1500. Set to 0 to disable the timeout (not recommended). */ armedTimeoutMs?: number; @@ -64,6 +68,7 @@ export class PrefixFilter { private readonly prefix: number; private readonly onCommand: (cmd: PrefixCommand) => void; private readonly onArmedChange: (armed: boolean) => void; + private readonly onForward: (data: string) => void; private readonly timeoutMs: number; private state: State = "idle"; private armedTimer: ReturnType | null = null; @@ -72,6 +77,7 @@ export class PrefixFilter { this.prefix = opts.prefix ?? DEFAULT_PREFIX; this.onCommand = opts.onCommand; this.onArmedChange = opts.onArmedChange ?? (() => undefined); + this.onForward = opts.onForward ?? (() => undefined); this.timeoutMs = opts.armedTimeoutMs ?? DEFAULT_TIMEOUT_MS; } @@ -154,17 +160,15 @@ export class PrefixFilter { this.onArmedChange(true); if (this.timeoutMs > 0) { this.armedTimer = setTimeout(() => { - // Time elapsed without a follow-up keystroke. Re-emit the prefix - // byte to the shell so it isn't lost — the user clearly didn't - // intend it to be a wrapper command. + // Time elapsed without a follow-up keystroke. Re-emit the prefix byte + // to the shell so it isn't lost — the user clearly didn't intend it as + // a wrapper command. `process()` already returned, so we forward it + // out-of-band via the onForward hook (host writes it to the PTY; + // viewer sends it as input). This upholds the "never drop input" + // invariant even across the armed timeout. if (this.state !== "armed") return; this.disarm(); - // We cannot synchronously add to the previous `process` call's - // output, so caller must surface the byte by listening to - // onArmedChange(false) and relying on its own input plumbing. - // In practice this is fine: a 1.5 s gap means the user paused, and - // the very next byte they type already arrives in idle state, so - // the worst case is a single dropped byte (the prefix itself). + this.onForward(String.fromCharCode(this.prefix)); }, this.timeoutMs); // Don't keep the event loop alive just for this timer. this.armedTimer.unref?.(); diff --git a/apps/cli/tests/prefix.test.ts b/apps/cli/tests/prefix.test.ts index 1047f89..2e4afdf 100644 --- a/apps/cli/tests/prefix.test.ts +++ b/apps/cli/tests/prefix.test.ts @@ -109,4 +109,18 @@ describe("PrefixFilter", () => { expect(filter.armed).toBe(false); expect(armedHistory).toEqual([true, false]); }); + + test("auto-timeout re-emits the prefix byte instead of dropping it", async () => { + const forwarded: string[] = []; + const filter = new PrefixFilter({ + onCommand: () => {}, + onForward: (data) => forwarded.push(data), + armedTimeoutMs: 50, + }); + expect(filter.process(PFX)).toBe(""); + expect(forwarded).toEqual([]); + await new Promise((r) => setTimeout(r, 100)); + expect(filter.armed).toBe(false); + expect(forwarded).toEqual([PFX]); + }); }); diff --git a/apps/cli/tsconfig.json b/apps/cli/tsconfig.json index d46291f..893c0b9 100644 --- a/apps/cli/tsconfig.json +++ b/apps/cli/tsconfig.json @@ -2,7 +2,8 @@ "extends": "@repo/typescript-config/bundler.json", "compilerOptions": { "moduleDetection": "force", - "noUncheckedIndexedAccess": true + "noUncheckedIndexedAccess": true, + "resolveJsonModule": true }, "include": [ "index.ts", diff --git a/apps/relay/src/hub.ts b/apps/relay/src/hub.ts index 66bde9a..e81f6be 100644 --- a/apps/relay/src/hub.ts +++ b/apps/relay/src/hub.ts @@ -124,16 +124,32 @@ export class RelayHub { private forwardHostMessage(binding: PeerBinding, msg: WrapperMessage): void { switch (msg.type) { case "session.opened": - case "session.closed": case "output": case "error": this.broadcastToViewers(binding.sessionId, msg); break; + case "session.closed": + // The session ended: deliver the final frame, then close every viewer + // so relay attaches don't linger open after the host is gone. + this.broadcastToViewers(binding.sessionId, msg); + this.closeViewers(binding.sessionId, CLOSE_HOST_DISCONNECTED, "session closed"); + break; default: this.log.warn("unexpected host message", { type: msg.type, sessionId: binding.sessionId }); } } + private closeViewers(sessionId: string, code: number, reason: string): void { + const viewers = this.viewersBySession.get(sessionId); + if (!viewers || viewers.size === 0) return; + for (const viewer of viewers) { + viewer.close(code, reason); + this.bindingByPeer.delete(viewer); + this.viewerState.delete(viewer); + } + this.viewersBySession.delete(sessionId); + } + private forwardViewerMessage(binding: PeerBinding, msg: WrapperMessage, peer: RelayPeer): void { const host = this.hostBySession.get(binding.sessionId); if (!host) { diff --git a/packages/logger/config.ts b/packages/logger/config.ts index b9af4f8..cc1dbda 100644 --- a/packages/logger/config.ts +++ b/packages/logger/config.ts @@ -85,7 +85,10 @@ function isTelemetryEnabled(): boolean { if (IS_DEV) return false; const preference = readTelemetryPreference(); if (preference !== null) return preference; - return true; + // No recorded consent yet. Many users first launch via the rc `shell-host` + // hook, which never shows the opt-out banner, so default to OFF until an + // explicit preference is saved (the first-run banner records consent). + return false; } export function getConfig(): LoggerConfig {