Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions services/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const defaultLinkID = "mcu0"
type LinkConfig struct {
// ChunkSize is the expected raw-byte payload per xfer_chunk. The MCU
// is receive-only for transfers, so this is informational/validation
// only on the Go side. Release: 2048 bytes.
// only on the Go side. Release: 1024 bytes.
ChunkSize uint32
// PhaseTimeout is the idle-chunk watchdog: an active inbound transfer
// is aborted with reason="timeout" if no xfer_chunk arrives within
Expand Down Expand Up @@ -57,7 +57,7 @@ type LinkConfig struct {

func DefaultLinkConfig() LinkConfig {
return LinkConfig{
ChunkSize: 2048,
ChunkSize: 1024,
PhaseTimeout: 15 * time.Second,
PingInterval: 10 * time.Second,
LivenessTimeout: 30 * time.Second,
Expand Down
8 changes: 0 additions & 8 deletions services/fabric/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,14 +809,6 @@ func checkBusError(payload any) string {
return ""
}

func mustMarshal(v any) json.RawMessage {
b, err := json.Marshal(v)
if err != nil {
return json.RawMessage(`{"error":"marshal_failed"}`)
}
return json.RawMessage(b)
}

func topicEquals(t bus.Topic, expected bus.Topic) bool {
if t.Len() != expected.Len() {
return false
Expand Down
23 changes: 15 additions & 8 deletions services/fabric/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

const transferTargetUpdaterMain = "updater/main"
const transferIdleRetryLimit = 3

// transferMeta captures xfer_begin contents. The transfer target is explicit
// on the wire; firmware update uses target="updater/main". meta remains opaque
Expand Down Expand Up @@ -58,6 +59,7 @@ type incomingTransfer struct {
bytesWritten uint32
chunksSeen uint32
hasher *xxhash.Hasher
idleRetries uint8
// deadline is the idle-chunk watchdog: bumped on every accepted chunk
// and on initial xfer_begin. checkTransferTimeout fires if now > deadline.
// Mirrors transfer_mgr.lua: `active.deadline = runtime.now() + phase_timeout`.
Expand Down Expand Up @@ -160,6 +162,13 @@ func (s *session) checkTransferTimeout(now time.Time) {
if !now.After(cur.deadline) {
return
}
if cur.idleRetries < transferIdleRetryLimit {
cur.idleRetries++
cur.deadline = now.Add(s.cfg.PhaseTimeout)
s.logKV("transfer idle retry", "offset", u32s(cur.bytesWritten))
s.sendTransferNeed(cur.meta.ID, cur.bytesWritten)
return
}
id := cur.meta.ID
s.abortTransfer("timeout")
s.sendTransferAbort(id, "timeout")
Expand Down Expand Up @@ -248,9 +257,6 @@ func (s *session) onTransferChunk(msg *protoXferChunk) {
s.logKV("xfer_chunk dropped", "id", msg.XferID)
return
}
// Lua transfer_mgr.lua aborts and clears the active transfer on any
// chunk-level fault (unexpected offset, decode failure, size mismatch).
// Match that — do not send xfer_need + keep alive.
id := cur.meta.ID
if msg.Offset != cur.bytesWritten {
s.abortTransfer("unexpected_offset")
Expand All @@ -259,8 +265,9 @@ func (s *session) onTransferChunk(msg *protoXferChunk) {
}
raw, errStr := decodeChunkData(msg.Data)
if errStr != "" {
s.abortTransfer(errStr)
s.sendTransferAbort(id, errStr)
s.logKV("xfer_chunk decode retry", "err", errStr)
s.sendTransferNeed(id, cur.bytesWritten)
cur.deadline = time.Now().Add(s.cfg.PhaseTimeout)
return
}
if len(raw) == 0 {
Expand Down Expand Up @@ -308,11 +315,11 @@ func (s *session) onTransferChunk(msg *protoXferChunk) {
_, _ = cur.hasher.Write(raw)
cur.bytesWritten += uint32(len(raw))
cur.chunksSeen++
cur.idleRetries = 0
cur.deadline = time.Now().Add(s.cfg.PhaseTimeout)
raw = nil
// Forced GC after each absorbed chunk eliminates firmware-transfer byte
// drops on the safe-window allocator. Do NOT remove this without
// reproducing the regression in firmware-mono/docs/old/FABRIC_TRANSFER_FIX.md.
// A collection point after each accepted chunk keeps UART processing inside
// the safe window described in docs/old/FABRIC_TRANSFER_FIX.md.
runtime.GC()
s.sendTransferNeed(cur.meta.ID, cur.bytesWritten)
}
Expand Down
7 changes: 4 additions & 3 deletions services/fabric/transfer_sink_rp2350.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func (s *streamedStageSink) Abort(reason string) error {
return nil
}

// Bytes returns nil because the TinyGo RP2350 default path streams directly
// into the inactive slot. fabric still calls updater/main staging; the
// updater consumes the pre-staged descriptor instead of an in-RAM artefact.
// Bytes returns nil because the TinyGo RP2350 default path verifies the signed
// container while streaming and writes only the authenticated payload into the
// inactive slot. fabric still calls updater/main staging; the updater consumes
// the verified staged descriptor instead of an in-RAM artefact.
func (s *streamedStageSink) Bytes() []byte { return nil }
37 changes: 26 additions & 11 deletions services/fabric/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,8 @@ func TestTransferReceiveSuccess(t *testing.T) {
}

func TestTransferChunkBadOffsetAborts(t *testing.T) {
// Lua transfer_mgr aborts and clears the active transfer on chunk faults
// (unexpected_offset, invalid_chunk_encoding, size_too_large). Match that — do not
// keep the transfer alive with an xfer_need.
// Unexpected offsets mean sender/receiver state has diverged; abort rather
// than asking for a resend.
b := newBus()
cm5, mcu := pipePair()
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -309,7 +308,7 @@ func TestTransferChunkBadOffsetAborts(t *testing.T) {
}
}

func TestTransferChunkDecodeFailureAborts(t *testing.T) {
func TestTransferChunkDecodeFailureRequestsSameOffset(t *testing.T) {
b := newBus()
cm5, mcu := pipePair()
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -332,12 +331,21 @@ func TestTransferChunkDecodeFailureAborts(t *testing.T) {
ChunkDigest: xxhashStr(payload),
})

abort := readMsg[protoXferAbort](t, cm5)
if abort.Err != "invalid_chunk_encoding" {
t.Fatalf("bad xfer_abort: %+v", abort)
need := readMsg[protoXferNeed](t, cm5)
if need.Type != msgXferNeed || need.XferID != "xfer-d1" || need.Next != 0 {
t.Fatalf("bad retry xfer_need: %+v", need)
}
if len(sink.abortReasons) == 0 {
t.Fatal("expected sink.Abort on decode failure")
if len(sink.abortReasons) != 0 {
t.Fatalf("sink.Abort called on recoverable decode failure: %v", sink.abortReasons)
}
if len(sink.writes) != 0 {
t.Fatalf("sink received %d writes before decode passed", len(sink.writes))
}

sendMsg(t, cm5, xferChunk("xfer-d1", 0, payload))
need = readMsg[protoXferNeed](t, cm5)
if need.Next != uint32(len(payload)) {
t.Fatalf("xfer_need.next after retry = %d, want %d", need.Next, len(payload))
}
}

Expand Down Expand Up @@ -626,8 +634,15 @@ func TestTransferIdleChunkWatchdog(t *testing.T) {
sendMsg(t, cm5, xferBegin("xfer-wd", payload, nil))
readTransferReady(t, cm5, "xfer-wd", 0)

// Stop sending chunks; watchdog should fire within ~PhaseTimeout +
// one exportTickInterval (50ms).
// Stop sending chunks. The watchdog should resend the current offset a
// bounded number of times before aborting, so a lost xfer_need does not
// strand both sides until the first idle timeout.
for i := 0; i < transferIdleRetryLimit; i++ {
need := readMsg[protoXferNeed](t, cm5)
if need.Type != msgXferNeed || need.XferID != "xfer-wd" || need.Next != 0 {
t.Fatalf("bad retry xfer_need[%d]: %+v", i, need)
}
}
abort := readMsg[protoXferAbort](t, cm5)
if abort.Type != msgXferAbort || abort.XferID != "xfer-wd" || abort.Err != "timeout" {
t.Fatalf("bad xfer_abort: %+v", abort)
Expand Down
2 changes: 1 addition & 1 deletion services/reactor/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (r *Reactor) Run(ctx context.Context) {
identity := firmwareIdentity()
updaterSvc := updater.New(updater.Options{
Conn: updaterConn,
Verifier: updater.PassthroughVerifier(identity),
Verifier: updater.SignedImageVerifier(),
Applier: updater.ProductionApplier(),
Identity: identity,
})
Expand Down
3 changes: 3 additions & 0 deletions services/updater/prestage_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
package updater

type streamedStage struct {
Version string
BuildID string
ImageID string
Length uint32
PayloadSHA256 string
}
Expand Down
70 changes: 33 additions & 37 deletions services/updater/prestage_tinygo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,30 @@
package updater

import (
"crypto/sha256"
"encoding/hex"
"errors"

"pico2-a-b/abupdate"
"pico2-a-b/imagev1"
)

// streamedStage tracks a raw transfer that fabric has already streamed into
// the inactive A/B slot. It is the TinyGo bring-up path used before imagev1
// verification can stream directly from the transfer source.
// streamedStage tracks a signed transfer that verified successfully while
// fabric streamed it. Only the signed payload bytes are written to the inactive
// A/B slot; the container header, manifest and signature are never staged.
type streamedStage struct {
Version string
BuildID string
ImageID string
Length uint32
PayloadSHA256 string
}

var (
streamedStageDesc streamedStage
streamedStageOK bool
streamedStageHash = sha256.New()
streamedStageLen uint32
streamedVerifier *imagev1.StreamVerifier
)

// BeginStreamedStage prepares the inactive slot for a raw incoming transfer.
// BeginStreamedStage prepares verification for a signed incoming transfer.
// The caller must subsequently call WriteStreamedStage and CommitStreamedStage
// or AbortStreamedStage.
func BeginStreamedStage(size uint32) error {
Expand All @@ -35,57 +36,54 @@ func BeginStreamedStage(size uint32) error {
sharedUpdater = abupdate.Updater{}
sharedUpdaterInit = false

u, err := ensureUpdaterInited()
if err != nil {
return err
}
if rc := u.BeginUpdate(size); rc != 0 {
return errFromRC("begin_update", rc)
}
streamedStageHash.Reset()
streamedStageLen = 0
_ = size
streamedStageDesc = streamedStage{}
streamedStageOK = false
streamedVerifier = imagev1.NewStreamVerifier(SignedImagePolicy(), func(payloadLen uint32) (imagev1.PayloadSink, error) {
return newSlotSink(payloadLen)
})
return nil
}

func WriteStreamedStage(data []byte) error {
if len(data) == 0 {
return errors.New("empty_chunk")
}
u, err := ensureUpdaterInited()
if err != nil {
return err
}
if rc := u.WriteChunk(data); rc != 0 {
return errFromRC("write_chunk", rc)
if streamedVerifier == nil {
return errors.New("streamed_stage_not_started")
}
_, _ = streamedStageHash.Write(data)
streamedStageLen += uint32(len(data))
return nil
_, err := streamedVerifier.Write(data)
return err
}

func CommitStreamedStage() (uint32, error) {
u, err := ensureUpdaterInited()
if streamedVerifier == nil {
return 0, errors.New("streamed_stage_not_started")
}
res, err := streamedVerifier.Commit()
if err != nil {
streamedVerifier = nil
return 0, err
}
if rc := u.FlushFinal(); rc != 0 {
return 0, errFromRC("flush_final", rc)
}
streamedStageDesc = streamedStage{
Length: streamedStageLen,
PayloadSHA256: hex.EncodeToString(streamedStageHash.Sum(nil)),
Version: res.Version,
BuildID: res.BuildID,
ImageID: res.ImageID,
Length: res.PayloadLength,
PayloadSHA256: res.PayloadSHA256,
}
streamedStageOK = true
return u.BytesWritten(), nil
streamedVerifier = nil
return res.PayloadLength, nil
}

func AbortStreamedStage() {
if streamedVerifier != nil {
_ = streamedVerifier.Abort()
streamedVerifier = nil
}
streamedStageDesc = streamedStage{}
streamedStageOK = false
streamedStageLen = 0
streamedStageHash.Reset()
}

func consumeStreamedStage() (streamedStage, bool) {
Expand All @@ -95,7 +93,5 @@ func consumeStreamedStage() (streamedStage, bool) {
out := streamedStageDesc
streamedStageDesc = streamedStage{}
streamedStageOK = false
streamedStageLen = 0
streamedStageHash.Reset()
return out, true
}
Loading