From 27289487405016c5eca62286cbc07fe7b4725db2 Mon Sep 17 00:00:00 2001 From: cpunt Date: Tue, 12 May 2026 16:53:51 +0000 Subject: [PATCH 1/7] updater: enforce signed MCU images --- services/fabric/transfer_sink_rp2350.go | 7 ++- services/reactor/reactor.go | 2 +- services/updater/prestage_host.go | 3 + services/updater/prestage_tinygo.go | 70 ++++++++++----------- services/updater/receiver.go | 7 +-- services/updater/security.go | 62 +++++++++++++++++++ services/updater/updater_test.go | 77 ++++++++++++++++++++++++ services/updater/verifier.go | 32 ++++------ services/updater/verifier_passthrough.go | 50 --------------- 9 files changed, 196 insertions(+), 114 deletions(-) create mode 100644 services/updater/security.go delete mode 100644 services/updater/verifier_passthrough.go diff --git a/services/fabric/transfer_sink_rp2350.go b/services/fabric/transfer_sink_rp2350.go index d8b1d3f..7b8fdd3 100644 --- a/services/fabric/transfer_sink_rp2350.go +++ b/services/fabric/transfer_sink_rp2350.go @@ -55,7 +55,8 @@ func (s *streamedStageSink) Abort(reason string) error { return nil } -// Bytes returns nil because the TinyGo RP2350 default path streams directly -// into the inactive slot. fabric still calls updater/main staging; the -// updater consumes the pre-staged descriptor instead of an in-RAM artefact. +// Bytes returns nil because the TinyGo RP2350 default path verifies the signed +// container while streaming and writes only the authenticated payload into the +// inactive slot. fabric still calls updater/main staging; the updater consumes +// the verified staged descriptor instead of an in-RAM artefact. func (s *streamedStageSink) Bytes() []byte { return nil } diff --git a/services/reactor/reactor.go b/services/reactor/reactor.go index 8fac7c8..a988407 100644 --- a/services/reactor/reactor.go +++ b/services/reactor/reactor.go @@ -429,7 +429,7 @@ func (r *Reactor) Run(ctx context.Context) { identity := firmwareIdentity() updaterSvc := updater.New(updater.Options{ Conn: updaterConn, - Verifier: updater.PassthroughVerifier(identity), + Verifier: updater.SignedImageVerifier(), Applier: updater.ProductionApplier(), Identity: identity, }) diff --git a/services/updater/prestage_host.go b/services/updater/prestage_host.go index c78c83a..e7af695 100644 --- a/services/updater/prestage_host.go +++ b/services/updater/prestage_host.go @@ -3,6 +3,9 @@ package updater type streamedStage struct { + Version string + BuildID string + ImageID string Length uint32 PayloadSHA256 string } diff --git a/services/updater/prestage_tinygo.go b/services/updater/prestage_tinygo.go index bef919d..dbece20 100644 --- a/services/updater/prestage_tinygo.go +++ b/services/updater/prestage_tinygo.go @@ -3,17 +3,19 @@ package updater import ( - "crypto/sha256" - "encoding/hex" "errors" "pico2-a-b/abupdate" + "pico2-a-b/imagev1" ) -// streamedStage tracks a raw transfer that fabric has already streamed into -// the inactive A/B slot. It is the TinyGo bring-up path used before imagev1 -// verification can stream directly from the transfer source. +// streamedStage tracks a signed transfer that verified successfully while +// fabric streamed it. Only the signed payload bytes are written to the inactive +// A/B slot; the container header, manifest and signature are never staged. type streamedStage struct { + Version string + BuildID string + ImageID string Length uint32 PayloadSHA256 string } @@ -21,11 +23,10 @@ type streamedStage struct { var ( streamedStageDesc streamedStage streamedStageOK bool - streamedStageHash = sha256.New() - streamedStageLen uint32 + streamedVerifier *imagev1.StreamVerifier ) -// BeginStreamedStage prepares the inactive slot for a raw incoming transfer. +// BeginStreamedStage prepares verification for a signed incoming transfer. // The caller must subsequently call WriteStreamedStage and CommitStreamedStage // or AbortStreamedStage. func BeginStreamedStage(size uint32) error { @@ -35,17 +36,12 @@ func BeginStreamedStage(size uint32) error { sharedUpdater = abupdate.Updater{} sharedUpdaterInit = false - u, err := ensureUpdaterInited() - if err != nil { - return err - } - if rc := u.BeginUpdate(size); rc != 0 { - return errFromRC("begin_update", rc) - } - streamedStageHash.Reset() - streamedStageLen = 0 + _ = size streamedStageDesc = streamedStage{} streamedStageOK = false + streamedVerifier = imagev1.NewStreamVerifier(SignedImagePolicy(), func(payloadLen uint32) (imagev1.PayloadSink, error) { + return newSlotSink(payloadLen) + }) return nil } @@ -53,39 +49,41 @@ func WriteStreamedStage(data []byte) error { if len(data) == 0 { return errors.New("empty_chunk") } - u, err := ensureUpdaterInited() - if err != nil { - return err - } - if rc := u.WriteChunk(data); rc != 0 { - return errFromRC("write_chunk", rc) + if streamedVerifier == nil { + return errors.New("streamed_stage_not_started") } - _, _ = streamedStageHash.Write(data) - streamedStageLen += uint32(len(data)) - return nil + _, err := streamedVerifier.Write(data) + return err } func CommitStreamedStage() (uint32, error) { - u, err := ensureUpdaterInited() + if streamedVerifier == nil { + return 0, errors.New("streamed_stage_not_started") + } + res, err := streamedVerifier.Commit() if err != nil { + streamedVerifier = nil return 0, err } - if rc := u.FlushFinal(); rc != 0 { - return 0, errFromRC("flush_final", rc) - } streamedStageDesc = streamedStage{ - Length: streamedStageLen, - PayloadSHA256: hex.EncodeToString(streamedStageHash.Sum(nil)), + Version: res.Version, + BuildID: res.BuildID, + ImageID: res.ImageID, + Length: res.PayloadLength, + PayloadSHA256: res.PayloadSHA256, } streamedStageOK = true - return u.BytesWritten(), nil + streamedVerifier = nil + return res.PayloadLength, nil } func AbortStreamedStage() { + if streamedVerifier != nil { + _ = streamedVerifier.Abort() + streamedVerifier = nil + } streamedStageDesc = streamedStage{} streamedStageOK = false - streamedStageLen = 0 - streamedStageHash.Reset() } func consumeStreamedStage() (streamedStage, bool) { @@ -95,7 +93,5 @@ func consumeStreamedStage() (streamedStage, bool) { out := streamedStageDesc streamedStageDesc = streamedStage{} streamedStageOK = false - streamedStageLen = 0 - streamedStageHash.Reset() return out, true } diff --git a/services/updater/receiver.go b/services/updater/receiver.go index 51c33e8..5409093 100644 --- a/services/updater/receiver.go +++ b/services/updater/receiver.go @@ -44,11 +44,10 @@ func (s *Service) handleStage(msg *bus.Message) { s.reply(msg, StageReply{OK: false, Err: "artefact_missing"}) return } - stageIdentity, _ := identityFromStageMeta(s.identity, payload.Meta) desc := StagedDescriptor{ - Version: stageIdentity.Version, - BuildID: stageIdentity.Build, - ImageID: stageIdentity.ImageID, + Version: staged.Version, + BuildID: staged.BuildID, + ImageID: staged.ImageID, Length: staged.Length, Slot: 0, PayloadSHA256: staged.PayloadSHA256, diff --git a/services/updater/security.go b/services/updater/security.go new file mode 100644 index 0000000..3dfe227 --- /dev/null +++ b/services/updater/security.go @@ -0,0 +1,62 @@ +package updater + +import ( + "errors" + "io" + + "pico2-a-b/imagev1" +) + +var ( + SignedImageProductFamily = "bigbox" + SignedImageHardwareProfile = "bb-v1-cm5-2" + SignedImageMCUBoardFamily = "rp2354a" + SignedImageTrustedKeyID = "" + SignedImageTrustedPublicKey = "" +) + +type signedImageVerifier struct{} + +func SignedImageVerifier() Verifier { + return signedImageVerifier{} +} + +func SignedImagePolicy() imagev1.Policy { + var keys []imagev1.TrustedKey + if SignedImageTrustedKeyID != "" && SignedImageTrustedPublicKey != "" { + pub, err := imagev1.ParsePublicKeyHex(SignedImageTrustedPublicKey) + if err == nil { + keys = append(keys, imagev1.TrustedKey{ + KeyID: SignedImageTrustedKeyID, + PublicKey: pub, + }) + } + } + return imagev1.Policy{ + Target: imagev1.Target{ + ProductFamily: SignedImageProductFamily, + HardwareProfile: SignedImageHardwareProfile, + MCUBoardFamily: SignedImageMCUBoardFamily, + }, + Keys: keys, + } +} + +func (signedImageVerifier) Verify(r io.Reader, sink SlotSink) (Manifest, error) { + if sink == nil { + return Manifest{}, errors.New("signed_image: nil sink") + } + res, err := imagev1.Verify(r, SignedImagePolicy(), func(uint32) (imagev1.PayloadSink, error) { + return sink, nil + }) + if err != nil { + return Manifest{}, err + } + return Manifest{ + Version: res.Version, + BuildID: res.BuildID, + ImageID: res.ImageID, + PayloadSHA256: res.PayloadSHA256, + PayloadLength: res.PayloadLength, + }, nil +} diff --git a/services/updater/updater_test.go b/services/updater/updater_test.go index 86747e3..08170a1 100644 --- a/services/updater/updater_test.go +++ b/services/updater/updater_test.go @@ -3,6 +3,7 @@ package updater import ( "bytes" "context" + "crypto/ed25519" "encoding/hex" "encoding/json" "io" @@ -11,6 +12,7 @@ import ( "time" "devicecode-go/bus" + "pico2-a-b/imagev1" ) // ---- helpers -------------------------------------------------------- @@ -474,6 +476,81 @@ func TestStageStubVerifierPublishesFailed(t *testing.T) { } } +func TestStageSignedImageVerifierWritesManifestDescriptor(t *testing.T) { + seed := bytes.Repeat([]byte{0x42}, ed25519.SeedSize) + priv := ed25519.NewKeyFromSeed(seed) + pub := priv.Public().(ed25519.PublicKey) + target := imagev1.Target{ + ProductFamily: "bigbox", + HardwareProfile: "bb-v1-cm5-2", + MCUBoardFamily: "rp2354a", + } + artefact, _, err := imagev1.Pack([]byte("signed payload"), imagev1.PackOptions{ + Target: target, + Version: "13.0", + BuildID: "build-13.0", + ImageID: "mcu-dev-13.0", + KeyID: "test-key", + }, priv) + if err != nil { + t.Fatal(err) + } + + oldProduct := SignedImageProductFamily + oldProfile := SignedImageHardwareProfile + oldBoard := SignedImageMCUBoardFamily + oldKeyID := SignedImageTrustedKeyID + oldKey := SignedImageTrustedPublicKey + defer func() { + SignedImageProductFamily = oldProduct + SignedImageHardwareProfile = oldProfile + SignedImageMCUBoardFamily = oldBoard + SignedImageTrustedKeyID = oldKeyID + SignedImageTrustedPublicKey = oldKey + }() + SignedImageProductFamily = target.ProductFamily + SignedImageHardwareProfile = target.HardwareProfile + SignedImageMCUBoardFamily = target.MCUBoardFamily + SignedImageTrustedKeyID = "test-key" + SignedImageTrustedPublicKey = hex.EncodeToString(pub) + + b := newTestBus() + conn := b.NewConnection("updater") + caller := b.NewConnection("caller") + memMD := NewMemoryMetadata() + _, cancel := runService(t, b, Options{ + Conn: conn, + Verifier: SignedImageVerifier(), + Metadata: memMD, + MetadataWrite: memMD, + }) + defer cancel() + + req := caller.NewMessage(TopicStageRPC, testStagePayload("signed-xfer", artefact), false) + replySub := caller.Request(req) + defer caller.Unsubscribe(replySub) + select { + case msg := <-replySub.Channel(): + reply, _ := msg.Payload.(StageReply) + if !reply.OK { + t.Fatalf("stage reply not ok: %+v", reply) + } + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for stage reply") + } + + desc, ok := memMD.StagedDescriptor() + if !ok { + t.Fatal("staged descriptor not persisted") + } + if desc.Version != "13.0" || desc.BuildID != "build-13.0" || desc.ImageID != "mcu-dev-13.0" { + t.Fatalf("descriptor wrong: %+v", desc) + } + if desc.Length != uint32(len("signed payload")) || len(desc.PayloadSHA256) != 64 { + t.Fatalf("descriptor payload metadata wrong: %+v", desc) + } +} + func TestStageFakeAcceptWritesStagedDescriptor(t *testing.T) { // W11: on verifier success staging writes the manifest's // fields to the metadata writer. A subsequent commit RPC reads diff --git a/services/updater/verifier.go b/services/updater/verifier.go index 6adaf11..dc800ea 100644 --- a/services/updater/verifier.go +++ b/services/updater/verifier.go @@ -6,10 +6,9 @@ import ( ) // Manifest is the small subset of the signed-image manifest that updater -// staging needs after verification succeeds. The full canonical manifest -// lives in pico2-a-b/imagev1 (added in fabric-security); this type is the -// local interface we can carry across the staging -> updater -> -// state/self/updater pipeline without depending on imagev1. +// staging needs after verification succeeds. The full canonical manifest lives +// in pico2-a-b/imagev1; this type is the local interface carried across the +// staging -> updater -> state/self/updater pipeline. type Manifest struct { Version string BuildID string @@ -32,11 +31,9 @@ type SlotSink interface { Abort() error } -// Verifier is updater/main staging's hook into signed-image verification. The -// production wiring on the fabric-update branch passes a stub that -// always rejects (ErrUnsignedNotSupported); fabric-security ships a -// real adapter over pico2-a-b/imagev1.Verify that fills the same -// interface. +// Verifier is updater/main staging's hook into signed-image verification. +// Production wiring uses SignedImageVerifier; tests may pass fakes, and nil +// Options.Verifier falls back to the rejecting StubVerifier. type Verifier interface { // Verify reads the artefact bytes from r, validates the signed // envelope (header + manifest + signature), and on success streams @@ -58,11 +55,10 @@ var ErrUnsignedNotSupported = errors.New("verifier_stub: unsigned images not sup // reboot fires; an implementation that reboots inside Apply would otherwise // skip both the wire reply and the state/self/updater retain. // -// The fabric-update branch ships a refusing default (RefusingApplier) -// so the commit RPC never lies about apply success on a branch where -// the apply path doesn't exist. fabric-security supplies a real -// abupdate-backed implementation that triggers REBOOT_TYPE_FLASH_UPDATE -// into the staged slot. +// New() still defaults to RefusingApplier so tests and host builds never claim +// apply success without an explicit production applier. Reactor wiring supplies +// the abupdate-backed implementation that triggers REBOOT_TYPE_FLASH_UPDATE into +// the staged slot. type Applier interface { // CanApply validates that the apply path is wired and the // descriptor is acceptable. Quick, no side effects beyond minimal @@ -97,13 +93,11 @@ func (refusingApplier) CanApply(d StagedDescriptor) error { // calls this. Defined for interface conformance. func (refusingApplier) ArmReboot(d StagedDescriptor) { _ = d } -// stubVerifier is the production verifier this branch ships with. It -// always rejects, so no untrusted firmware can stage. fabric-security -// replaces this with a real imagev1-backed adapter. +// stubVerifier is the safe default when no verifier is wired. It always rejects +// so no unsigned firmware can stage accidentally. type stubVerifier struct{} -// StubVerifier returns the rejecting production verifier. Staging wiring -// takes a Verifier; production passes this, tests pass fakes. +// StubVerifier returns the rejecting verifier used as New's default. func StubVerifier() Verifier { return stubVerifier{} } func (stubVerifier) Verify(r io.Reader, sink SlotSink) (Manifest, error) { diff --git a/services/updater/verifier_passthrough.go b/services/updater/verifier_passthrough.go deleted file mode 100644 index e1e6f91..0000000 --- a/services/updater/verifier_passthrough.go +++ /dev/null @@ -1,50 +0,0 @@ -package updater - -import ( - "crypto/sha256" - "encoding/hex" - "errors" - "io" -) - -// passthroughVerifier accepts any artefact, streams its bytes straight -// into sink while computing SHA-256, and returns a synthetic manifest -// with the artefact length + computed hash. Intended for the bringup -// stack on this branch where the signed-image v1 envelope (header + -// canonical manifest + Ed25519 signature) is not yet implemented. -// -// Replace with a real verifier when fabric-security lands; this exists -// so fw-update-e2e can drive the staging → applier → reboot path -// end-to-end without the signed-image scaffolding in place. -type passthroughVerifier struct { - identity Identity -} - -// PassthroughVerifier returns a Verifier that accepts any artefact and -// fills the manifest with identity (caller-supplied), the artefact -// length, and the SHA-256 of the streamed payload. Reboot-time apply -// is gated by the Applier; a passthrough verifier without a real -// applier still ends with state=failed(apply_unavailable) at commit. -func PassthroughVerifier(identity Identity) Verifier { - return passthroughVerifier{identity: identity} -} - -func (v passthroughVerifier) Verify(r io.Reader, sink SlotSink) (Manifest, error) { - if sink == nil { - return Manifest{}, errors.New("passthrough_verifier: nil sink") - } - hasher := sha256.New() - mw := io.MultiWriter(sink, hasher) - n, err := io.Copy(mw, r) - if err != nil { - _ = sink.Abort() - return Manifest{}, err - } - return Manifest{ - Version: v.identity.Version, - BuildID: v.identity.Build, - ImageID: v.identity.ImageID, - PayloadSHA256: hex.EncodeToString(hasher.Sum(nil)), - PayloadLength: uint32(n), - }, nil -} From 174e35cafaf8bb3146ab2209bd2e9222ec9a8aa8 Mon Sep 17 00:00:00 2001 From: cpunt Date: Tue, 12 May 2026 21:33:24 +0000 Subject: [PATCH 2/7] fabric: retry malformed transfer chunks --- services/fabric/transfer.go | 5 +++-- services/fabric/transfer_test.go | 26 +++++++++++++++++--------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/services/fabric/transfer.go b/services/fabric/transfer.go index dd4155a..b29e3a8 100644 --- a/services/fabric/transfer.go +++ b/services/fabric/transfer.go @@ -259,8 +259,9 @@ func (s *session) onTransferChunk(msg *protoXferChunk) { } raw, errStr := decodeChunkData(msg.Data) if errStr != "" { - s.abortTransfer(errStr) - s.sendTransferAbort(id, errStr) + s.logKV("xfer_chunk decode retry", "err", errStr) + s.sendTransferNeed(id, cur.bytesWritten) + cur.deadline = time.Now().Add(s.cfg.PhaseTimeout) return } if len(raw) == 0 { diff --git a/services/fabric/transfer_test.go b/services/fabric/transfer_test.go index 872566a..ae2f196 100644 --- a/services/fabric/transfer_test.go +++ b/services/fabric/transfer_test.go @@ -271,9 +271,8 @@ func TestTransferReceiveSuccess(t *testing.T) { } func TestTransferChunkBadOffsetAborts(t *testing.T) { - // Lua transfer_mgr aborts and clears the active transfer on chunk faults - // (unexpected_offset, invalid_chunk_encoding, size_too_large). Match that — do not - // keep the transfer alive with an xfer_need. + // Unexpected offsets mean sender/receiver state has diverged; abort rather + // than asking for a resend. b := newBus() cm5, mcu := pipePair() ctx, cancel := context.WithCancel(context.Background()) @@ -309,7 +308,7 @@ func TestTransferChunkBadOffsetAborts(t *testing.T) { } } -func TestTransferChunkDecodeFailureAborts(t *testing.T) { +func TestTransferChunkDecodeFailureRequestsSameOffset(t *testing.T) { b := newBus() cm5, mcu := pipePair() ctx, cancel := context.WithCancel(context.Background()) @@ -332,12 +331,21 @@ func TestTransferChunkDecodeFailureAborts(t *testing.T) { ChunkDigest: xxhashStr(payload), }) - abort := readMsg[protoXferAbort](t, cm5) - if abort.Err != "invalid_chunk_encoding" { - t.Fatalf("bad xfer_abort: %+v", abort) + need := readMsg[protoXferNeed](t, cm5) + if need.Type != msgXferNeed || need.XferID != "xfer-d1" || need.Next != 0 { + t.Fatalf("bad retry xfer_need: %+v", need) } - if len(sink.abortReasons) == 0 { - t.Fatal("expected sink.Abort on decode failure") + if len(sink.abortReasons) != 0 { + t.Fatalf("sink.Abort called on recoverable decode failure: %v", sink.abortReasons) + } + if len(sink.writes) != 0 { + t.Fatalf("sink received %d writes before decode passed", len(sink.writes)) + } + + sendMsg(t, cm5, xferChunk("xfer-d1", 0, payload)) + need = readMsg[protoXferNeed](t, cm5) + if need.Next != uint32(len(payload)) { + t.Fatalf("xfer_need.next after retry = %d, want %d", need.Next, len(payload)) } } From 1184865e4b9dae8bd239b518510ec083a7813027 Mon Sep 17 00:00:00 2001 From: cpunt Date: Tue, 12 May 2026 21:58:01 +0000 Subject: [PATCH 3/7] fabric: bind transfer chunks to offsets --- services/fabric/protocol.go | 16 +++++++---- services/fabric/transfer.go | 18 ++++++++++++ services/fabric/transfer_test.go | 48 ++++++++++++++++++++++++++++---- 3 files changed, 71 insertions(+), 11 deletions(-) diff --git a/services/fabric/protocol.go b/services/fabric/protocol.go index a5d3bca..40f53ed 100644 --- a/services/fabric/protocol.go +++ b/services/fabric/protocol.go @@ -113,13 +113,17 @@ type protoXferReady struct { } // protoXferChunk carries unpadded base64url data plus a required xxhash32 -// digest over the raw decoded chunk bytes. +// digest over the raw decoded chunk bytes. ChunkOffsetDigest is optional for +// compatibility with older CM5 senders; when present, it binds the chunk bytes +// to the transfer id and byte offset so spliced valid chunks are retried before +// whole-object commit. type protoXferChunk struct { - Type string `json:"type"` - XferID string `json:"xfer_id"` - Offset uint32 `json:"offset"` - Data string `json:"data"` - ChunkDigest string `json:"chunk_digest"` + Type string `json:"type"` + XferID string `json:"xfer_id"` + Offset uint32 `json:"offset"` + Data string `json:"data"` + ChunkDigest string `json:"chunk_digest"` + ChunkOffsetDigest string `json:"chunk_offset_digest,omitempty"` } // protoXferNeed (control) acks the MCU's expected next byte offset. diff --git a/services/fabric/transfer.go b/services/fabric/transfer.go index b29e3a8..ed3ed99 100644 --- a/services/fabric/transfer.go +++ b/services/fabric/transfer.go @@ -101,6 +101,16 @@ func decodeChunkData(encoded string) ([]byte, string) { return raw, "" } +func chunkOffsetDigest(id string, offset uint32, raw []byte) string { + h := xxhash.New(0) + _, _ = h.Write([]byte(id)) + _, _ = h.Write([]byte{0}) + _, _ = h.Write([]byte(u32s(offset))) + _, _ = h.Write([]byte{0}) + _, _ = h.Write(raw) + return xxhashHex(h.Sum32()) +} + func (s *session) sendTransferReady(id string) bool { return s.sendControl(marshal(protoXferReady{ Type: msgXferReady, @@ -299,6 +309,14 @@ func (s *session) onTransferChunk(msg *protoXferChunk) { cur.deadline = time.Now().Add(s.cfg.PhaseTimeout) return } + if msg.ChunkOffsetDigest != "" { + want, ok := canonicalXXHash32Hex(msg.ChunkOffsetDigest) + if !ok || want != chunkOffsetDigest(msg.XferID, msg.Offset, raw) { + s.sendTransferNeed(cur.meta.ID, cur.bytesWritten) + cur.deadline = time.Now().Add(s.cfg.PhaseTimeout) + return + } + } if err := cur.sink.WriteChunk(msg.Offset, raw); err != nil { reason := err.Error() s.logKV("transfer write failed", "err", reason) diff --git a/services/fabric/transfer_test.go b/services/fabric/transfer_test.go index ae2f196..f9ace00 100644 --- a/services/fabric/transfer_test.go +++ b/services/fabric/transfer_test.go @@ -95,11 +95,12 @@ func xferBegin(id string, payload []byte, meta json.RawMessage) protoXferBegin { func xferChunk(id string, off uint32, payload []byte) protoXferChunk { return protoXferChunk{ - Type: msgXferChunk, - XferID: id, - Offset: off, - Data: rawURL(payload), - ChunkDigest: xxhashStr(payload), + Type: msgXferChunk, + XferID: id, + Offset: off, + Data: rawURL(payload), + ChunkDigest: xxhashStr(payload), + ChunkOffsetDigest: chunkOffsetDigest(id, off, payload), } } @@ -415,6 +416,43 @@ func TestTransferChunkDigestMismatchRequestsSameOffset(t *testing.T) { } } +func TestTransferChunkOffsetDigestMismatchRequestsSameOffset(t *testing.T) { + b := newBus() + cm5, mcu := pipePair() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sink := &fakeTransferSink{} + go runSessionWithSink(ctx, mcu, b.NewConnection("fabric"), sink) + bringUp(t, cm5) + + payload := []byte("abcd") + sendMsg(t, cm5, xferBegin("xfer-bad-offset-digest", payload, nil)) + readTransferReady(t, cm5, "xfer-bad-offset-digest", 0) + + sendMsg(t, cm5, protoXferChunk{ + Type: msgXferChunk, + XferID: "xfer-bad-offset-digest", + Offset: 0, + Data: rawURL(payload), + ChunkDigest: xxhashStr(payload), + ChunkOffsetDigest: chunkOffsetDigest("xfer-bad-offset-digest", 1, payload), + }) + need := readMsg[protoXferNeed](t, cm5) + if need.Next != 0 { + t.Fatalf("retry xfer_need.next = %d, want 0", need.Next) + } + if len(sink.writes) != 0 { + t.Fatalf("sink received %d writes before offset digest passed", len(sink.writes)) + } + + sendMsg(t, cm5, xferChunk("xfer-bad-offset-digest", 0, payload)) + need = readMsg[protoXferNeed](t, cm5) + if need.Next != uint32(len(payload)) { + t.Fatalf("xfer_need.next after retry = %d, want %d", need.Next, len(payload)) + } +} + func TestTransferChunkSizeOverflowAborts(t *testing.T) { b := newBus() cm5, mcu := pipePair() From 5df82e23983e0bd581bded5806dc4da8792b6880 Mon Sep 17 00:00:00 2001 From: cpunt Date: Tue, 12 May 2026 22:15:41 +0000 Subject: [PATCH 4/7] fabric: retry idle transfer offsets --- services/fabric/transfer.go | 10 ++++++++++ services/fabric/transfer_test.go | 11 +++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/services/fabric/transfer.go b/services/fabric/transfer.go index ed3ed99..151baaa 100644 --- a/services/fabric/transfer.go +++ b/services/fabric/transfer.go @@ -13,6 +13,7 @@ import ( ) const transferTargetUpdaterMain = "updater/main" +const transferIdleRetryLimit = 3 // transferMeta captures xfer_begin contents. The transfer target is explicit // on the wire; firmware update uses target="updater/main". meta remains opaque @@ -58,6 +59,7 @@ type incomingTransfer struct { bytesWritten uint32 chunksSeen uint32 hasher *xxhash.Hasher + idleRetries uint8 // deadline is the idle-chunk watchdog: bumped on every accepted chunk // and on initial xfer_begin. checkTransferTimeout fires if now > deadline. // Mirrors transfer_mgr.lua: `active.deadline = runtime.now() + phase_timeout`. @@ -170,6 +172,13 @@ func (s *session) checkTransferTimeout(now time.Time) { if !now.After(cur.deadline) { return } + if cur.idleRetries < transferIdleRetryLimit { + cur.idleRetries++ + cur.deadline = now.Add(s.cfg.PhaseTimeout) + s.logKV("transfer idle retry", "offset", u32s(cur.bytesWritten)) + s.sendTransferNeed(cur.meta.ID, cur.bytesWritten) + return + } id := cur.meta.ID s.abortTransfer("timeout") s.sendTransferAbort(id, "timeout") @@ -327,6 +336,7 @@ func (s *session) onTransferChunk(msg *protoXferChunk) { _, _ = cur.hasher.Write(raw) cur.bytesWritten += uint32(len(raw)) cur.chunksSeen++ + cur.idleRetries = 0 cur.deadline = time.Now().Add(s.cfg.PhaseTimeout) raw = nil // Forced GC after each absorbed chunk eliminates firmware-transfer byte diff --git a/services/fabric/transfer_test.go b/services/fabric/transfer_test.go index f9ace00..45aea05 100644 --- a/services/fabric/transfer_test.go +++ b/services/fabric/transfer_test.go @@ -672,8 +672,15 @@ func TestTransferIdleChunkWatchdog(t *testing.T) { sendMsg(t, cm5, xferBegin("xfer-wd", payload, nil)) readTransferReady(t, cm5, "xfer-wd", 0) - // Stop sending chunks; watchdog should fire within ~PhaseTimeout + - // one exportTickInterval (50ms). + // Stop sending chunks. The watchdog should resend the current offset a + // bounded number of times before aborting, so a lost xfer_need does not + // strand both sides until the first idle timeout. + for i := 0; i < transferIdleRetryLimit; i++ { + need := readMsg[protoXferNeed](t, cm5) + if need.Type != msgXferNeed || need.XferID != "xfer-wd" || need.Next != 0 { + t.Fatalf("bad retry xfer_need[%d]: %+v", i, need) + } + } abort := readMsg[protoXferAbort](t, cm5) if abort.Type != msgXferAbort || abort.XferID != "xfer-wd" || abort.Err != "timeout" { t.Fatalf("bad xfer_abort: %+v", abort) From ac864b77d5e7ac37263b0746714e3c180e00f878 Mon Sep 17 00:00:00 2001 From: cpunt Date: Wed, 13 May 2026 09:38:07 +0000 Subject: [PATCH 5/7] fabric: lower OTA transfer chunk size --- services/fabric/fabric.go | 4 ++-- services/updater/types.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/services/fabric/fabric.go b/services/fabric/fabric.go index 91cf908..8cb2e0d 100644 --- a/services/fabric/fabric.go +++ b/services/fabric/fabric.go @@ -27,7 +27,7 @@ const defaultLinkID = "mcu0" type LinkConfig struct { // ChunkSize is the expected raw-byte payload per xfer_chunk. The MCU // is receive-only for transfers, so this is informational/validation - // only on the Go side. Release: 2048 bytes. + // only on the Go side. Release: 1024 bytes. ChunkSize uint32 // PhaseTimeout is the idle-chunk watchdog: an active inbound transfer // is aborted with reason="timeout" if no xfer_chunk arrives within @@ -57,7 +57,7 @@ type LinkConfig struct { func DefaultLinkConfig() LinkConfig { return LinkConfig{ - ChunkSize: 2048, + ChunkSize: 1024, PhaseTimeout: 15 * time.Second, PingInterval: 10 * time.Second, LivenessTimeout: 30 * time.Second, diff --git a/services/updater/types.go b/services/updater/types.go index 650655c..c84d4be 100644 --- a/services/updater/types.go +++ b/services/updater/types.go @@ -32,7 +32,7 @@ const ( PrepareTargetMCU = "mcu" TargetUpdaterMain = "updater/main" DigestAlgXXHash32 = "xxhash32" - DefaultMaxChunkSize uint32 = 2048 + DefaultMaxChunkSize uint32 = 1024 ) // PrepareRequest mirrors the current cmd/self/updater/prepare payload. From 93e3ec5570f0af8113af44cad0eefa4489a1fe2b Mon Sep 17 00:00:00 2001 From: cpunt Date: Mon, 18 May 2026 19:06:18 +0000 Subject: [PATCH 6/7] fabric: align security transfer wire contract --- services/fabric/protocol.go | 16 ++++------- services/fabric/transfer.go | 26 ++--------------- services/fabric/transfer_test.go | 48 ++++---------------------------- 3 files changed, 13 insertions(+), 77 deletions(-) diff --git a/services/fabric/protocol.go b/services/fabric/protocol.go index 40f53ed..a5d3bca 100644 --- a/services/fabric/protocol.go +++ b/services/fabric/protocol.go @@ -113,17 +113,13 @@ type protoXferReady struct { } // protoXferChunk carries unpadded base64url data plus a required xxhash32 -// digest over the raw decoded chunk bytes. ChunkOffsetDigest is optional for -// compatibility with older CM5 senders; when present, it binds the chunk bytes -// to the transfer id and byte offset so spliced valid chunks are retried before -// whole-object commit. +// digest over the raw decoded chunk bytes. type protoXferChunk struct { - Type string `json:"type"` - XferID string `json:"xfer_id"` - Offset uint32 `json:"offset"` - Data string `json:"data"` - ChunkDigest string `json:"chunk_digest"` - ChunkOffsetDigest string `json:"chunk_offset_digest,omitempty"` + Type string `json:"type"` + XferID string `json:"xfer_id"` + Offset uint32 `json:"offset"` + Data string `json:"data"` + ChunkDigest string `json:"chunk_digest"` } // protoXferNeed (control) acks the MCU's expected next byte offset. diff --git a/services/fabric/transfer.go b/services/fabric/transfer.go index 151baaa..838259f 100644 --- a/services/fabric/transfer.go +++ b/services/fabric/transfer.go @@ -103,16 +103,6 @@ func decodeChunkData(encoded string) ([]byte, string) { return raw, "" } -func chunkOffsetDigest(id string, offset uint32, raw []byte) string { - h := xxhash.New(0) - _, _ = h.Write([]byte(id)) - _, _ = h.Write([]byte{0}) - _, _ = h.Write([]byte(u32s(offset))) - _, _ = h.Write([]byte{0}) - _, _ = h.Write(raw) - return xxhashHex(h.Sum32()) -} - func (s *session) sendTransferReady(id string) bool { return s.sendControl(marshal(protoXferReady{ Type: msgXferReady, @@ -267,9 +257,6 @@ func (s *session) onTransferChunk(msg *protoXferChunk) { s.logKV("xfer_chunk dropped", "id", msg.XferID) return } - // Lua transfer_mgr.lua aborts and clears the active transfer on any - // chunk-level fault (unexpected offset, decode failure, size mismatch). - // Match that — do not send xfer_need + keep alive. id := cur.meta.ID if msg.Offset != cur.bytesWritten { s.abortTransfer("unexpected_offset") @@ -318,14 +305,6 @@ func (s *session) onTransferChunk(msg *protoXferChunk) { cur.deadline = time.Now().Add(s.cfg.PhaseTimeout) return } - if msg.ChunkOffsetDigest != "" { - want, ok := canonicalXXHash32Hex(msg.ChunkOffsetDigest) - if !ok || want != chunkOffsetDigest(msg.XferID, msg.Offset, raw) { - s.sendTransferNeed(cur.meta.ID, cur.bytesWritten) - cur.deadline = time.Now().Add(s.cfg.PhaseTimeout) - return - } - } if err := cur.sink.WriteChunk(msg.Offset, raw); err != nil { reason := err.Error() s.logKV("transfer write failed", "err", reason) @@ -339,9 +318,8 @@ func (s *session) onTransferChunk(msg *protoXferChunk) { cur.idleRetries = 0 cur.deadline = time.Now().Add(s.cfg.PhaseTimeout) raw = nil - // Forced GC after each absorbed chunk eliminates firmware-transfer byte - // drops on the safe-window allocator. Do NOT remove this without - // reproducing the regression in firmware-mono/docs/old/FABRIC_TRANSFER_FIX.md. + // A collection point after each accepted chunk keeps UART processing inside + // the safe window described in docs/old/FABRIC_TRANSFER_FIX.md. runtime.GC() s.sendTransferNeed(cur.meta.ID, cur.bytesWritten) } diff --git a/services/fabric/transfer_test.go b/services/fabric/transfer_test.go index 45aea05..af040b2 100644 --- a/services/fabric/transfer_test.go +++ b/services/fabric/transfer_test.go @@ -95,12 +95,11 @@ func xferBegin(id string, payload []byte, meta json.RawMessage) protoXferBegin { func xferChunk(id string, off uint32, payload []byte) protoXferChunk { return protoXferChunk{ - Type: msgXferChunk, - XferID: id, - Offset: off, - Data: rawURL(payload), - ChunkDigest: xxhashStr(payload), - ChunkOffsetDigest: chunkOffsetDigest(id, off, payload), + Type: msgXferChunk, + XferID: id, + Offset: off, + Data: rawURL(payload), + ChunkDigest: xxhashStr(payload), } } @@ -416,43 +415,6 @@ func TestTransferChunkDigestMismatchRequestsSameOffset(t *testing.T) { } } -func TestTransferChunkOffsetDigestMismatchRequestsSameOffset(t *testing.T) { - b := newBus() - cm5, mcu := pipePair() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sink := &fakeTransferSink{} - go runSessionWithSink(ctx, mcu, b.NewConnection("fabric"), sink) - bringUp(t, cm5) - - payload := []byte("abcd") - sendMsg(t, cm5, xferBegin("xfer-bad-offset-digest", payload, nil)) - readTransferReady(t, cm5, "xfer-bad-offset-digest", 0) - - sendMsg(t, cm5, protoXferChunk{ - Type: msgXferChunk, - XferID: "xfer-bad-offset-digest", - Offset: 0, - Data: rawURL(payload), - ChunkDigest: xxhashStr(payload), - ChunkOffsetDigest: chunkOffsetDigest("xfer-bad-offset-digest", 1, payload), - }) - need := readMsg[protoXferNeed](t, cm5) - if need.Next != 0 { - t.Fatalf("retry xfer_need.next = %d, want 0", need.Next) - } - if len(sink.writes) != 0 { - t.Fatalf("sink received %d writes before offset digest passed", len(sink.writes)) - } - - sendMsg(t, cm5, xferChunk("xfer-bad-offset-digest", 0, payload)) - need = readMsg[protoXferNeed](t, cm5) - if need.Next != uint32(len(payload)) { - t.Fatalf("xfer_need.next after retry = %d, want %d", need.Next, len(payload)) - } -} - func TestTransferChunkSizeOverflowAborts(t *testing.T) { b := newBus() cm5, mcu := pipePair() From 086d696f2550935c766dd0adfd8695503add5353 Mon Sep 17 00:00:00 2001 From: cpunt Date: Tue, 19 May 2026 09:13:27 +0000 Subject: [PATCH 7/7] updater: drop stale staging metadata helpers --- services/fabric/session.go | 8 ---- services/updater/receiver.go | 69 -------------------------------- services/updater/updater_test.go | 47 ---------------------- 3 files changed, 124 deletions(-) diff --git a/services/fabric/session.go b/services/fabric/session.go index 80bbd23..4f5d0e6 100644 --- a/services/fabric/session.go +++ b/services/fabric/session.go @@ -809,14 +809,6 @@ func checkBusError(payload any) string { return "" } -func mustMarshal(v any) json.RawMessage { - b, err := json.Marshal(v) - if err != nil { - return json.RawMessage(`{"error":"marshal_failed"}`) - } - return json.RawMessage(b) -} - func topicEquals(t bus.Topic, expected bus.Topic) bool { if t.Len() != expected.Len() { return false diff --git a/services/updater/receiver.go b/services/updater/receiver.go index 5409093..11d1527 100644 --- a/services/updater/receiver.go +++ b/services/updater/receiver.go @@ -118,72 +118,3 @@ func (s *Service) handleStage(msg *bus.Message) { // running image, while this descriptor describes the staged image. s.reply(msg, StageReply{OK: true, Stage: "staged"}) } - -type stageMetadata struct { - Version string `json:"version,omitempty"` - Build string `json:"build,omitempty"` - BuildID string `json:"build_id,omitempty"` - ImageID string `json:"image_id,omitempty"` - ExpectedImageID string `json:"expected_image_id,omitempty"` -} - -type stageMetadataEnvelope struct { - Metadata stageMetadata `json:"metadata,omitempty"` - Meta stageMetadata `json:"meta,omitempty"` - Request struct { - Metadata stageMetadata `json:"metadata,omitempty"` - Meta stageMetadata `json:"meta,omitempty"` - ExpectedImageID string `json:"expected_image_id,omitempty"` - } `json:"request,omitempty"` -} - -func applyStageMetadata(ident *Identity, md stageMetadata) bool { - applied := false - if md.Version != "" { - ident.Version = md.Version - applied = true - } - if md.BuildID != "" { - ident.Build = md.BuildID - applied = true - } else if md.Build != "" { - ident.Build = md.Build - applied = true - } - if md.ImageID != "" { - ident.ImageID = md.ImageID - applied = true - } else if md.ExpectedImageID != "" { - ident.ImageID = md.ExpectedImageID - applied = true - } - return applied -} - -func identityFromStageMeta(defaults Identity, meta any) (Identity, bool) { - ident := defaults - applied := false - md, ok := jsonDecode[stageMetadata](meta) - if ok { - applied = applyStageMetadata(&ident, md) || applied - } - - env, ok := jsonDecode[stageMetadataEnvelope](meta) - if !ok { - return ident, applied - } - applied = applyStageMetadata(&ident, env.Metadata) || applied - applied = applyStageMetadata(&ident, env.Meta) || applied - if env.Request.ExpectedImageID != "" && env.Request.Metadata.ExpectedImageID == "" { - env.Request.Metadata.ExpectedImageID = env.Request.ExpectedImageID - } - if env.Request.ExpectedImageID != "" && env.Request.Meta.ExpectedImageID == "" { - env.Request.Meta.ExpectedImageID = env.Request.ExpectedImageID - } - applied = applyStageMetadata(&ident, env.Request.Metadata) || applied - applied = applyStageMetadata(&ident, env.Request.Meta) || applied - if !applied { - return ident, false - } - return ident, true -} diff --git a/services/updater/updater_test.go b/services/updater/updater_test.go index 08170a1..2b16e5a 100644 --- a/services/updater/updater_test.go +++ b/services/updater/updater_test.go @@ -870,53 +870,6 @@ func TestJSONDecodeAcceptsTypedAndRaw(t *testing.T) { } } -func TestIdentityFromStageMetaAppliesUploadMetadata(t *testing.T) { - defaults := Identity{Version: "0.0.0-dev", Build: "local", ImageID: "img-dev"} - meta := json.RawMessage(`{"version":"13.0","build":"fw-update-e2e-13.0","image_id":"mcu-dev-13.0"}`) - - ident, applied := identityFromStageMeta(defaults, meta) - if !applied { - t.Fatal("metadata was not applied") - } - if ident.Version != "13.0" || ident.Build != "fw-update-e2e-13.0" || ident.ImageID != "mcu-dev-13.0" { - t.Fatalf("identity = %+v", ident) - } -} - -func TestIdentityFromStageMetaAcceptsBuildIDAndKeepsDefaults(t *testing.T) { - defaults := Identity{Version: "1.0.0", Build: "old-build", ImageID: "old-img"} - meta := map[string]any{"build_id": "new-build"} - - ident, applied := identityFromStageMeta(defaults, meta) - if !applied { - t.Fatal("metadata was not applied") - } - if ident.Version != "1.0.0" || ident.Build != "new-build" || ident.ImageID != "old-img" { - t.Fatalf("identity = %+v", ident) - } -} - -func TestIdentityFromStageMetaAcceptsNestedRequestMetadata(t *testing.T) { - defaults := Identity{Version: "0.0.0-dev", Build: "local", ImageID: "img-dev"} - meta := map[string]any{ - "request": map[string]any{ - "expected_image_id": "mcu-dev-13.0", - "metadata": map[string]any{ - "version": "13.0", - "build": "fw-update-e2e-13.0", - }, - }, - } - - ident, applied := identityFromStageMeta(defaults, meta) - if !applied { - t.Fatal("metadata was not applied") - } - if ident.Version != "13.0" || ident.Build != "fw-update-e2e-13.0" || ident.ImageID != "mcu-dev-13.0" { - t.Fatalf("identity = %+v", ident) - } -} - // ---- memorySink behaviour ------------------------------------------- func TestMemorySinkAbortClearsBuffer(t *testing.T) {