99 "path/filepath"
1010 "strings"
1111 "sync/atomic"
12+ "time"
1213
1314 "github.com/highcard-dev/daemon/internal/core/domain"
1415 "github.com/highcard-dev/daemon/internal/utils"
@@ -38,6 +39,51 @@ func NewOciClient(credentialStore *CredentialStore) *OciClient {
3839 }
3940}
4041
42+ func formatBytes (b int64 ) string {
43+ const (
44+ kb = 1024
45+ mb = kb * 1024
46+ gb = mb * 1024
47+ )
48+ switch {
49+ case b >= gb :
50+ return fmt .Sprintf ("%.2f GB" , float64 (b )/ float64 (gb ))
51+ case b >= mb :
52+ return fmt .Sprintf ("%.2f MB" , float64 (b )/ float64 (mb ))
53+ case b >= kb :
54+ return fmt .Sprintf ("%.2f KB" , float64 (b )/ float64 (kb ))
55+ default :
56+ return fmt .Sprintf ("%d B" , b )
57+ }
58+ }
59+
60+ func startProgressTicker (direction string , bytesTransferred * atomic.Int64 , totalBytes * atomic.Int64 , layersDone * atomic.Int64 , totalLayers * atomic.Int64 ) func () {
61+ ticker := time .NewTicker (60 * time .Second )
62+ done := make (chan struct {})
63+ go func () {
64+ for {
65+ select {
66+ case <- ticker .C :
67+ transferred := bytesTransferred .Load ()
68+ total := totalBytes .Load ()
69+ layers := layersDone .Load ()
70+ layerTotal := totalLayers .Load ()
71+ logger .Log ().Info (fmt .Sprintf ("%s progress" , direction ),
72+ zap .String ("transferred" , formatBytes (transferred )),
73+ zap .String ("total" , formatBytes (total )),
74+ zap .String ("layers" , fmt .Sprintf ("%d/%d" , layers , layerTotal )),
75+ )
76+ case <- done :
77+ return
78+ }
79+ }
80+ }()
81+ return func () {
82+ ticker .Stop ()
83+ close (done )
84+ }
85+ }
86+
4187func (c * OciClient ) GetRepo (repoUrl string ) (* remote.Repository , error ) {
4288 repo , err := remote .NewRepository (repoUrl )
4389 if err != nil {
@@ -116,6 +162,12 @@ func (c *OciClient) PullSelective(dir string, artifact string, includeData bool,
116162 return fmt .Errorf ("reference (tag or digest) must be set" )
117163 }
118164
165+ logger .Log ().Info ("Starting pull from registry" ,
166+ zap .String ("repo" , repo ),
167+ zap .String ("ref" , ref ),
168+ zap .Bool ("includeData" , includeData ),
169+ )
170+
119171 ctx := context .Background ()
120172
121173 repoInstance , err := c .GetRepo (repo )
@@ -133,9 +185,10 @@ func (c *OciClient) PullSelective(dir string, artifact string, includeData bool,
133185 return err
134186 }
135187
136- // Track progress for data pulls
137188 var completed atomic.Int64
138189 var totalLayers atomic.Int64
190+ var totalPullBytes atomic.Int64
191+ var bytesDownloaded atomic.Int64
139192
140193 if progress != nil {
141194 progress .Mode .Store ("restore" )
@@ -153,8 +206,6 @@ func (c *OciClient) PullSelective(dir string, artifact string, includeData bool,
153206 if ! includeData {
154207 filtered := make ([]v1.Descriptor , 0 , len (successors ))
155208 for _ , s := range successors {
156- // Filter out data layers by checking media type.
157- // ORAS appends +gzip to the media type for directories.
158209 baseType := strings .TrimSuffix (s .MediaType , "+gzip" )
159210 if baseType == string (domain .ArtifactTypeScrollData ) {
160211 path := s .Annotations ["org.opencontainers.image.path" ]
@@ -164,42 +215,78 @@ func (c *OciClient) PullSelective(dir string, artifact string, includeData bool,
164215 filtered = append (filtered , s )
165216 }
166217 totalLayers .Store (int64 (len (filtered )))
218+ var size int64
219+ for _ , s := range filtered {
220+ size += s .Size
221+ }
222+ totalPullBytes .Store (size )
167223 return filtered , nil
168224 }
169225
170226 totalLayers .Store (int64 (len (successors )))
227+ var size int64
228+ for _ , s := range successors {
229+ size += s .Size
230+ }
231+ totalPullBytes .Store (size )
171232 return successors , nil
172233 },
234+ PreCopy : func (ctx context.Context , desc v1.Descriptor ) error {
235+ title := desc .Annotations ["org.opencontainers.image.title" ]
236+ compressed := strings .HasSuffix (desc .MediaType , "+gzip" )
237+ logger .Log ().Info ("Downloading layer" ,
238+ zap .String ("title" , title ),
239+ zap .String ("mediaType" , desc .MediaType ),
240+ zap .Int64 ("size" , desc .Size ),
241+ zap .Bool ("compressed" , compressed ),
242+ zap .String ("digest" , desc .Digest .String ()),
243+ )
244+ return nil
245+ },
173246 PostCopy : func (ctx context.Context , desc v1.Descriptor ) error {
174- if progress != nil {
175- done := completed .Add (1 )
176- total := totalLayers .Load ()
177- if total > 0 {
178- pct := done * 100 / total
179- progress .Percentage .Store (pct )
180- }
181- logger .Log ().Info ("Pulled layer" , zap .String ("digest" , desc .Digest .String ()), zap .Int64 ("completed" , done ), zap .Int64 ("total" , total ))
247+ done := completed .Add (1 )
248+ total := totalLayers .Load ()
249+ bytesDownloaded .Add (desc .Size )
250+ if progress != nil && total > 0 {
251+ pct := done * 100 / total
252+ progress .Percentage .Store (pct )
182253 }
254+ title := desc .Annotations ["org.opencontainers.image.title" ]
255+ logger .Log ().Info ("Pulled layer" ,
256+ zap .String ("title" , title ),
257+ zap .Int64 ("size" , desc .Size ),
258+ zap .String ("digest" , desc .Digest .String ()),
259+ zap .String ("progress" , fmt .Sprintf ("%d/%d" , done , total )),
260+ )
183261 return nil
184262 },
185263 OnCopySkipped : func (ctx context.Context , desc v1.Descriptor ) error {
186- if progress != nil {
187- done := completed .Add (1 )
188- total := totalLayers .Load ()
189- if total > 0 {
190- pct := done * 100 / total
191- progress .Percentage .Store (pct )
192- }
264+ done := completed .Add (1 )
265+ total := totalLayers .Load ()
266+ bytesDownloaded .Add (desc .Size )
267+ if progress != nil && total > 0 {
268+ pct := done * 100 / total
269+ progress .Percentage .Store (pct )
193270 }
271+ title := desc .Annotations ["org.opencontainers.image.title" ]
272+ logger .Log ().Info ("Layer already exists locally, skipped" ,
273+ zap .String ("title" , title ),
274+ zap .Int64 ("size" , desc .Size ),
275+ zap .String ("digest" , desc .Digest .String ()),
276+ zap .String ("progress" , fmt .Sprintf ("%d/%d" , done , total )),
277+ )
194278 return nil
195279 },
196280 },
197281 }
198282
283+ stopProgress := startProgressTicker ("Download" , & bytesDownloaded , & totalPullBytes , & completed , & totalLayers )
284+
199285 // Use a constant destination reference for the local file store so digest references
200286 // (which contain ':' and other characters) don't become a tag key.
201287 const dstRef = "root"
202288 manifestDescriptor , err := oras .Copy (ctx , repoInstance , ref , fs , dstRef , copyOpts )
289+ stopProgress ()
203290 if err != nil {
204291 if progress != nil {
205292 progress .Mode .Store ("noop" )
@@ -367,6 +454,9 @@ func (c *OciClient) Push(folder string, repo string, tag string, overrides map[s
367454 logger .Log ().Warn (fmt .Sprintf ("data chunk %s is empty, skipping" , chunk .Path ))
368455 continue
369456 }
457+
458+ logger .Log ().Info ("Packing data chunk" , zap .String ("path" , chunk .Path ), zap .Int64 ("size" , fileInfo .Size ()))
459+
370460 // Name the layer "data/<path>" so it extracts to the correct location on pull
371461 layerName := filepath .Join ("data" , chunk .Path )
372462 // Use a path relative to the file store root (folder), not the full chunkFullPath,
@@ -418,31 +508,76 @@ func (c *OciClient) Push(folder string, repo string, tag string, overrides map[s
418508 return v1.Descriptor {}, err
419509 }
420510
511+ var totalBytes atomic.Int64
512+ for _ , desc := range descriptorsForRoot {
513+ totalBytes .Add (desc .Size )
514+ }
515+ logger .Log ().Info ("Starting push to registry" ,
516+ zap .String ("repo" , repo ),
517+ zap .String ("tag" , tag ),
518+ zap .Int ("layers" , len (descriptorsForRoot )),
519+ zap .Int64 ("totalSize" , totalBytes .Load ()),
520+ )
521+
522+ var completed atomic.Int64
523+ var bytesUploaded atomic.Int64
524+ var totalLayers atomic.Int64
525+ totalLayers .Store (int64 (len (descriptorsForRoot )) + 1 ) // +1 for manifest
526+
421527 pushCopyOpts := oras.CopyOptions {
422528 CopyGraphOptions : oras.CopyGraphOptions {
423- PostCopy : func (ctx context.Context , desc v1.Descriptor ) error {
529+ PreCopy : func (ctx context.Context , desc v1.Descriptor ) error {
424530 title := desc .Annotations ["org.opencontainers.image.title" ]
425- logger .Log ().Info ("Pushed layer" ,
426- zap .String ("digest" , desc .Digest .String ()),
531+ compressed := strings .HasSuffix (desc .MediaType , "+gzip" )
532+ logger .Log ().Info ("Uploading layer" ,
533+ zap .String ("title" , title ),
427534 zap .String ("mediaType" , desc .MediaType ),
428535 zap .Int64 ("size" , desc .Size ),
536+ zap .Bool ("compressed" , compressed ),
537+ zap .String ("digest" , desc .Digest .String ()),
538+ )
539+ return nil
540+ },
541+ PostCopy : func (ctx context.Context , desc v1.Descriptor ) error {
542+ done := completed .Add (1 )
543+ bytesUploaded .Add (desc .Size )
544+ title := desc .Annotations ["org.opencontainers.image.title" ]
545+ logger .Log ().Info ("Pushed layer" ,
429546 zap .String ("title" , title ),
547+ zap .Int64 ("size" , desc .Size ),
548+ zap .String ("digest" , desc .Digest .String ()),
549+ zap .String ("progress" , fmt .Sprintf ("%d/%d" , done , totalLayers .Load ())),
430550 )
431551 return nil
432552 },
433553 OnCopySkipped : func (ctx context.Context , desc v1.Descriptor ) error {
554+ done := completed .Add (1 )
555+ bytesUploaded .Add (desc .Size )
434556 title := desc .Annotations ["org.opencontainers.image.title" ]
435- logger .Log ().Info ("Layer already exists in registry, skipping" ,
436- zap .String ("digest" , desc .Digest .String ()),
437- zap .String ("mediaType" , desc .MediaType ),
438- zap .Int64 ("size" , desc .Size ),
557+ logger .Log ().Info ("Layer already exists, skipped" ,
439558 zap .String ("title" , title ),
559+ zap .Int64 ("size" , desc .Size ),
560+ zap .String ("digest" , desc .Digest .String ()),
561+ zap .String ("progress" , fmt .Sprintf ("%d/%d" , done , totalLayers .Load ())),
440562 )
441563 return nil
442564 },
443565 },
444566 }
567+
568+ stopProgress := startProgressTicker ("Upload" , & bytesUploaded , & totalBytes , & completed , & totalLayers )
445569 _ , err = oras .Copy (ctx , fs , tag , repoInstance , tag , pushCopyOpts )
570+ stopProgress ()
571+
572+ if err != nil {
573+ return v1.Descriptor {}, err
574+ }
575+
576+ logger .Log ().Info ("Push complete" ,
577+ zap .String ("repo" , repo ),
578+ zap .String ("tag" , tag ),
579+ zap .String ("digest" , rootManifestDescriptor .Digest .String ()),
580+ )
446581
447582 return rootManifestDescriptor , err
448583}
@@ -484,31 +619,63 @@ func (c *OciClient) PushMeta(folder string, repo string) (v1.Descriptor, error)
484619 return v1.Descriptor {}, err
485620 }
486621
622+ var metaTotalBytes atomic.Int64
623+ for _ , desc := range manifestDescriptors {
624+ metaTotalBytes .Add (desc .Size )
625+ }
626+ logger .Log ().Info ("Starting meta push to registry" ,
627+ zap .String ("repo" , repo ),
628+ zap .Int ("layers" , len (manifestDescriptors )),
629+ zap .Int64 ("totalSize" , metaTotalBytes .Load ()),
630+ )
631+
632+ var metaCompleted atomic.Int64
633+ var metaBytesUploaded atomic.Int64
634+ var metaTotalLayers atomic.Int64
635+ metaTotalLayers .Store (int64 (len (manifestDescriptors )) + 1 )
636+
487637 metaCopyOpts := oras.CopyOptions {
488638 CopyGraphOptions : oras.CopyGraphOptions {
489- PostCopy : func (ctx context.Context , desc v1.Descriptor ) error {
639+ PreCopy : func (ctx context.Context , desc v1.Descriptor ) error {
490640 title := desc .Annotations ["org.opencontainers.image.title" ]
491- logger .Log ().Info ("Pushed layer" ,
492- zap .String ("digest " , desc . Digest . String () ),
641+ logger .Log ().Info ("Uploading meta layer" ,
642+ zap .String ("title " , title ),
493643 zap .String ("mediaType" , desc .MediaType ),
494644 zap .Int64 ("size" , desc .Size ),
645+ zap .String ("digest" , desc .Digest .String ()),
646+ )
647+ return nil
648+ },
649+ PostCopy : func (ctx context.Context , desc v1.Descriptor ) error {
650+ done := metaCompleted .Add (1 )
651+ metaBytesUploaded .Add (desc .Size )
652+ title := desc .Annotations ["org.opencontainers.image.title" ]
653+ logger .Log ().Info ("Pushed meta layer" ,
495654 zap .String ("title" , title ),
655+ zap .Int64 ("size" , desc .Size ),
656+ zap .String ("digest" , desc .Digest .String ()),
657+ zap .String ("progress" , fmt .Sprintf ("%d/%d" , done , metaTotalLayers .Load ())),
496658 )
497659 return nil
498660 },
499661 OnCopySkipped : func (ctx context.Context , desc v1.Descriptor ) error {
662+ done := metaCompleted .Add (1 )
663+ metaBytesUploaded .Add (desc .Size )
500664 title := desc .Annotations ["org.opencontainers.image.title" ]
501- logger .Log ().Info ("Layer already exists in registry, skipping" ,
502- zap .String ("digest" , desc .Digest .String ()),
503- zap .String ("mediaType" , desc .MediaType ),
504- zap .Int64 ("size" , desc .Size ),
665+ logger .Log ().Info ("Meta layer already exists, skipped" ,
505666 zap .String ("title" , title ),
667+ zap .Int64 ("size" , desc .Size ),
668+ zap .String ("digest" , desc .Digest .String ()),
669+ zap .String ("progress" , fmt .Sprintf ("%d/%d" , done , metaTotalLayers .Load ())),
506670 )
507671 return nil
508672 },
509673 },
510674 }
675+
676+ stopMetaProgress := startProgressTicker ("Meta upload" , & metaBytesUploaded , & metaTotalBytes , & metaCompleted , & metaTotalLayers )
511677 _ , err = oras .Copy (ctx , fs , tag , repoInstance , tag , metaCopyOpts )
678+ stopMetaProgress ()
512679
513680 return rootManifestDescriptor , err
514681}
0 commit comments