Skip to content

Commit 282de53

Browse files
hyperpolymathclaude
andcommitted
feat(local-coord-mcp): persist-on-write + replay-on-init for coord state
Wires the durability module into every mutation point in the coord FFI: peer add/remove/role/context/status, inbox push/pop, claim add/release, quarantine add/approve/reject. Each mutation appends a typed event to the BOJ_COORD_STATE_DIR log; boj_cartridge_init replays the log to restore peer registry, active claims, inbox ring buffers, and the quarantine queue across adapter restart. Replay dispatcher is tolerant: events referencing out-of-range slots, unknown request IDs, or stale peers are silently dropped rather than aborting replay. CRC-truncated records stop replay at the break, matching the durability module's best-effort semantics. Audit + track-update events are logged but don't reconstruct any in-memory state — they're append-only forensic data consumed by Task #13 (effective_affinity) and the supervision audit trail. All 24 tests pass (19 existing + 5 new durability round-trips). Existing in-memory-only behaviour is preserved when BOJ_COORD_STATE_DIR is unset. Task #7 Pass B, step 2/3. Restart-preserves-state integration test next. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2ae952e commit 282de53

1 file changed

Lines changed: 171 additions & 3 deletions

File tree

cartridges/local-coord-mcp/ffi/local_coord_ffi.zig

Lines changed: 171 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,15 @@
88
// claiming (mutex). Binds ONLY to 127.0.0.1:7745 — the Idris2 ABI
99
// proves loopback-only at compile time; this FFI honours that constraint
1010
// at runtime.
11+
//
12+
// Durability: every mutation persists to an append-only log under
13+
// BOJ_COORD_STATE_DIR. On init the log is replayed to restore state
14+
// across adapter restarts. When the env var is unset, durability is a
15+
// silent no-op — process-local in-memory behaviour is preserved.
16+
// See coord_durability.zig.
1117

1218
const std = @import("std");
19+
const dur = @import("coord_durability.zig");
1320

1421
// ═══════════════════════════════════════════════════════════════════════
1522
// Constants (must match SafeLocalCoord.idr)
@@ -310,6 +317,7 @@ pub export fn coord_register(kind: c_int, role_hint: c_int, token_out: [*]u8, su
310317
@memcpy(token_out[0..TOKEN_LEN], &p.token);
311318
@memcpy(suffix_out[0..4], &p.suffix);
312319

320+
dur.logPeerAdd(@intCast(i), @intCast(@intFromEnum(client_kind)), @intCast(@intFromEnum(role)), &p.suffix, &p.token);
313321
return @intCast(i);
314322
}
315323
}
@@ -356,6 +364,7 @@ pub export fn coord_promote_to_supervisor(
356364
if (diff != 0) return -4;
357365

358366
peers[idx].role = .supervisor;
367+
dur.logPeerRoleSet(@intCast(idx), @intCast(@intFromEnum(Role.supervisor)));
359368
return 0;
360369
}
361370

@@ -403,6 +412,7 @@ pub export fn coord_set_role(
403412
}
404413

405414
target.role = nr;
415+
dur.logPeerRoleSet(@intCast(target_peer_idx), @intCast(@intFromEnum(nr)));
406416
return 0;
407417
}
408418

@@ -433,6 +443,7 @@ pub export fn coord_set_context(
433443

434444
if (clen > 0) @memcpy(peers[idx].context[0..clen], ctx_ptr[0..clen]);
435445
peers[idx].context_len = @intCast(clen);
446+
dur.logPeerContextSet(@intCast(idx), ctx_ptr[0..clen]);
436447
return 0;
437448
}
438449

@@ -459,14 +470,16 @@ pub export fn coord_deregister(token_ptr: [*]const u8, token_len: c_int) c_int {
459470
const idx = findPeerByToken(token_ptr, @intCast(token_len)) orelse return -1;
460471

461472
// Release all claims held by this peer
462-
for (&claims) |*c| {
473+
for (&claims, 0..) |*c, ci| {
463474
if (c.active and c.holder_idx == @as(u8, @intCast(idx))) {
464475
c.active = false;
476+
dur.logClaimRel(@intCast(ci));
465477
}
466478
}
467479

468480
peers[idx].active = false;
469481
peers[idx].state = .gone;
482+
dur.logPeerRemove(@intCast(idx));
470483
return 0;
471484
}
472485

@@ -524,6 +537,7 @@ pub export fn coord_send(
524537
p.inbox_lens[head] = @intCast(mlen);
525538
p.inbox_head = @intCast((@as(u32, p.inbox_head) + 1) % MAX_MESSAGES);
526539
p.inbox_count += 1;
540+
dur.logInboxPush(@intCast(i), msg_ptr[0..mlen]);
527541
sent += 1;
528542
}
529543
}
@@ -541,6 +555,7 @@ pub export fn coord_send(
541555
target.inbox_lens[head] = @intCast(mlen);
542556
target.inbox_head = @intCast((@as(u32, target.inbox_head) + 1) % MAX_MESSAGES);
543557
target.inbox_count += 1;
558+
dur.logInboxPush(@intCast(tidx), msg_ptr[0..mlen]);
544559
return 1;
545560
}
546561
}
@@ -566,6 +581,7 @@ pub export fn coord_receive(
566581
@memcpy(msg_out[0..mlen], peer.inbox[tail][0..mlen]);
567582
peer.inbox_tail = @intCast((@as(u32, peer.inbox_tail) + 1) % MAX_MESSAGES);
568583
peer.inbox_count -= 1;
584+
dur.logInboxPop(@intCast(idx));
569585
return @intCast(mlen);
570586
}
571587

@@ -595,12 +611,13 @@ pub export fn coord_claim_task(
595611
}
596612

597613
// Find an empty claim slot
598-
for (&claims) |*c| {
614+
for (&claims, 0..) |*c, ci| {
599615
if (!c.active) {
600616
c.active = true;
601617
@memcpy(c.task_name[0..tlen], task_ptr[0..tlen]);
602618
c.task_name_len = @intCast(tlen);
603619
c.holder_idx = @intCast(idx);
620+
dur.logClaimAdd(@intCast(ci), @intCast(idx), task_ptr[0..tlen]);
604621
return 0; // Granted
605622
}
606623
}
@@ -620,12 +637,13 @@ pub export fn coord_release_task(
620637
const idx = findPeerByToken(token_ptr, @intCast(token_len)) orelse return -1;
621638
const tlen: usize = @intCast(@min(task_len, 128));
622639

623-
for (&claims) |*c| {
640+
for (&claims, 0..) |*c, ci| {
624641
if (c.active and c.task_name_len == @as(u8, @intCast(tlen)) and
625642
std.mem.eql(u8, c.task_name[0..tlen], task_ptr[0..tlen]) and
626643
c.holder_idx == @as(u8, @intCast(idx)))
627644
{
628645
c.active = false;
646+
dur.logClaimRel(@intCast(ci));
629647
return 0;
630648
}
631649
}
@@ -681,6 +699,7 @@ pub export fn coord_send_gated(
681699
p.inbox_lens[head] = @intCast(mlen);
682700
p.inbox_head = @intCast((@as(u32, p.inbox_head) + 1) % MAX_MESSAGES);
683701
p.inbox_count += 1;
702+
dur.logInboxPush(@intCast(i), msg_ptr[0..mlen]);
684703
sent += 1;
685704
}
686705
}
@@ -697,6 +716,7 @@ pub export fn coord_send_gated(
697716
target.inbox_lens[head] = @intCast(mlen);
698717
target.inbox_head = @intCast((@as(u32, target.inbox_head) + 1) % MAX_MESSAGES);
699718
target.inbox_count += 1;
719+
dur.logInboxPush(@intCast(target_idx), msg_ptr[0..mlen]);
700720
return 1;
701721
}
702722

@@ -715,6 +735,7 @@ pub export fn coord_send_gated(
715735
@memcpy(q.msg[0..mlen], msg_ptr[0..mlen]);
716736
q.msg_len = @intCast(mlen);
717737
q.reason_len = 0;
738+
dur.logQuarAdd(q.request_id, q.sender_idx, q.target_idx, q.risk_tier, msg_ptr[0..mlen]);
718739
// Encode request_id as -(id + 1000) so caller can distinguish
719740
// from direct-send counts.
720741
const encoded: i64 = -(@as(i64, @intCast(q.request_id)) + 1000);
@@ -825,6 +846,7 @@ pub export fn coord_approve(
825846
p.inbox_lens[head] = @intCast(mlen);
826847
p.inbox_head = @intCast((@as(u32, p.inbox_head) + 1) % MAX_MESSAGES);
827848
p.inbox_count += 1;
849+
dur.logInboxPush(@intCast(i), q.msg[0..mlen]);
828850
}
829851
}
830852
} else {
@@ -838,8 +860,10 @@ pub export fn coord_approve(
838860
target.inbox_lens[head] = @intCast(mlen);
839861
target.inbox_head = @intCast((@as(u32, target.inbox_head) + 1) % MAX_MESSAGES);
840862
target.inbox_count += 1;
863+
dur.logInboxPush(@intCast(tidx), q.msg[0..mlen]);
841864
}
842865
q.active = false;
866+
dur.logQuarApprove(rid);
843867
return 0;
844868
}
845869
}
@@ -871,6 +895,7 @@ pub export fn coord_reject(
871895
if (rlen > 0) @memcpy(q.reason[0..rlen], reason_ptr[0..rlen]);
872896
q.reason_len = @intCast(rlen);
873897
q.active = false;
898+
dur.logQuarReject(rid, reason_ptr[0..rlen]);
874899
return 0;
875900
}
876901
}
@@ -891,6 +916,7 @@ pub export fn coord_set_status(
891916
const slen: usize = @intCast(@min(status_len, 256));
892917
@memcpy(peers[idx].status[0..slen], status_ptr[0..slen]);
893918
peers[idx].status_len = @intCast(slen);
919+
dur.logPeerStatusSet(@intCast(idx), status_ptr[0..slen]);
894920
return 0;
895921
}
896922

@@ -900,13 +926,155 @@ pub export fn coord_set_status(
900926

901927
pub export fn boj_cartridge_init() c_int {
902928
coord_reset();
929+
_ = dur.open();
930+
if (dur.isEnabled()) {
931+
dur.replay(replayDispatch);
932+
}
903933
return 0;
904934
}
905935

906936
pub export fn boj_cartridge_deinit() void {
937+
dur.close();
907938
coord_reset();
908939
}
909940

941+
// ═══════════════════════════════════════════════════════════════════════
942+
// Replay dispatcher — reconstructs in-memory state from the durable log.
943+
// Called exactly once per record during boj_cartridge_init replay.
944+
// Events that can't apply (e.g. slot out of range, unknown request_id)
945+
// are silently skipped — the log is best-effort, never a correctness gate.
946+
// ═══════════════════════════════════════════════════════════════════════
947+
948+
fn replayDispatch(event: dur.EventType, payload: []const u8) void {
949+
switch (event) {
950+
.peer_add => {
951+
const d = dur.decodePeerAdd(payload) orelse return;
952+
if (d.slot_idx >= MAX_PEERS) return;
953+
const p = &peers[d.slot_idx];
954+
p.active = true;
955+
p.kind = @enumFromInt(d.kind);
956+
p.role = @enumFromInt(d.role);
957+
p.state = .active;
958+
p.suffix = d.suffix;
959+
p.token = d.token;
960+
p.inbox_head = 0;
961+
p.inbox_tail = 0;
962+
p.inbox_count = 0;
963+
p.status_len = 0;
964+
p.context_len = 0;
965+
},
966+
.peer_remove => {
967+
const idx = dur.decodeSlotIdx(payload) orelse return;
968+
if (idx >= MAX_PEERS) return;
969+
peers[idx].active = false;
970+
peers[idx].state = .gone;
971+
},
972+
.peer_role_set => {
973+
const d = dur.decodePeerRoleSet(payload) orelse return;
974+
if (d.slot_idx >= MAX_PEERS) return;
975+
peers[d.slot_idx].role = @enumFromInt(d.role);
976+
},
977+
.peer_context_set => {
978+
const d = dur.decodePeerContextSet(payload) orelse return;
979+
if (d.slot_idx >= MAX_PEERS) return;
980+
const p = &peers[d.slot_idx];
981+
if (d.ctx.len > MAX_CONTEXT) return;
982+
if (d.ctx.len > 0) @memcpy(p.context[0..d.ctx.len], d.ctx);
983+
p.context_len = @intCast(d.ctx.len);
984+
},
985+
.peer_status_set => {
986+
const d = dur.decodePeerStatusSet(payload) orelse return;
987+
if (d.slot_idx >= MAX_PEERS) return;
988+
const p = &peers[d.slot_idx];
989+
if (d.status.len > 256) return;
990+
if (d.status.len > 0) @memcpy(p.status[0..d.status.len], d.status);
991+
p.status_len = @intCast(d.status.len);
992+
},
993+
.inbox_push => {
994+
const d = dur.decodeInboxPush(payload) orelse return;
995+
if (d.target_idx >= MAX_PEERS) return;
996+
const p = &peers[d.target_idx];
997+
if (!p.active or p.inbox_count >= MAX_MESSAGES) return;
998+
const mlen: usize = @min(d.msg.len, 512);
999+
const head: usize = p.inbox_head;
1000+
if (mlen > 0) @memcpy(p.inbox[head][0..mlen], d.msg[0..mlen]);
1001+
p.inbox_lens[head] = @intCast(mlen);
1002+
p.inbox_head = @intCast((@as(u32, p.inbox_head) + 1) % MAX_MESSAGES);
1003+
p.inbox_count += 1;
1004+
},
1005+
.inbox_pop => {
1006+
const idx = dur.decodeSlotIdx(payload) orelse return;
1007+
if (idx >= MAX_PEERS) return;
1008+
const p = &peers[idx];
1009+
if (p.inbox_count == 0) return;
1010+
p.inbox_tail = @intCast((@as(u32, p.inbox_tail) + 1) % MAX_MESSAGES);
1011+
p.inbox_count -= 1;
1012+
},
1013+
.claim_add => {
1014+
const d = dur.decodeClaimAdd(payload) orelse return;
1015+
if (d.claim_idx >= MAX_CLAIMS) return;
1016+
if (d.holder_idx >= MAX_PEERS) return;
1017+
const c = &claims[d.claim_idx];
1018+
c.active = true;
1019+
c.holder_idx = d.holder_idx;
1020+
const tlen: usize = @min(d.task.len, 128);
1021+
if (tlen > 0) @memcpy(c.task_name[0..tlen], d.task[0..tlen]);
1022+
c.task_name_len = @intCast(tlen);
1023+
},
1024+
.claim_rel => {
1025+
const idx = dur.decodeSlotIdx(payload) orelse return;
1026+
if (idx >= MAX_CLAIMS) return;
1027+
claims[idx].active = false;
1028+
},
1029+
.quar_add => {
1030+
const d = dur.decodeQuarAdd(payload) orelse return;
1031+
// First empty slot; logged entries beyond MAX_QUARANTINE are
1032+
// dropped during replay (hot-cache-only in Phase 1).
1033+
for (&quarantine) |*q| {
1034+
if (!q.active) {
1035+
q.active = true;
1036+
q.request_id = d.request_id;
1037+
q.sender_idx = d.sender_idx;
1038+
q.target_idx = d.target_idx;
1039+
q.risk_tier = d.risk_tier;
1040+
const mlen: usize = @min(d.msg.len, 512);
1041+
if (mlen > 0) @memcpy(q.msg[0..mlen], d.msg[0..mlen]);
1042+
q.msg_len = @intCast(mlen);
1043+
q.reason_len = 0;
1044+
if (d.request_id >= next_request_id) next_request_id = d.request_id + 1;
1045+
return;
1046+
}
1047+
}
1048+
},
1049+
.quar_approve => {
1050+
const rid = dur.decodeRequestId(payload) orelse return;
1051+
for (&quarantine) |*q| {
1052+
if (q.active and q.request_id == rid) {
1053+
q.active = false;
1054+
return;
1055+
}
1056+
}
1057+
},
1058+
.quar_reject => {
1059+
const d = dur.decodeQuarReject(payload) orelse return;
1060+
for (&quarantine) |*q| {
1061+
if (q.active and q.request_id == d.request_id) {
1062+
const rlen: usize = @min(d.reason.len, MAX_REASON);
1063+
if (rlen > 0) @memcpy(q.reason[0..rlen], d.reason[0..rlen]);
1064+
q.reason_len = @intCast(rlen);
1065+
q.active = false;
1066+
return;
1067+
}
1068+
}
1069+
},
1070+
.audit, .track_update => {
1071+
// Append-only by design — nothing to reconstruct in live memory.
1072+
// Track-record aggregation lands in Task #13 (effective_affinity).
1073+
},
1074+
else => {},
1075+
}
1076+
}
1077+
9101078
pub export fn boj_cartridge_name() [*:0]const u8 {
9111079
return "local-coord-mcp";
9121080
}

0 commit comments

Comments
 (0)