Skip to content

Commit 193f692

Browse files
bdchathamclaude
andauthored
refactor: environment-driven snapshot upload (#53)
## Summary Move snapshot upload S3 coordinates from task params to sidecar environment, matching the snapshot restore and genesis conventions. - `SnapshotUploadRequest`: remove Bucket/Prefix/Region (now empty) - `NewSnapshotUploader`: takes bucket/region/chainID from env at construction - Upload prefix derived as `{chainID}/` from `SEI_CHAIN_ID` - Client `SnapshotUploadTask` stripped to empty struct ## Companion PR - sei-k8s-controller: pending ## Test plan - [x] Build clean - [x] Client tests pass - [x] Full sidecar test suite (CI) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4fb3316 commit 193f692

7 files changed

Lines changed: 48 additions & 126 deletions

File tree

serve.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ var serveCmd = cli.Command{
9090
engine.TaskMarkReady: tasks.MarkReadyHandler(),
9191
engine.TaskConfigureGenesis: tasks.NewGenesisFetcher(homeDir, chainID, genesisBucket, genesisRegion, nil).Handler(),
9292
engine.TaskConfigureStateSync: tasks.NewStateSyncConfigurer(homeDir, nil).Handler(),
93-
engine.TaskSnapshotUpload: tasks.NewSnapshotUploader(homeDir, snapshotUploadInterval, nil).Handler(),
93+
engine.TaskSnapshotUpload: tasks.NewSnapshotUploader(homeDir, snapshotBucket, snapshotRegion, chainID, snapshotUploadInterval, nil).Handler(),
9494
engine.TaskResultExport: tasks.NewResultExporter(homeDir, nil).Handler(),
9595
engine.TaskAwaitCondition: tasks.NewConditionWaiter(nil).Handler(),
9696
engine.TaskGenerateIdentity: tasks.NewIdentityGenerator(homeDir).Handler(),

sidecar/client/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,8 @@ func TestSubmitTask_ValidationFailure(t *testing.T) {
203203
t.Fatal("server should not be called when validation fails")
204204
}))
205205

206-
// SnapshotUploadTask requires Bucket — empty should fail validation.
207-
_, err := c.SubmitSnapshotUploadTask(context.Background(), SnapshotUploadTask{})
206+
// DiscoverPeersTask requires at least one source — empty should fail validation.
207+
_, err := c.SubmitDiscoverPeersTask(context.Background(), DiscoverPeersTask{})
208208
if err == nil {
209209
t.Fatal("expected validation error")
210210
}

sidecar/client/tasks.go

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -98,47 +98,25 @@ func SnapshotRestoreTaskFromParams(params map[string]interface{}) SnapshotRestor
9898
}
9999

100100
// SnapshotUploadTask archives and streams a local snapshot to S3.
101+
// S3 coordinates are derived by the sidecar from its environment.
101102
type SnapshotUploadTask struct {
102103
TaskMeta
103-
Bucket string
104-
Prefix string
105-
Region string
106104
}
107105

108106
func (t SnapshotUploadTask) TaskType() string { return TaskTypeSnapshotUpload }
109107

110-
func (t SnapshotUploadTask) Validate() error {
111-
if t.Bucket == "" {
112-
return fmt.Errorf("snapshot-upload: missing required field Bucket")
113-
}
114-
if t.Region == "" {
115-
return fmt.Errorf("snapshot-upload: missing required field Region")
116-
}
117-
return nil
118-
}
108+
func (t SnapshotUploadTask) Validate() error { return nil }
119109

120110
func (t SnapshotUploadTask) ToTaskRequest() TaskRequest {
121-
p := map[string]interface{}{
122-
"bucket": t.Bucket,
123-
"region": t.Region,
124-
}
125-
if t.Prefix != "" {
126-
p["prefix"] = t.Prefix
127-
}
128-
req := TaskRequest{Type: t.TaskType(), Params: &p}
111+
req := TaskRequest{Type: t.TaskType()}
129112
t.applyMeta(&req)
130113
return req
131114
}
132115

133116
// SnapshotUploadTaskFromParams reconstructs a SnapshotUploadTask from
134117
// a generic params map.
135-
func SnapshotUploadTaskFromParams(params map[string]interface{}) SnapshotUploadTask {
136-
s := func(k string) string { v, _ := params[k].(string); return v }
137-
return SnapshotUploadTask{
138-
Bucket: s("bucket"),
139-
Prefix: s("prefix"),
140-
Region: s("region"),
141-
}
118+
func SnapshotUploadTaskFromParams(_ map[string]interface{}) SnapshotUploadTask {
119+
return SnapshotUploadTask{}
142120
}
143121

144122
// ConfigureGenesisTask instructs the sidecar to resolve and write genesis.json.

sidecar/client/tasks_test.go

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,7 @@ func genSnapshotRestoreTask() gopter.Gen {
2020
}
2121

2222
func genSnapshotUploadTask() gopter.Gen {
23-
return gopter.CombineGens(
24-
genNonEmptyString(),
25-
genNonEmptyString(),
26-
genNonEmptyString(),
27-
).Map(func(v []interface{}) SnapshotUploadTask {
28-
return SnapshotUploadTask{
29-
Bucket: v[0].(string),
30-
Prefix: v[1].(string),
31-
Region: v[2].(string),
32-
}
33-
})
23+
return gen.Const(SnapshotUploadTask{})
3424
}
3525

3626
func genConfigureGenesisTask() gopter.Gen {
@@ -120,13 +110,7 @@ func TestSnapshotUploadRoundTrip(t *testing.T) {
120110
return false
121111
}
122112
req := task.ToTaskRequest()
123-
if req.Type != TaskTypeSnapshotUpload {
124-
return false
125-
}
126-
rebuilt := SnapshotUploadTaskFromParams(*req.Params)
127-
return rebuilt.Bucket == task.Bucket &&
128-
rebuilt.Prefix == task.Prefix &&
129-
rebuilt.Region == task.Region
113+
return req.Type == TaskTypeSnapshotUpload && req.Params == nil
130114
},
131115
genSnapshotUploadTask(),
132116
))
@@ -278,27 +262,9 @@ func TestSnapshotRestoreValidation(t *testing.T) {
278262
}
279263

280264
func TestSnapshotUploadValidation(t *testing.T) {
281-
cases := []struct {
282-
name string
283-
task SnapshotUploadTask
284-
ok bool
285-
}{
286-
{"valid", SnapshotUploadTask{Bucket: "b", Region: "r"}, true},
287-
{"valid with prefix", SnapshotUploadTask{Bucket: "b", Prefix: "p", Region: "r"}, true},
288-
{"missing bucket", SnapshotUploadTask{Region: "r"}, false},
289-
{"missing region", SnapshotUploadTask{Bucket: "b"}, false},
290-
{"all empty", SnapshotUploadTask{}, false},
291-
}
292-
for _, tc := range cases {
293-
t.Run(tc.name, func(t *testing.T) {
294-
err := tc.task.Validate()
295-
if tc.ok && err != nil {
296-
t.Errorf("unexpected error: %v", err)
297-
}
298-
if !tc.ok && err == nil {
299-
t.Error("expected validation error, got nil")
300-
}
301-
})
265+
task := SnapshotUploadTask{}
266+
if err := task.Validate(); err != nil {
267+
t.Errorf("unexpected error: %v", err)
302268
}
303269
}
304270

sidecar/tasks/snapshot_upload.go

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,8 @@ const (
3030
)
3131

3232
// SnapshotUploadRequest holds the parameters for the snapshot upload task.
33-
type SnapshotUploadRequest struct {
34-
Bucket string `json:"bucket"`
35-
Prefix string `json:"prefix"`
36-
Region string `json:"region"`
37-
}
33+
// S3 bucket, region, and prefix are derived from the sidecar's environment.
34+
type SnapshotUploadRequest struct{}
3835

3936
// uploadState tracks the last successfully uploaded snapshot height.
4037
type uploadState struct {
@@ -46,12 +43,16 @@ type uploadState struct {
4643
// at the configured interval until the context is cancelled.
4744
type SnapshotUploader struct {
4845
homeDir string
46+
bucket string
47+
region string
48+
chainID string
4949
uploadInterval time.Duration
5050
s3UploaderFactory seis3.UploaderFactory
5151
}
5252

5353
// NewSnapshotUploader creates an uploader targeting the given home directory.
54-
func NewSnapshotUploader(homeDir string, uploadInterval time.Duration, factory seis3.UploaderFactory) *SnapshotUploader {
54+
// Bucket, region, and chainID are read from environment at construction time.
55+
func NewSnapshotUploader(homeDir, bucket, region, chainID string, uploadInterval time.Duration, factory seis3.UploaderFactory) *SnapshotUploader {
5556
if factory == nil {
5657
factory = seis3.DefaultUploaderFactory
5758
}
@@ -60,6 +61,9 @@ func NewSnapshotUploader(homeDir string, uploadInterval time.Duration, factory s
6061
}
6162
return &SnapshotUploader{
6263
homeDir: homeDir,
64+
bucket: bucket,
65+
region: region,
66+
chainID: chainID,
6367
uploadInterval: uploadInterval,
6468
s3UploaderFactory: factory,
6569
}
@@ -70,21 +74,15 @@ func NewSnapshotUploader(homeDir string, uploadInterval time.Duration, factory s
7074
// sleeping for the configured interval between attempts. It stays
7175
// running until the context is cancelled.
7276
func (u *SnapshotUploader) Handler() engine.TaskHandler {
73-
return engine.TypedHandler(func(ctx context.Context, cfg SnapshotUploadRequest) error {
74-
if cfg.Bucket == "" {
75-
return fmt.Errorf("snapshot-upload: missing required param 'bucket'")
76-
}
77-
if cfg.Region == "" {
78-
return fmt.Errorf("snapshot-upload: missing required param 'region'")
79-
}
80-
return u.runLoop(ctx, cfg)
77+
return engine.TypedHandler(func(ctx context.Context, _ SnapshotUploadRequest) error {
78+
return u.runLoop(ctx)
8179
})
8280
}
8381

84-
func (u *SnapshotUploader) runLoop(ctx context.Context, cfg SnapshotUploadRequest) error {
85-
uploadLog.Info("starting snapshot upload loop", "interval", u.uploadInterval, "bucket", cfg.Bucket)
82+
func (u *SnapshotUploader) runLoop(ctx context.Context) error {
83+
uploadLog.Info("starting snapshot upload loop", "interval", u.uploadInterval, "bucket", u.bucket)
8684
for {
87-
if err := u.Upload(ctx, cfg); err != nil {
85+
if err := u.Upload(ctx); err != nil {
8886
uploadLog.Warn("upload attempt failed, will retry next interval", "error", err)
8987
}
9088

@@ -103,7 +101,7 @@ func (u *SnapshotUploader) runLoop(ctx context.Context, cfg SnapshotUploadReques
103101
//
104102
// The archive is streamed through an io.Pipe so it never needs to be buffered
105103
// entirely in memory; the transfermanager handles multipart upload automatically.
106-
func (u *SnapshotUploader) Upload(ctx context.Context, cfg SnapshotUploadRequest) error {
104+
func (u *SnapshotUploader) Upload(ctx context.Context) error {
107105
snapshotsDir := filepath.Join(u.homeDir, "data", "snapshots")
108106

109107
height, err := pickUploadCandidate(snapshotsDir)
@@ -121,25 +119,25 @@ func (u *SnapshotUploader) Upload(ctx context.Context, cfg SnapshotUploadRequest
121119
return nil
122120
}
123121

124-
uploadLog.Info("uploading snapshot", "height", height, "bucket", cfg.Bucket, "region", cfg.Region)
122+
uploadLog.Info("uploading snapshot", "height", height, "bucket", u.bucket, "region", u.region)
125123

126-
uploader, err := u.s3UploaderFactory(ctx, cfg.Region)
124+
uploader, err := u.s3UploaderFactory(ctx, u.region)
127125
if err != nil {
128126
return fmt.Errorf("building S3 uploader: %w", err)
129127
}
130128

131-
prefix := normalizePrefix(cfg.Prefix)
129+
prefix := u.chainID + "/"
132130

133131
archiveKey := fmt.Sprintf("%s%d.tar.gz", prefix, height)
134132
uploadLog.Info("streaming archive to S3", "key", archiveKey)
135-
if err := u.streamUpload(ctx, uploader, cfg.Bucket, archiveKey, snapshotsDir, height); err != nil {
133+
if err := u.streamUpload(ctx, uploader, u.bucket, archiveKey, snapshotsDir, height); err != nil {
136134
return fmt.Errorf("uploading %s: %w", archiveKey, err)
137135
}
138136

139137
latestKey := prefix + "latest.txt"
140138
latestBody := []byte(strconv.FormatInt(height, 10))
141139
_, err = uploader.UploadObject(ctx, &transfermanager.UploadObjectInput{
142-
Bucket: aws.String(cfg.Bucket),
140+
Bucket: aws.String(u.bucket),
143141
Key: aws.String(latestKey),
144142
Body: bytes.NewReader(latestBody),
145143
})

sidecar/tasks/snapshot_upload_test.go

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -99,22 +99,18 @@ func TestUpload_UploadsArchiveAndLatestTxt(t *testing.T) {
9999
setupSnapshotDirs(t, homeDir, []int64{1000, 2000})
100100

101101
mock := newMockS3Uploader()
102-
uploader := NewSnapshotUploader(homeDir, 0, mockUploaderFactory(mock))
102+
uploader := NewSnapshotUploader(homeDir, "my-bucket", "eu-central-1", "testchain", 0, mockUploaderFactory(mock))
103103

104-
err := uploader.Upload(context.Background(), SnapshotUploadRequest{
105-
Bucket: "my-bucket",
106-
Prefix: "state-sync",
107-
Region: "eu-central-1",
108-
})
104+
err := uploader.Upload(context.Background())
109105
if err != nil {
110106
t.Fatalf("Upload() error = %v", err)
111107
}
112108

113-
if _, ok := mock.uploads["my-bucket/state-sync/1000.tar.gz"]; !ok {
109+
if _, ok := mock.uploads["my-bucket/testchain/1000.tar.gz"]; !ok {
114110
t.Error("expected archive upload at state-sync/1000.tar.gz")
115111
}
116112

117-
latest, ok := mock.uploads["my-bucket/state-sync/latest.txt"]
113+
latest, ok := mock.uploads["my-bucket/testchain/latest.txt"]
118114
if !ok {
119115
t.Fatal("expected latest.txt upload")
120116
}
@@ -132,13 +128,9 @@ func TestUpload_SkipsWhenAlreadyUploaded(t *testing.T) {
132128
_ = os.WriteFile(filepath.Join(homeDir, uploadStateFile), data, 0o644)
133129

134130
mock := newMockS3Uploader()
135-
uploader := NewSnapshotUploader(homeDir, 0, mockUploaderFactory(mock))
131+
uploader := NewSnapshotUploader(homeDir, "my-bucket", "eu-central-1", "testchain", 0, mockUploaderFactory(mock))
136132

137-
err := uploader.Upload(context.Background(), SnapshotUploadRequest{
138-
Bucket: "my-bucket",
139-
Prefix: "state-sync",
140-
Region: "eu-central-1",
141-
})
133+
err := uploader.Upload(context.Background())
142134
if err != nil {
143135
t.Fatalf("Upload() error = %v", err)
144136
}
@@ -157,18 +149,14 @@ func TestUpload_UploadsNewerSnapshot(t *testing.T) {
157149
_ = os.WriteFile(filepath.Join(homeDir, uploadStateFile), data, 0o644)
158150

159151
mock := newMockS3Uploader()
160-
uploader := NewSnapshotUploader(homeDir, 0, mockUploaderFactory(mock))
152+
uploader := NewSnapshotUploader(homeDir, "my-bucket", "eu-central-1", "testchain", 0, mockUploaderFactory(mock))
161153

162-
err := uploader.Upload(context.Background(), SnapshotUploadRequest{
163-
Bucket: "my-bucket",
164-
Prefix: "state-sync",
165-
Region: "eu-central-1",
166-
})
154+
err := uploader.Upload(context.Background())
167155
if err != nil {
168156
t.Fatalf("Upload() error = %v", err)
169157
}
170158

171-
if _, ok := mock.uploads["my-bucket/state-sync/2000.tar.gz"]; !ok {
159+
if _, ok := mock.uploads["my-bucket/testchain/2000.tar.gz"]; !ok {
172160
t.Error("expected archive upload at state-sync/2000.tar.gz")
173161
}
174162
}
@@ -178,13 +166,9 @@ func TestUpload_NoOpsWhenTooFewSnapshots(t *testing.T) {
178166
setupSnapshotDirs(t, homeDir, []int64{1000})
179167

180168
mock := newMockS3Uploader()
181-
uploader := NewSnapshotUploader(homeDir, 0, mockUploaderFactory(mock))
169+
uploader := NewSnapshotUploader(homeDir, "my-bucket", "eu-central-1", "testchain", 0, mockUploaderFactory(mock))
182170

183-
err := uploader.Upload(context.Background(), SnapshotUploadRequest{
184-
Bucket: "my-bucket",
185-
Prefix: "state-sync",
186-
Region: "eu-central-1",
187-
})
171+
err := uploader.Upload(context.Background())
188172
if err != nil {
189173
t.Fatalf("Upload() error = %v", err)
190174
}
@@ -199,13 +183,9 @@ func TestUpload_WritesUploadState(t *testing.T) {
199183
setupSnapshotDirs(t, homeDir, []int64{1000, 2000})
200184

201185
mock := newMockS3Uploader()
202-
uploader := NewSnapshotUploader(homeDir, 0, mockUploaderFactory(mock))
186+
uploader := NewSnapshotUploader(homeDir, "my-bucket", "eu-central-1", "testchain", 0, mockUploaderFactory(mock))
203187

204-
err := uploader.Upload(context.Background(), SnapshotUploadRequest{
205-
Bucket: "my-bucket",
206-
Prefix: "state-sync",
207-
Region: "eu-central-1",
208-
})
188+
err := uploader.Upload(context.Background())
209189
if err != nil {
210190
t.Fatalf("Upload() error = %v", err)
211191
}

sidecar/tasks/typed_handler_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func TestDeserialize_ConfigReload(t *testing.T) {
204204
// TestDeserialize_SnapshotUpload verifies that the snapshot-upload handler
205205
// correctly deserializes simple string fields from the wire format.
206206
func TestDeserialize_SnapshotUpload(t *testing.T) {
207-
handler := NewSnapshotUploader(t.TempDir(), 0, nil).Handler()
207+
handler := NewSnapshotUploader(t.TempDir(), "b", "r", "c", 0, nil).Handler()
208208

209209
params := map[string]any{
210210
"bucket": "",

0 commit comments

Comments
 (0)