Skip to content

Commit 7ae83d8

Browse files
committed
refactor: move progress tracking utilities to a new utils package
- Removed redundant progress tracking code from oci.go. - Introduced NewCountingReader and StartTransferProgressTicker functions in transfer_progress.go for better modularity and reusability. - Updated oci.go to utilize the new utilities for tracking download and upload progress.
1 parent 389f5e8 commit 7ae83d8

2 files changed

Lines changed: 77 additions & 62 deletions

File tree

internal/core/services/registry/oci.go

Lines changed: 3 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8-
"io"
98
"net/http"
109
"os"
1110
"path/filepath"
@@ -43,64 +42,6 @@ func NewOciClient(credentialStore *CredentialStore) *OciClient {
4342
}
4443
}
4544

46-
func formatBytes(b int64) string {
47-
const (
48-
kb = 1024
49-
mb = kb * 1024
50-
gb = mb * 1024
51-
)
52-
switch {
53-
case b >= gb:
54-
return fmt.Sprintf("%.2f GB", float64(b)/float64(gb))
55-
case b >= mb:
56-
return fmt.Sprintf("%.2f MB", float64(b)/float64(mb))
57-
case b >= kb:
58-
return fmt.Sprintf("%.2f KB", float64(b)/float64(kb))
59-
default:
60-
return fmt.Sprintf("%d B", b)
61-
}
62-
}
63-
64-
type progressReader struct {
65-
inner io.Reader
66-
transferred *atomic.Int64
67-
}
68-
69-
func (r *progressReader) Read(p []byte) (int, error) {
70-
n, err := r.inner.Read(p)
71-
if n > 0 {
72-
r.transferred.Add(int64(n))
73-
}
74-
return n, err
75-
}
76-
77-
func startProgressTicker(direction string, bytesTransferred *atomic.Int64, totalBytes *atomic.Int64, layersDone *atomic.Int64, totalLayers *atomic.Int64) func() {
78-
ticker := time.NewTicker(60 * time.Second)
79-
done := make(chan struct{})
80-
go func() {
81-
for {
82-
select {
83-
case <-ticker.C:
84-
transferred := bytesTransferred.Load()
85-
total := totalBytes.Load()
86-
layers := layersDone.Load()
87-
layerTotal := totalLayers.Load()
88-
logger.Log().Info(fmt.Sprintf("%s progress", direction),
89-
zap.String("transferred", formatBytes(transferred)),
90-
zap.String("total", formatBytes(total)),
91-
zap.String("layers", fmt.Sprintf("%d/%d", layers, layerTotal)),
92-
)
93-
case <-done:
94-
return
95-
}
96-
}
97-
}()
98-
return func() {
99-
ticker.Stop()
100-
close(done)
101-
}
102-
}
103-
10445
func (c *OciClient) GetRepo(repoUrl string) (*remote.Repository, error) {
10546
repo, err := remote.NewRepository(repoUrl)
10647
if err != nil {
@@ -316,7 +257,7 @@ func (c *OciClient) PullSelective(dir string, artifact string, includeData bool,
316257
},
317258
}
318259

319-
stopProgress := startProgressTicker("Download", &bytesDownloaded, &totalPullBytes, &completed, &totalLayers)
260+
stopProgress := utils.StartTransferProgressTicker("Download", &bytesDownloaded, &totalPullBytes, &completed, &totalLayers)
320261

321262
// Use a constant destination reference for the local file store so digest references
322263
// (which contain ':' and other characters) don't become a tag key.
@@ -454,7 +395,7 @@ func pushBlobWithRetry(ctx context.Context, src *file.Store, dst *remote.Reposit
454395
return false, fmt.Errorf("failed to read local blob: %w", err)
455396
}
456397

457-
pr := &progressReader{inner: rc, transferred: bytesUploaded}
398+
pr := utils.NewCountingReader(rc, bytesUploaded)
458399
err = dst.Push(ctx, desc, pr)
459400
rc.Close()
460401
if err == nil {
@@ -503,7 +444,7 @@ func copyToRegistry(ctx context.Context, fs *file.Store, repo *remote.Repository
503444
var totalLayers atomic.Int64
504445
totalLayers.Store(int64(len(layers)))
505446

506-
stopProgress := startProgressTicker("Upload", &bytesUploaded, &totalBytes, &completed, &totalLayers)
447+
stopProgress := utils.StartTransferProgressTicker("Upload", &bytesUploaded, &totalBytes, &completed, &totalLayers)
507448

508449
for _, desc := range layers {
509450
title := desc.Annotations["org.opencontainers.image.title"]
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package utils
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"sync/atomic"
7+
"time"
8+
9+
"github.com/highcard-dev/daemon/internal/utils/logger"
10+
"go.uber.org/zap"
11+
)
12+
13+
// countingReader wraps r and adds every successful Read size to transferred.
14+
type countingReader struct {
15+
inner io.Reader
16+
transferred *atomic.Int64
17+
}
18+
19+
// NewCountingReader returns an io.Reader that counts bytes read into transferred.
20+
func NewCountingReader(r io.Reader, transferred *atomic.Int64) io.Reader {
21+
return &countingReader{inner: r, transferred: transferred}
22+
}
23+
24+
func (r *countingReader) Read(p []byte) (int, error) {
25+
n, err := r.inner.Read(p)
26+
if n > 0 {
27+
r.transferred.Add(int64(n))
28+
}
29+
return n, err
30+
}
31+
32+
// StartTransferProgressTicker logs periodic progress and a final "done" line
33+
// (transferred, total, layers, average speed, elapsed on done).
34+
// Call the returned function when the transfer finishes.
35+
func StartTransferProgressTicker(direction string, bytesTransferred, totalBytes, layersDone, totalLayers *atomic.Int64) func() {
36+
ticker := time.NewTicker(60 * time.Second)
37+
done := make(chan struct{})
38+
start := time.Now()
39+
40+
report := func(event string, withElapsed bool) {
41+
n := bytesTransferred.Load()
42+
sec := time.Since(start).Seconds()
43+
speed := ""
44+
if sec > 0 {
45+
speed = HumanizeBytes(int64(float64(n)/sec)) + "/s"
46+
}
47+
fields := []zap.Field{
48+
zap.String("transferred", HumanizeBytes(n)),
49+
zap.String("total", HumanizeBytes(totalBytes.Load())),
50+
zap.String("layers", fmt.Sprintf("%d/%d", layersDone.Load(), totalLayers.Load())),
51+
zap.String("speed", speed),
52+
}
53+
if withElapsed {
54+
fields = append(fields, zap.Duration("elapsed", time.Since(start)))
55+
}
56+
logger.Log().Info(direction+" "+event, fields...)
57+
}
58+
59+
go func() {
60+
for {
61+
select {
62+
case <-ticker.C:
63+
report("progress", false)
64+
case <-done:
65+
return
66+
}
67+
}
68+
}()
69+
return func() {
70+
ticker.Stop()
71+
close(done)
72+
report("done", true)
73+
}
74+
}

0 commit comments

Comments
 (0)