Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/fly-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ jobs:
deploy:
name: Deploy relay app
runs-on: ubuntu-latest
concurrency: deploy-relay-group
concurrency:
group: deploy-relay-group
cancel-in-progress: false
env:
FLY_APP_NAME: wrapper-dry-pathway-1935
steps:
Expand Down
11 changes: 11 additions & 0 deletions apps/cli/client/attach-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export interface AttachResult {
export interface AttachClientHandle {
done: Promise<AttachResult>;
detach: () => Promise<AttachResult>;
/** Send raw bytes to the session as input (used to forward stray keystrokes). */
forwardInput: (data: string) => void;
}

const ABORTED_REASON = "user_aborted" as const;
Expand Down Expand Up @@ -206,6 +208,10 @@ export function startAttachClient(opts: AttachClientOptions): AttachClientHandle
exitCode = msg.exitCode;
reason = "session_closed";
log.debug("session ended remotely", { exitCode });
// Complete the attach flow now rather than waiting for the socket to
// close. Over the relay the viewer socket may linger after the host
// session ends, which would otherwise keep the attach open forever.
finalize();
break;
case "error":
log.warn("host error", { code: msg.code, message: msg.message });
Expand All @@ -221,6 +227,11 @@ export function startAttachClient(opts: AttachClientOptions): AttachClientHandle
if (!finalized) reason = ABORTED_REASON;
return finalize();
},
forwardInput: (data: string): void => {
if (!sessionId) return;
if (ws.readyState !== WebSocket.OPEN) return;
safeSend(ws, { type: "input", sessionId, data });
},
};
}

Expand Down
26 changes: 24 additions & 2 deletions apps/cli/commands/attach.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import * as p from "@clack/prompts";
import { createLogger, trackError, trackEvent } from "@repo/logger";
import { makeFunctionReference } from "convex/server";
import { startAttachClient } from "../client/attach-client";
import { findSession, latestSession, listSessions, type SessionRecord } from "../registry/sessions";
import {
findSession,
findSessionByPort,
latestSession,
listSessions,
type SessionRecord,
} from "../registry/sessions";
import { PrefixFilter, type PrefixCommand } from "../shell/prefix";
import { resolveAuthedConvexClient } from "../util/convex-client";
import { env } from "../util/env";
Expand Down Expand Up @@ -124,6 +130,7 @@ export async function runAttach(opts: AttachOptions): Promise<void> {

const prefixFilter = new PrefixFilter({
onCommand: handlePrefixCommand,
onForward: (data) => handle.forwardInput(data),
onArmedChange: (armed) => {
if (armed) {
setTitle(`● wrapper armed • viewer • ${sessionTag}`);
Expand Down Expand Up @@ -190,6 +197,11 @@ async function resolveTarget(opts: AttachOptions): Promise<TargetSession | null>
}

if (opts.port) {
// Resolve the real session id from the registry so the attach can be
// authorized. If the port isn't a known local session, keep it unknown —
// authorization will then refuse (when a backend is configured).
const byPort = findSessionByPort(opts.port);
if (byPort) return { id: byPort.id, port: byPort.port, local: true };
return { id: "<unknown>", port: opts.port, local: true };
}

Expand Down Expand Up @@ -244,15 +256,25 @@ function shortenHome(path: string): string {

async function ensureAttachAllowed(target: TargetSession): Promise<boolean> {
if (!target.local || target.port === undefined) return false;
if (target.id === "<unknown>") return true;

const backend = resolveAuthedConvexClient();
// No backend configured: nothing to authorize against (pure local dev).
if (backend.status === "unconfigured") return true;
if (backend.status === "missing_auth") {
process.stderr.write("[wrapper] backend auth required. Run `wrapper auth login` first.\n");
return false;
}

// A backend is configured but we couldn't resolve a session id (e.g. attach
// by an unknown port). We cannot verify ownership/sharing, so refuse rather
// than silently granting access.
if (target.id === "<unknown>") {
process.stderr.write(
"[wrapper] cannot authorize attach by port alone. Re-run with `--id <sessionId>`.\n",
);
return false;
}

try {
await backend.client.query(authorizeAttachRef, { sessionId: target.id });
return true;
Expand Down
16 changes: 11 additions & 5 deletions apps/cli/commands/shell-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ export async function runShellHost(opts: ShellHostOptions = {}): Promise<void> {
}

try {
// Persist the shared flag before issuing the relay ticket so backend
// state (e.g. viewer authorization checks) is synchronized rather than
// racing the periodic fire-and-forget heartbeat.
await backend.client.mutation(sessionHeartbeatRef, {
sessionId,
shared: true,
port: server.port,
});
await backend.client.mutation(setRelayStateRef, { sessionId, relayState: "connecting" });
const issued = await backend.client.action(issueHostRelayTicketRef, { sessionId });
relayBridge = startRelayHostBridge({
Expand Down Expand Up @@ -294,11 +302,8 @@ export async function runShellHost(opts: ShellHostOptions = {}): Promise<void> {
shared = true;
setSessionShared(sessionId, true);
trackEvent("session_shared");
if (backend.status === "ready") {
void backend.client
.mutation(sessionHeartbeatRef, { sessionId, shared: true, port: server.port })
.catch(() => {});
}
// Backend `shared` state is persisted (awaited) inside startRelayBridge
// before the relay ticket is issued, avoiding a sync race.
void startRelayBridge();
break;
case "unshare":
Expand Down Expand Up @@ -331,6 +336,7 @@ export async function runShellHost(opts: ShellHostOptions = {}): Promise<void> {

const prefixFilter = new PrefixFilter({
onCommand: handlePrefixCommand,
onForward: (data) => session.write(data),
onArmedChange: (armed) => {
if (!env.hudEnabled) return;
if (armed) {
Expand Down
5 changes: 4 additions & 1 deletion apps/cli/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ import { runStatus } from "./commands/status";
import { telemetryDisable, telemetryEnable, telemetryStatus } from "./commands/telemetry";
import { runUninstall } from "./commands/uninstall";
import type { SupportedShell } from "./shell/detect";
import pkg from "./package.json";

const VERSION = "0.0.0";
// Single source of truth: the published package version. Keeping this in sync
// with package.json ensures `wrapper --version` matches release/Homebrew.
const VERSION = pkg.version;
const SUPPORTED_SHELLS: SupportedShell[] = ["zsh", "bash", "fish"];

const subcommand = process.argv[2];
Expand Down
86 changes: 75 additions & 11 deletions apps/cli/registry/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
unlinkSync,
openSync,
closeSync,
statSync,
} from "node:fs";
import { dirname, join } from "node:path";
import { createLogger } from "@repo/logger";
Expand Down Expand Up @@ -102,6 +103,58 @@ function clone<T>(value: T): T {
return JSON.parse(JSON.stringify(value)) as T;
}

const LOCK_ACQUIRE_TIMEOUT_MS = 2000;
const LOCK_STALE_MS = 10_000;
const LOCK_RETRY_MS = 15;

/**
* Serialize read-modify-write on the registry with an exclusive lock file so
* two `shell-host` processes starting together can't clobber each other's
* record. Best-effort: if the lock can't be acquired within the timeout (or a
* stale lock is detected), we break/proceed so the CLI never deadlocks.
*/
function withRegistryLock<T>(fn: () => T): T {
const lockFile = `${paths.sessionsRegistry()}.lock`;
const start = Date.now();
let fd: number | null = null;

while (fd === null) {
try {
fd = openSync(lockFile, "wx", 0o600);
} catch {
try {
const age = Date.now() - statSync(lockFile).mtimeMs;
if (age > LOCK_STALE_MS) {
unlinkSync(lockFile);
continue;
}
} catch {
// Lock vanished between open and stat; retry immediately.
continue;
}
if (Date.now() - start > LOCK_ACQUIRE_TIMEOUT_MS) break;
Bun.sleepSync(LOCK_RETRY_MS);
}
}

try {
return fn();
} finally {
if (fd !== null) {
try {
closeSync(fd);
} catch {
// ignore
}
try {
unlinkSync(lockFile);
} catch {
// ignore
}
}
}
}

function isPidAlive(pid: number): boolean {
try {
// Signal 0 is the standard "is this process reachable?" probe.
Expand Down Expand Up @@ -141,6 +194,11 @@ export function findSession(id: SessionId): SessionRecord | null {
return listSessions().find((s) => s.id === id) ?? null;
}

/** Find a live session by its local port, or `null` if none matches. */
export function findSessionByPort(port: number): SessionRecord | null {
return listSessions().find((s) => s.port === port) ?? null;
}

/** Most recently created live session (helper for `wrapper attach` w/o args). */
export function latestSession(): SessionRecord | null {
const sessions = listSessions();
Expand All @@ -150,27 +208,33 @@ export function latestSession(): SessionRecord | null {

/** Insert a record. Replaces any existing entry with the same id. */
export function registerSession(record: SessionRecord): void {
const raw = readRaw();
const others = raw.sessions.filter((s) => s.id !== record.id);
others.push(record);
writeRaw({ version: SCHEMA_VERSION, sessions: others });
withRegistryLock(() => {
const raw = readRaw();
const others = raw.sessions.filter((s) => s.id !== record.id);
others.push(record);
writeRaw({ version: SCHEMA_VERSION, sessions: others });
});
log.debug("registry registered session", { id: record.id, pid: record.pid, port: record.port });
}

/** Remove a record by id. No-op if the record is already gone. */
export function unregisterSession(id: SessionId): void {
const raw = readRaw();
const next = raw.sessions.filter((s) => s.id !== id);
if (next.length === raw.sessions.length) return;
writeRaw({ version: SCHEMA_VERSION, sessions: next });
withRegistryLock(() => {
const raw = readRaw();
const next = raw.sessions.filter((s) => s.id !== id);
if (next.length === raw.sessions.length) return;
writeRaw({ version: SCHEMA_VERSION, sessions: next });
});
log.debug("registry unregistered session", { id });
}

/** Mutate the `shared` flag of a record in place. */
export function setSessionShared(id: SessionId, shared: boolean): void {
const raw = readRaw();
const next = raw.sessions.map((s) => (s.id === id ? { ...s, shared } : s));
writeRaw({ version: SCHEMA_VERSION, sessions: next });

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prune writes bypass registry lock

Medium Severity

The listSessions function prunes stale entries by calling writeRaw without acquiring the registry lock. This creates a race condition with other locked registry mutations, potentially leading to session record loss or data corruption during concurrent operations.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 72a0e29. Configure here.

withRegistryLock(() => {
const raw = readRaw();
const next = raw.sessions.map((s) => (s.id === id ? { ...s, shared } : s));
writeRaw({ version: SCHEMA_VERSION, sessions: next });
});
}

/**
Expand Down
2 changes: 1 addition & 1 deletion apps/cli/scripts/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ set -euo pipefail

REPO="${WRAPPER_RELEASE_REPO:-heycupola/wrapper}"
VERSION="${WRAPPER_VERSION:-latest}"
INSTALL_DIR="${WRAPPER_INSTALL_DIR:-$HOME/.wrapper/bin}"
INSTALL_DIR="${WRAPPER_INSTALL_DIR:-$HOME/.wrapper}"

detect_platform() {
local os arch
Expand Down
22 changes: 13 additions & 9 deletions apps/cli/shell/prefix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ export interface PrefixFilterOptions {
/** Called when the filter enters or leaves "armed" state — useful for
* rendering an overlay hint. Defaults to a no-op. */
onArmedChange?: (armed: boolean) => void;
/** Called to forward bytes to the shell out-of-band — used to re-emit the
* prefix byte when the armed state times out without a follow-up command,
* so the keystroke is never silently dropped. Defaults to a no-op. */
onForward?: (data: string) => void;
/** Auto-reset the armed state after this many milliseconds. Defaults to
* 1500. Set to 0 to disable the timeout (not recommended). */
armedTimeoutMs?: number;
Expand All @@ -64,6 +68,7 @@ export class PrefixFilter {
private readonly prefix: number;
private readonly onCommand: (cmd: PrefixCommand) => void;
private readonly onArmedChange: (armed: boolean) => void;
private readonly onForward: (data: string) => void;
private readonly timeoutMs: number;
private state: State = "idle";
private armedTimer: ReturnType<typeof setTimeout> | null = null;
Expand All @@ -72,6 +77,7 @@ export class PrefixFilter {
this.prefix = opts.prefix ?? DEFAULT_PREFIX;
this.onCommand = opts.onCommand;
this.onArmedChange = opts.onArmedChange ?? (() => undefined);
this.onForward = opts.onForward ?? (() => undefined);
this.timeoutMs = opts.armedTimeoutMs ?? DEFAULT_TIMEOUT_MS;
}

Expand Down Expand Up @@ -154,17 +160,15 @@ export class PrefixFilter {
this.onArmedChange(true);
if (this.timeoutMs > 0) {
this.armedTimer = setTimeout(() => {
// Time elapsed without a follow-up keystroke. Re-emit the prefix
// byte to the shell so it isn't lost — the user clearly didn't
// intend it to be a wrapper command.
// Time elapsed without a follow-up keystroke. Re-emit the prefix byte
// to the shell so it isn't lost — the user clearly didn't intend it as
// a wrapper command. `process()` already returned, so we forward it
// out-of-band via the onForward hook (host writes it to the PTY;
// viewer sends it as input). This upholds the "never drop input"
// invariant even across the armed timeout.
if (this.state !== "armed") return;
this.disarm();
// We cannot synchronously add to the previous `process` call's
// output, so caller must surface the byte by listening to
// onArmedChange(false) and relying on its own input plumbing.
// In practice this is fine: a 1.5 s gap means the user paused, and
// the very next byte they type already arrives in idle state, so
// the worst case is a single dropped byte (the prefix itself).
this.onForward(String.fromCharCode(this.prefix));
}, this.timeoutMs);
// Don't keep the event loop alive just for this timer.
this.armedTimer.unref?.();
Expand Down
14 changes: 14 additions & 0 deletions apps/cli/tests/prefix.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,18 @@ describe("PrefixFilter", () => {
expect(filter.armed).toBe(false);
expect(armedHistory).toEqual([true, false]);
});

test("auto-timeout re-emits the prefix byte instead of dropping it", async () => {
const forwarded: string[] = [];
const filter = new PrefixFilter({
onCommand: () => {},
onForward: (data) => forwarded.push(data),
armedTimeoutMs: 50,
});
expect(filter.process(PFX)).toBe("");
expect(forwarded).toEqual([]);
await new Promise((r) => setTimeout(r, 100));
expect(filter.armed).toBe(false);
expect(forwarded).toEqual([PFX]);
});
});
3 changes: 2 additions & 1 deletion apps/cli/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"extends": "@repo/typescript-config/bundler.json",
"compilerOptions": {
"moduleDetection": "force",
"noUncheckedIndexedAccess": true
"noUncheckedIndexedAccess": true,
"resolveJsonModule": true
},
"include": [
"index.ts",
Expand Down
Loading
Loading