Skip to content

Commit 4e164a7

Browse files
hyperpolymathclaude
andcommitted
feat(local-coord-mcp): role tiers, supervisor gating, quarantine queue
Tasks #5 + #6. Adds the supervision layer that makes supervised peers (gemini / codex / vibe by default) safe to have in a shared workspace — Tier 2+ ops land in a quarantine queue for supervisor review instead of delivering directly. FFI changes (local_coord_ffi.zig): - New Role enum { supervisor, executor, supervised } stored on each Peer. Default derived from client_kind: claude -> executor, everything else -> supervised. - coord_register now takes role_hint as a 2nd arg; ROLE_HINT_DEFAULT=-1 honors client_kind default; role_hint=0 (supervisor) is ALWAYS rejected — use coord_promote_to_supervisor instead. - coord_promote_to_supervisor gates supervisor role on the BOJ_SUPERVISOR_TOKEN env var (constant-time compare) + at most one supervisor rule. Env var unset = supervisor role unavailable. - coord_set_role lets the active supervisor reassign other peers' roles, with protection against demoting the sole supervisor. - coord_send_gated routes Tier 2+ ops from supervised peers to a 32-entry quarantine queue; returns an encoded request_id (negative value < -1000) instead of a direct send count. - coord_review / coord_review_entry / coord_approve / coord_reject are the supervisor-only review tools. approve delivers the pending message to its target(s); reject logs the reason without delivery. - coord_read_peer_role helper for adapter introspection. Adapter changes (local_coord_adapter.zig): - coord_register accepts optional `role` JSON field. - New dispatch handlers for coord_promote_to_supervisor, coord_send_gated, coord_review, coord_review_entry, coord_approve, coord_reject. - coord_review emits a JSON array with request_id + preview; coord_review_entry returns the full message body. MCP surface: - cartridge.ncl declares the 6 new tools with full schemas. - cartridge.json regenerated. - mcp-bridge/lib/tools.js declares the same tools to clients. - mcp-bridge/main.js dispatchTool routes the new names to dispatchLocalCoord. Tests (all passing): - Default role derives from client_kind - Register rejects role_hint=supervisor (-3) - Promote contract documented (env-var check behaviour) - Gated send from supervised lands in quarantine; Tier 0/1 delivers direct - Supervisor approve delivers the message to the target - Supervisor reject removes without delivery - Non-supervisor cannot review/approve/reject (-1) - Pre-existing send/receive/claim/context/suffix tests updated for new coord_register signature. Version bump: 0.2.0 -> 0.3.0 (new API surface). Remaining for the coord backbone: - Watchdog TTL on supervised claims (DD-20) - warn_drift broadcast via Opus review on auto-release (DD-21) - Quarantine spill to VeriSimDB when in-memory queue full (DD-21) - VeriSimDB sidecar for durable state (Task #7) - E2E test across 2 instances (Task #8) Follows memory/project_coord_supervision_architecture.md. Design log + rationale: Desktop/COORD-MCP-DESIGN-LOG.md (Parts 3-4, Appendices A-J). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ceb5125 commit 4e164a7

6 files changed

Lines changed: 1077 additions & 25 deletions

File tree

cartridges/local-coord-mcp/adapter/local_coord_adapter.zig

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,21 @@ fn dispatch(tool: []const u8, body: []const u8, resp: []u8, allocator: std.mem.A
8383
break :blk ctx_val.string;
8484
};
8585

86+
// Optional role hint. Server rejects role=supervisor here; callers
87+
// must promote via coord_promote_to_supervisor with the env secret.
88+
const role_hint: i32 = blk: {
89+
const rv = parsed.value.object.get("role") orelse break :blk -1;
90+
const rs = rv.string;
91+
if (std.mem.eql(u8, rs, "supervisor")) break :blk 0;
92+
if (std.mem.eql(u8, rs, "executor")) break :blk 1;
93+
if (std.mem.eql(u8, rs, "supervised")) break :blk 2;
94+
break :blk -1;
95+
};
96+
8697
var token: [16]u8 = undefined;
8798
var suffix: [4]u8 = undefined;
88-
const idx = ffi.coord_register(kind, &token, &suffix);
99+
const idx = ffi.coord_register(kind, role_hint, &token, &suffix);
100+
if (idx == -3) return .{ .status = 400, .body = errJson(resp, "supervisor role must be obtained via coord_promote_to_supervisor") };
89101
if (idx < 0) return .{ .status = 500, .body = errJson(resp, "registry full") };
90102

91103
if (ctx_str.len > 0) {
@@ -241,6 +253,155 @@ fn dispatch(tool: []const u8, body: []const u8, resp: []u8, allocator: std.mem.A
241253
return .{ .status = 500, .body = errJson(resp, "claim failed") };
242254
}
243255

256+
if (std.mem.eql(u8, tool, "coord_promote_to_supervisor")) {
257+
const parsed = std.json.parseFromSlice(std.json.Value, allocator, body, .{}) catch return .{ .status = 400, .body = errJson(resp, "invalid json") };
258+
defer parsed.deinit();
259+
const token_val = parsed.value.object.get("token") orelse return .{ .status = 400, .body = errJson(resp, "missing token") };
260+
const secret_val = parsed.value.object.get("secret") orelse return .{ .status = 400, .body = errJson(resp, "missing secret") };
261+
var token: [16]u8 = undefined;
262+
if (!parseToken(token_val.string, &token)) return .{ .status = 400, .body = errJson(resp, "invalid token hex") };
263+
264+
const secret = secret_val.string;
265+
const rc = ffi.coord_promote_to_supervisor(&token, 16, secret.ptr, @intCast(secret.len));
266+
if (rc == 0) return .{ .status = 200, .body = okJson(resp, "promoted") };
267+
if (rc == -1) return .{ .status = 401, .body = errJson(resp, "unauthenticated") };
268+
if (rc == -2) return .{ .status = 409, .body = errJson(resp, "supervisor already exists") };
269+
if (rc == -3) return .{ .status = 403, .body = errJson(resp, "supervisor role not configured on this server") };
270+
if (rc == -4) return .{ .status = 403, .body = errJson(resp, "secret does not match") };
271+
return .{ .status = 500, .body = errJson(resp, "promotion failed") };
272+
}
273+
274+
if (std.mem.eql(u8, tool, "coord_send_gated")) {
275+
const parsed = std.json.parseFromSlice(std.json.Value, allocator, body, .{}) catch return .{ .status = 400, .body = errJson(resp, "invalid json") };
276+
defer parsed.deinit();
277+
const token_val = parsed.value.object.get("token") orelse return .{ .status = 400, .body = errJson(resp, "missing token") };
278+
const target_val = parsed.value.object.get("target") orelse return .{ .status = 400, .body = errJson(resp, "missing target") };
279+
const msg_val = parsed.value.object.get("message") orelse return .{ .status = 400, .body = errJson(resp, "missing message") };
280+
const tier_val = parsed.value.object.get("risk_tier") orelse return .{ .status = 400, .body = errJson(resp, "missing risk_tier") };
281+
282+
var token: [16]u8 = undefined;
283+
if (!parseToken(token_val.string, &token)) return .{ .status = 400, .body = errJson(resp, "invalid token hex") };
284+
285+
const target_str = target_val.string;
286+
var target_idx: i32 = -1;
287+
if (!std.mem.eql(u8, target_str, "*")) {
288+
const suffix = extractSuffix(target_str) orelse return .{ .status = 400, .body = errJson(resp, "invalid target format") };
289+
target_idx = ffi.coord_find_peer_by_suffix(suffix.ptr);
290+
if (target_idx < 0) return .{ .status = 404, .body = errJson(resp, "target peer not found") };
291+
}
292+
293+
const msg = msg_val.string;
294+
const tier: i32 = @intCast(tier_val.integer);
295+
const rc = ffi.coord_send_gated(&token, 16, target_idx, msg.ptr, @intCast(msg.len), tier);
296+
297+
if (rc >= 0) {
298+
const body_out = std.fmt.bufPrint(resp, "{{\"success\":true,\"status\":\"delivered\",\"sent\":{d}}}", .{rc}) catch return .{ .status = 500, .body = errJson(resp, "buffer overflow") };
299+
return .{ .status = 200, .body = body_out };
300+
}
301+
if (rc < -1000) {
302+
const request_id: i64 = -(@as(i64, rc) + 1000);
303+
const body_out = std.fmt.bufPrint(resp, "{{\"success\":true,\"status\":\"quarantined\",\"request_id\":{d}}}", .{request_id}) catch return .{ .status = 500, .body = errJson(resp, "buffer overflow") };
304+
return .{ .status = 200, .body = body_out };
305+
}
306+
if (rc == -1) return .{ .status = 401, .body = errJson(resp, "unauthenticated") };
307+
if (rc == -2) return .{ .status = 404, .body = errJson(resp, "target peer not found") };
308+
if (rc == -3) return .{ .status = 503, .body = errJson(resp, "target inbox full") };
309+
if (rc == -4) return .{ .status = 503, .body = errJson(resp, "quarantine queue full — spill to VeriSimDB not yet wired") };
310+
if (rc == -5) return .{ .status = 428, .body = errJson(resp, "no supervisor registered — Tier 2+ from supervised requires a supervisor") };
311+
return .{ .status = 500, .body = errJson(resp, "gated send failed") };
312+
}
313+
314+
if (std.mem.eql(u8, tool, "coord_review")) {
315+
const parsed = std.json.parseFromSlice(std.json.Value, allocator, body, .{}) catch return .{ .status = 400, .body = errJson(resp, "invalid json") };
316+
defer parsed.deinit();
317+
const token_val = parsed.value.object.get("token") orelse return .{ .status = 400, .body = errJson(resp, "missing token") };
318+
var token: [16]u8 = undefined;
319+
if (!parseToken(token_val.string, &token)) return .{ .status = 400, .body = errJson(resp, "invalid token hex") };
320+
321+
// 32 entries max * 16 bytes per record = 512 bytes raw.
322+
var raw: [512]u8 = undefined;
323+
const count = ffi.coord_review(&token, 16, &raw, @intCast(raw.len));
324+
if (count < 0) return .{ .status = 403, .body = errJson(resp, "supervisor role required") };
325+
326+
var stream = std.io.fixedBufferStream(resp);
327+
const w = stream.writer();
328+
w.writeAll("{\"success\":true,\"entries\":[") catch return .{ .status = 500, .body = errJson(resp, "buffer overflow") };
329+
330+
var i: usize = 0;
331+
const cnt: usize = @intCast(count);
332+
while (i < cnt) : (i += 1) {
333+
const rec = raw[i * 16 ..][0..16];
334+
const rid_bytes: *const [4]u8 = rec[0..4];
335+
const rid: u32 = @bitCast(rid_bytes.*);
336+
const sender_idx: u8 = rec[4];
337+
const target_idx_sign: i8 = @bitCast(rec[5]);
338+
const risk_tier: u8 = rec[6];
339+
const mlen_bytes: *const [2]u8 = rec[7..9];
340+
const mlen: u16 = @bitCast(mlen_bytes.*);
341+
const preview_n: usize = @min(@as(usize, 7), @as(usize, mlen));
342+
const preview = rec[9 .. 9 + preview_n];
343+
344+
if (i > 0) w.writeAll(",") catch return .{ .status = 500, .body = errJson(resp, "buffer overflow") };
345+
std.fmt.format(w, "{{\"request_id\":{d},\"sender_idx\":{d},\"target_idx\":{d},\"risk_tier\":{d},\"msg_len\":{d},\"preview\":\"{s}\"}}", .{
346+
rid, sender_idx, target_idx_sign, risk_tier, mlen, preview,
347+
}) catch return .{ .status = 500, .body = errJson(resp, "buffer overflow") };
348+
}
349+
w.writeAll("]}") catch return .{ .status = 500, .body = errJson(resp, "buffer overflow") };
350+
return .{ .status = 200, .body = resp[0..stream.pos] };
351+
}
352+
353+
if (std.mem.eql(u8, tool, "coord_review_entry")) {
354+
const parsed = std.json.parseFromSlice(std.json.Value, allocator, body, .{}) catch return .{ .status = 400, .body = errJson(resp, "invalid json") };
355+
defer parsed.deinit();
356+
const token_val = parsed.value.object.get("token") orelse return .{ .status = 400, .body = errJson(resp, "missing token") };
357+
const rid_val = parsed.value.object.get("request_id") orelse return .{ .status = 400, .body = errJson(resp, "missing request_id") };
358+
var token: [16]u8 = undefined;
359+
if (!parseToken(token_val.string, &token)) return .{ .status = 400, .body = errJson(resp, "invalid token hex") };
360+
361+
var msg_buf: [512]u8 = undefined;
362+
const rc = ffi.coord_review_entry(&token, 16, @intCast(rid_val.integer), &msg_buf, @intCast(msg_buf.len));
363+
if (rc == -1) return .{ .status = 403, .body = errJson(resp, "supervisor role required") };
364+
if (rc == -2) return .{ .status = 404, .body = errJson(resp, "request_id not found") };
365+
if (rc < 0) return .{ .status = 500, .body = errJson(resp, "review failed") };
366+
367+
const msg_slice = msg_buf[0..@intCast(rc)];
368+
const body_out = std.fmt.bufPrint(resp, "{{\"success\":true,\"message\":\"{s}\"}}", .{msg_slice}) catch return .{ .status = 500, .body = errJson(resp, "buffer overflow") };
369+
return .{ .status = 200, .body = body_out };
370+
}
371+
372+
if (std.mem.eql(u8, tool, "coord_approve")) {
373+
const parsed = std.json.parseFromSlice(std.json.Value, allocator, body, .{}) catch return .{ .status = 400, .body = errJson(resp, "invalid json") };
374+
defer parsed.deinit();
375+
const token_val = parsed.value.object.get("token") orelse return .{ .status = 400, .body = errJson(resp, "missing token") };
376+
const rid_val = parsed.value.object.get("request_id") orelse return .{ .status = 400, .body = errJson(resp, "missing request_id") };
377+
var token: [16]u8 = undefined;
378+
if (!parseToken(token_val.string, &token)) return .{ .status = 400, .body = errJson(resp, "invalid token hex") };
379+
380+
const rc = ffi.coord_approve(&token, 16, @intCast(rid_val.integer));
381+
if (rc == 0) return .{ .status = 200, .body = okJson(resp, "approved") };
382+
if (rc == -1) return .{ .status = 403, .body = errJson(resp, "supervisor role required") };
383+
if (rc == -2) return .{ .status = 404, .body = errJson(resp, "request_id not found") };
384+
if (rc == -3) return .{ .status = 503, .body = errJson(resp, "target inbox full — retry") };
385+
return .{ .status = 500, .body = errJson(resp, "approve failed") };
386+
}
387+
388+
if (std.mem.eql(u8, tool, "coord_reject")) {
389+
const parsed = std.json.parseFromSlice(std.json.Value, allocator, body, .{}) catch return .{ .status = 400, .body = errJson(resp, "invalid json") };
390+
defer parsed.deinit();
391+
const token_val = parsed.value.object.get("token") orelse return .{ .status = 400, .body = errJson(resp, "missing token") };
392+
const rid_val = parsed.value.object.get("request_id") orelse return .{ .status = 400, .body = errJson(resp, "missing request_id") };
393+
const reason_val = parsed.value.object.get("reason") orelse return .{ .status = 400, .body = errJson(resp, "missing reason") };
394+
var token: [16]u8 = undefined;
395+
if (!parseToken(token_val.string, &token)) return .{ .status = 400, .body = errJson(resp, "invalid token hex") };
396+
397+
const reason = reason_val.string;
398+
const rc = ffi.coord_reject(&token, 16, @intCast(rid_val.integer), reason.ptr, @intCast(reason.len));
399+
if (rc == 0) return .{ .status = 200, .body = okJson(resp, "rejected") };
400+
if (rc == -1) return .{ .status = 403, .body = errJson(resp, "supervisor role required") };
401+
if (rc == -2) return .{ .status = 404, .body = errJson(resp, "request_id not found") };
402+
return .{ .status = 500, .body = errJson(resp, "reject failed") };
403+
}
404+
244405
return .{ .status = 404, .body = errJson(resp, "not implemented") };
245406
}
246407

cartridges/local-coord-mcp/cartridge.json

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@
4444
"context": {
4545
"description": "Optional per-window disambiguator (repo name, tty label). Alphanumeric/hyphen/underscore only, max 32 chars.",
4646
"type": "string"
47+
},
48+
"role": {
49+
"description": "Optional requested role. supervisor is ALWAYS rejected here — promote via coord_promote_to_supervisor. Default derived from client_kind (claude->executor, others->supervised).",
50+
"enum": [
51+
"executor",
52+
"supervised"
53+
],
54+
"type": "string"
4755
}
4856
},
4957
"required": [
@@ -152,7 +160,145 @@
152160
"type": "object"
153161
},
154162
"name": "coord_status"
163+
},
164+
{
165+
"description": "Promote this peer to the supervisor role. Requires the BOJ_SUPERVISOR_TOKEN env-var to be set on the server and the presented secret to match. At most one supervisor at a time. Intended for the Opus-tier Claude that holds the veto.",
166+
"inputSchema": {
167+
"properties": {
168+
"secret": {
169+
"description": "Must match BOJ_SUPERVISOR_TOKEN env var on the server",
170+
"type": "string"
171+
},
172+
"token": {
173+
"description": "Own session token from coord_register",
174+
"type": "string"
175+
}
176+
},
177+
"required": [
178+
"token",
179+
"secret"
180+
],
181+
"type": "object"
182+
},
183+
"name": "coord_promote_to_supervisor"
184+
},
185+
{
186+
"description": "Send a message with a declared risk_tier. Tier 2+ from role=supervised peers is quarantined for supervisor review; everything else delivers directly. Returns status:quarantined + request_id when gated.",
187+
"inputSchema": {
188+
"properties": {
189+
"message": {
190+
"description": "Message content (typically an A2ML envelope per schemas/coord-messages.ncl)",
191+
"type": "string"
192+
},
193+
"risk_tier": {
194+
"description": "Declared risk tier 0-4 per the risk ladder",
195+
"maximum": 4,
196+
"minimum": 0,
197+
"type": "integer"
198+
},
199+
"target": {
200+
"description": "Peer ID to send to, or '*' for broadcast",
201+
"type": "string"
202+
},
203+
"token": {
204+
"description": "Session token from coord_register",
205+
"type": "string"
206+
}
207+
},
208+
"required": [
209+
"token",
210+
"target",
211+
"message",
212+
"risk_tier"
213+
],
214+
"type": "object"
215+
},
216+
"name": "coord_send_gated"
217+
},
218+
{
219+
"description": "List all currently quarantined messages awaiting supervisor decision. Supervisor role only.",
220+
"inputSchema": {
221+
"properties": {
222+
"token": {
223+
"description": "Supervisor session token",
224+
"type": "string"
225+
}
226+
},
227+
"required": [
228+
"token"
229+
],
230+
"type": "object"
231+
},
232+
"name": "coord_review"
233+
},
234+
{
235+
"description": "Read the full message body of a specific quarantined entry. Supervisor role only.",
236+
"inputSchema": {
237+
"properties": {
238+
"request_id": {
239+
"description": "Request ID from coord_review or coord_send_gated",
240+
"type": "integer"
241+
},
242+
"token": {
243+
"description": "Supervisor session token",
244+
"type": "string"
245+
}
246+
},
247+
"required": [
248+
"token",
249+
"request_id"
250+
],
251+
"type": "object"
252+
},
253+
"name": "coord_review_entry"
254+
},
255+
{
256+
"description": "Approve a quarantined message — delivers to target and removes from queue. Supervisor role only.",
257+
"inputSchema": {
258+
"properties": {
259+
"request_id": {
260+
"description": "Request ID to approve",
261+
"type": "integer"
262+
},
263+
"token": {
264+
"description": "Supervisor session token",
265+
"type": "string"
266+
}
267+
},
268+
"required": [
269+
"token",
270+
"request_id"
271+
],
272+
"type": "object"
273+
},
274+
"name": "coord_approve"
275+
},
276+
{
277+
"description": "Reject a quarantined message with a reason — removes from queue without delivery. Supervisor role only. Reason logged for audit.",
278+
"inputSchema": {
279+
"properties": {
280+
"reason": {
281+
"description": "Reason for rejection (surfaces in audit log)",
282+
"type": "string"
283+
},
284+
"request_id": {
285+
"description": "Request ID to reject",
286+
"type": "integer"
287+
},
288+
"token": {
289+
"description": "Supervisor session token",
290+
"type": "string"
291+
}
292+
},
293+
"required": [
294+
"token",
295+
"request_id",
296+
"reason"
297+
],
298+
"type": "object"
299+
},
300+
"name": "coord_reject"
155301
}
156302
],
157-
"version": "0.2.0"
303+
"version": "0.3.0"
158304
}

0 commit comments

Comments
 (0)