Skip to content

Commit 01d1a25

Browse files
committed
chore: better progress bar for for push, pull
1 parent d4d5917 commit 01d1a25

1 file changed

Lines changed: 28 additions & 6 deletions

File tree

  • internal/core/services/registry

internal/core/services/registry/oci.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"io"
89
"net/http"
910
"os"
1011
"path/filepath"
@@ -60,6 +61,19 @@ func formatBytes(b int64) string {
6061
}
6162
}
6263

64+
type progressReader struct {
65+
inner io.Reader
66+
transferred *atomic.Int64
67+
}
68+
69+
func (r *progressReader) Read(p []byte) (int, error) {
70+
n, err := r.inner.Read(p)
71+
if n > 0 {
72+
r.transferred.Add(int64(n))
73+
}
74+
return n, err
75+
}
76+
6377
func startProgressTicker(direction string, bytesTransferred *atomic.Int64, totalBytes *atomic.Int64, layersDone *atomic.Int64, totalLayers *atomic.Int64) func() {
6478
ticker := time.NewTicker(60 * time.Second)
6579
done := make(chan struct{})
@@ -418,13 +432,20 @@ func (c *OciClient) createMetaDescriptors(fs *file.Store, folder string, fsPath
418432
}
419433

420434
// pushBlobWithRetry pushes a single blob from src to dst, retrying up to
421-
// maxRetries times on transient failures. Returns (true, nil) when the blob
422-
// already exists in the destination and was skipped.
423-
func pushBlobWithRetry(ctx context.Context, src *file.Store, dst *remote.Repository, desc v1.Descriptor, maxRetries int) (bool, error) {
435+
// maxRetries times on transient failures. bytesUploaded is updated in
436+
// real-time as data is streamed. Returns (true, nil) when the blob already
437+
// exists in the destination and was skipped.
438+
func pushBlobWithRetry(ctx context.Context, src *file.Store, dst *remote.Repository, desc v1.Descriptor, maxRetries int, bytesUploaded *atomic.Int64) (bool, error) {
439+
baseline := bytesUploaded.Load()
424440
var lastErr error
425441
for attempt := 0; attempt <= maxRetries; attempt++ {
442+
if attempt > 0 {
443+
bytesUploaded.Store(baseline)
444+
}
445+
426446
exists, err := dst.Exists(ctx, desc)
427447
if err == nil && exists {
448+
bytesUploaded.Store(baseline + desc.Size)
428449
return true, nil
429450
}
430451

@@ -433,13 +454,15 @@ func pushBlobWithRetry(ctx context.Context, src *file.Store, dst *remote.Reposit
433454
return false, fmt.Errorf("failed to read local blob: %w", err)
434455
}
435456

436-
err = dst.Push(ctx, desc, rc)
457+
pr := &progressReader{inner: rc, transferred: bytesUploaded}
458+
err = dst.Push(ctx, desc, pr)
437459
rc.Close()
438460
if err == nil {
439461
return false, nil
440462
}
441463

442464
if errors.Is(err, errdef.ErrAlreadyExists) {
465+
bytesUploaded.Store(baseline + desc.Size)
443466
return true, nil
444467
}
445468

@@ -491,14 +514,13 @@ func copyToRegistry(ctx context.Context, fs *file.Store, repo *remote.Repository
491514
zap.String("digest", desc.Digest.String()),
492515
)
493516

494-
skipped, err := pushBlobWithRetry(ctx, fs, repo, desc, maxRetries)
517+
skipped, err := pushBlobWithRetry(ctx, fs, repo, desc, maxRetries, &bytesUploaded)
495518
if err != nil {
496519
stopProgress()
497520
return fmt.Errorf("failed to push layer %s: %w", title, err)
498521
}
499522

500523
done := completed.Add(1)
501-
bytesUploaded.Add(desc.Size)
502524
total := totalLayers.Load()
503525

504526
if skipped {

0 commit comments

Comments
 (0)