Skip to content

Commit 127c1cd

Browse files
committed
feat: use different state transitions for podman
- podman has only pulling fs layer -> downloading layer -> layer download complete - podman has only layer events, no pull events - map layer download complete to layer pull complete - send synthetic pull complete event when pull reader is eof and no errors were received
1 parent 6beb649 commit 127c1cd

7 files changed

Lines changed: 195 additions & 63 deletions

File tree

examples/state_test/main.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/distribution/reference"
8+
"github.com/docker/docker/api/types/image"
9+
"github.com/silenium-dev/docker-wrapper/pkg/client"
10+
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/events"
11+
)
12+
13+
func main() {
14+
ctx, cancel := context.WithCancel(context.Background())
15+
defer cancel()
16+
17+
cli, _ := client.NewWithOpts(client.FromEnv, client.WithVersionNegotiation)
18+
ref, _ := reference.ParseNormalizedNamed("localstack/localstack:3")
19+
id, manifest, eventChan, err := cli.ImagePullWithEvents(ctx, ref, image.PullOptions{})
20+
if err != nil {
21+
panic(err)
22+
}
23+
fmt.Println("Pulling:", id.String(), manifest.MediaType)
24+
layerTracker := map[string][]events.PullEvent{}
25+
var tracker []events.PullEvent
26+
for event := range eventChan {
27+
if le, ok := event.(events.LayerEvent); ok {
28+
layerTracker[le.LayerId()] = append(layerTracker[le.LayerId()], event)
29+
} else {
30+
tracker = append(tracker, event)
31+
}
32+
}
33+
fmt.Println("\nPull Events:")
34+
for _, pullEvent := range tracker {
35+
fmt.Printf(" %s\n", pullEvent.String())
36+
}
37+
fmt.Println("\nLayer Events:")
38+
for _, layer := range manifest.Layers {
39+
fmt.Printf("Layer %s:\n", layer.Digest.String())
40+
layerEvents := layerTracker[layer.Digest.Hex[:12]]
41+
for _, event := range layerEvents {
42+
fmt.Printf(" %s\n", event.String())
43+
}
44+
}
45+
}

pkg/client/pull.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package client
33
import (
44
"context"
55
"fmt"
6+
67
"github.com/containerd/platforms"
78
"github.com/distribution/reference"
89
"github.com/docker/docker/api/types/image"
@@ -18,7 +19,9 @@ import (
1819
"go.uber.org/zap"
1920
)
2021

21-
func (c *Client) ImagePullWithEvents(ctx context.Context, ref reference.Named, options image.PullOptions) (v1.Hash, *v1.Manifest, chan events.PullEvent, error) {
22+
func (c *Client) ImagePullWithEvents(ctx context.Context, ref reference.Named, options image.PullOptions) (
23+
v1.Hash, *v1.Manifest, chan events.PullEvent, error,
24+
) {
2225
if options.RegistryAuth != "" || options.PrivilegeFunc != nil {
2326
c.logger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Warnf("privilege function and registry auth in options are not supported, please use auth provider instead")
2427
options.RegistryAuth = ""
@@ -48,13 +51,20 @@ func (c *Client) ImagePullWithEvents(ctx context.Context, ref reference.Named, o
4851
return imageId, manifest, pull.ParseStream(ctx, reader), nil
4952
}
5053

51-
func (c *Client) ImagePullWithState(ctx context.Context, ref reference.Named, options image.PullOptions) (v1.Hash, *v1.Manifest, chan state.Pull, error) {
54+
func (c *Client) ImagePullWithState(ctx context.Context, ref reference.Named, options image.PullOptions) (
55+
v1.Hash, *v1.Manifest, chan state.Pull, error,
56+
) {
57+
isPodman, err := c.IsPodman(ctx)
58+
if err != nil {
59+
return v1.Hash{}, nil, nil, err
60+
}
61+
5262
id, manifest, eventChan, err := c.ImagePullWithEvents(ctx, ref, options)
5363
if err != nil {
5464
return v1.Hash{}, nil, nil, err
5565
}
5666

57-
return id, manifest, pull.StateFromStream(ctx, ref, eventChan, manifest, id), nil
67+
return id, manifest, pull.StateFromStream(ctx, ref, isPodman, eventChan, manifest, id), nil
5868
}
5969

6070
func (c *Client) ImagePull(ctx context.Context, ref reference.Named, options image.PullOptions) (digest.Digest, error) {
@@ -69,7 +79,9 @@ func (c *Client) ImagePull(ctx context.Context, ref reference.Named, options ima
6979
return digest.Digest(dig.String()), nil
7080
}
7181

72-
func (c *Client) ImageGetManifest(ctx context.Context, ref reference.Named, platform *v1.Platform) (v1.Hash, *v1.Manifest, error) {
82+
func (c *Client) ImageGetManifest(ctx context.Context, ref reference.Named, platform *v1.Platform) (
83+
v1.Hash, *v1.Manifest, error,
84+
) {
7385
var err error
7486
if platform == nil {
7587
platform, err = c.ImageDefaultPlatform(ctx)
@@ -109,10 +121,12 @@ func (c *Client) ImageDefaultPlatform(ctx context.Context) (*v1.Platform, error)
109121
if err != nil {
110122
return nil, err
111123
}
112-
normalized := platforms.Normalize(v2.Platform{
113-
OS: info.OSType,
114-
Architecture: info.Architecture,
115-
})
124+
normalized := platforms.Normalize(
125+
v2.Platform{
126+
OS: info.OSType,
127+
Architecture: info.Architecture,
128+
},
129+
)
116130

117131
return &v1.Platform{
118132
OS: normalized.OS,
@@ -123,7 +137,9 @@ func (c *Client) ImageDefaultPlatform(ctx context.Context) (*v1.Platform, error)
123137
}, nil
124138
}
125139

126-
func (c *Client) getManifest(ctx context.Context, ref reference.Named, options image.PullOptions) (v1.Hash, *v1.Manifest, error) {
140+
func (c *Client) getManifest(ctx context.Context, ref reference.Named, options image.PullOptions) (
141+
v1.Hash, *v1.Manifest, error,
142+
) {
127143
var platform *v1.Platform
128144
var err error
129145
if options.Platform != "" {

pkg/client/pull/events/event.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package events
22

33
import (
44
"fmt"
5-
"github.com/opencontainers/go-digest"
6-
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/base"
75
"strings"
86
"time"
7+
8+
"github.com/opencontainers/go-digest"
9+
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/base"
910
)
1011

1112
type PullEvent interface {

pkg/client/pull/state.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,44 @@ package pull
22

33
import (
44
"context"
5+
56
"github.com/distribution/reference"
67
v1 "github.com/google/go-containerregistry/pkg/v1"
8+
"github.com/opencontainers/go-digest"
79
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/events"
810
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/state"
911
)
1012

11-
func StateFromStream(ctx context.Context, ref reference.Named, ch chan events.PullEvent, manifest *v1.Manifest, dig v1.Hash) chan state.Pull {
13+
func StateFromStream(
14+
ctx context.Context, ref reference.Named, isPodman bool, ch chan events.PullEvent, manifest *v1.Manifest,
15+
dig v1.Hash,
16+
) chan state.Pull {
1217
out := make(chan state.Pull)
1318

14-
go processEvents(ctx, ref, ch, manifest, dig, out)
19+
go processEvents(ctx, ref, isPodman, ch, manifest, dig, out)
1520

1621
return out
1722
}
1823

19-
func processEvents(ctx context.Context, ref reference.Named, ch chan events.PullEvent, manifest *v1.Manifest, dig v1.Hash, out chan state.Pull) {
24+
func processEvents(
25+
ctx context.Context, ref reference.Named, isPodman bool,
26+
ch chan events.PullEvent, manifest *v1.Manifest,
27+
dig v1.Hash, out chan state.Pull,
28+
) {
2029
defer close(out)
2130
var current state.Pull
2231
var err error
2332
for {
2433
select {
2534
case <-ctx.Done():
26-
return
35+
goto done
2736
case event, ok := <-ch:
2837
if !ok {
29-
return
38+
goto done
3039
}
3140
var next state.Pull
3241
if current == nil {
33-
next, err = state.NewPullState(ref, manifest, dig, event)
42+
next, err = state.NewPullState(ref, isPodman, manifest, dig, event)
3443
} else {
3544
next, err = current.Next(event)
3645
}
@@ -41,4 +50,29 @@ func processEvents(ctx context.Context, ref reference.Named, ch chan events.Pull
4150
out <- current
4251
}
4352
}
53+
done:
54+
_, pullComplete := current.(*state.PullComplete)
55+
_, pullErrored := current.(*state.PullErrored)
56+
if !pullComplete && !pullErrored {
57+
success := true
58+
pulledNewer := false
59+
for _, l := range current.Layers() {
60+
_, isComplete := l.(*state.LayerPullComplete)
61+
_, isAlreadyExists := l.(*state.LayerAlreadyExists)
62+
if isComplete {
63+
pulledNewer = true
64+
}
65+
if !isComplete && !isAlreadyExists {
66+
success = false
67+
break
68+
}
69+
}
70+
if success {
71+
out <- &state.PullComplete{
72+
PullBase: current.Base(),
73+
ImageDigest: digest.Digest(dig.String()),
74+
DownloadedNewer: pulledNewer,
75+
}
76+
}
77+
}
4478
}

pkg/client/pull/state/layer.go

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,20 @@ package state
22

33
import (
44
"fmt"
5-
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/events"
65
"time"
6+
7+
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/events"
78
)
89

9-
func NewLayer(event events.LayerEvent) (Layer, error) {
10+
func NewLayer(isPodman bool, event events.LayerEvent) (Layer, error) {
11+
base := layerBase{id: event.LayerId(), isPodman: isPodman}
1012
switch event := event.(type) {
1113
case *events.PullingFSLayer:
12-
return &LayerPullingFSLayer{layerBase{event.LayerId()}}, nil
14+
return &LayerPullingFSLayer{base}, nil
1315
case *events.AlreadyExists:
14-
return &LayerAlreadyExists{layerBase{event.LayerId()}}, nil
16+
return &LayerAlreadyExists{base}, nil
1517
case *events.LayerError:
16-
return &LayerErrored{layerBase{event.LayerId()}, event.Error}, nil
18+
return &LayerErrored{base, event.Error}, nil
1719
}
1820
return nil, fmt.Errorf("invalid initial event (%T)", event)
1921
}
@@ -46,7 +48,11 @@ func (l *LayerPullingFSLayer) Next(event events.LayerEvent) (Layer, error) {
4648
case *events.Downloading:
4749
return &LayerDownloading{l.layerBase, event.Progress()}, nil
4850
case *events.DownloadComplete:
49-
return &LayerDownloadComplete{l.layerBase}, nil
51+
if l.isPodman {
52+
return &LayerPullComplete{l.layerBase}, nil
53+
} else {
54+
return &LayerDownloadComplete{l.layerBase}, nil
55+
}
5056
case *events.AlreadyExists:
5157
return &LayerAlreadyExists{l.layerBase}, nil
5258
case *events.LayerError:
@@ -70,9 +76,13 @@ func (l *LayerWaiting) Next(event events.LayerEvent) (Layer, error) {
7076
case *events.Downloading:
7177
return &LayerDownloading{l.layerBase, event.Progress()}, nil
7278
case *events.DownloadComplete:
73-
return &LayerDownloadComplete{l.layerBase}, nil
79+
if l.isPodman {
80+
return &LayerPullComplete{l.layerBase}, nil
81+
} else {
82+
return &LayerDownloadComplete{l.layerBase}, nil
83+
}
7484
case *events.LayerError:
75-
return &LayerErrored{layerBase{event.LayerId()}, event.Error}, nil
85+
return &LayerErrored{l.layerBase, event.Error}, nil
7686
}
7787
return nil, fmt.Errorf("invalid transition (waiting + %T)", event)
7888
}
@@ -95,13 +105,17 @@ func (l *LayerDownloading) Next(event events.LayerEvent) (Layer, error) {
95105
case *events.Downloading:
96106
return &LayerDownloading{l.layerBase, event.Progress()}, nil
97107
case *events.DownloadComplete:
98-
return &LayerDownloadComplete{l.layerBase}, nil
108+
if l.isPodman {
109+
return &LayerPullComplete{l.layerBase}, nil
110+
} else {
111+
return &LayerDownloadComplete{l.layerBase}, nil
112+
}
99113
case *events.VerifyingChecksum:
100114
return &LayerVerifyingChecksum{l.layerBase}, nil
101115
case *events.Extracting:
102116
return parseLayerExtracting(l.layerBase, event), nil
103117
case *events.LayerError:
104-
return &LayerErrored{layerBase{event.LayerId()}, event.Error}, nil
118+
return &LayerErrored{l.layerBase, event.Error}, nil
105119
}
106120
return nil, fmt.Errorf("invalid transition (downloading + %T)", event)
107121
}
@@ -121,7 +135,7 @@ func (l *LayerVerifyingChecksum) Next(event events.LayerEvent) (Layer, error) {
121135
case *events.Extracting:
122136
return parseLayerExtracting(l.layerBase, event), nil
123137
case *events.LayerError:
124-
return &LayerErrored{layerBase{event.LayerId()}, event.Error}, nil
138+
return &LayerErrored{l.layerBase, event.Error}, nil
125139
}
126140
return nil, fmt.Errorf("invalid transition (verifying-checksum + %T)", event)
127141
}
@@ -139,11 +153,15 @@ func (l *LayerDownloadComplete) Next(event events.LayerEvent) (Layer, error) {
139153
case *events.Extracting:
140154
return parseLayerExtracting(l.layerBase, event), nil
141155
case *events.LayerError:
142-
return &LayerErrored{layerBase{event.LayerId()}, event.Error}, nil
156+
return &LayerErrored{l.layerBase, event.Error}, nil
143157
case *events.PullComplete:
144158
return &LayerPullComplete{l.layerBase}, nil
145159
case *events.DownloadComplete:
146-
return &LayerPullComplete{l.layerBase}, nil
160+
if l.isPodman {
161+
return &LayerPullComplete{l.layerBase}, nil
162+
} else {
163+
return &LayerDownloadComplete{l.layerBase}, nil
164+
}
147165
}
148166
return nil, fmt.Errorf("invalid transition (download-complete + %T)", event)
149167
}
@@ -180,7 +198,7 @@ func (l *LayerExtracting) Next(event events.LayerEvent) (Layer, error) {
180198
case *events.PullComplete:
181199
return &LayerPullComplete{l.layerBase}, nil
182200
case *events.LayerError:
183-
return &LayerErrored{layerBase{event.LayerId()}, event.Error}, nil
201+
return &LayerErrored{l.layerBase, event.Error}, nil
184202
}
185203
return nil, fmt.Errorf("invalid transition (extracting + %T)", event)
186204
}
@@ -210,7 +228,7 @@ func (l *LayerAlreadyExists) Next(event events.LayerEvent) (Layer, error) {
210228
case *events.Extracting:
211229
return parseLayerExtracting(l.layerBase, event), nil
212230
case *events.LayerError:
213-
return &LayerErrored{layerBase{event.LayerId()}, event.Error}, nil
231+
return &LayerErrored{l.layerBase, event.Error}, nil
214232
case *events.PullComplete:
215233
return &LayerPullComplete{l.layerBase}, nil
216234
case *events.AlreadyExists:
@@ -228,5 +246,11 @@ func (l *LayerPullComplete) Status() string {
228246
}
229247

230248
func (l *LayerPullComplete) Next(event events.LayerEvent) (Layer, error) {
249+
switch event.(type) {
250+
case *events.DownloadComplete:
251+
if l.isPodman {
252+
return &LayerPullComplete{l.layerBase}, nil
253+
}
254+
}
231255
return nil, fmt.Errorf("already completed, tried %T on layer-pull-complete", event)
232256
}

0 commit comments

Comments
 (0)