Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions swarmd/cmd/swarmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"os"
"os/signal"

engineapi "github.com/docker/docker/client"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
engineapi "github.com/moby/moby/client"
"github.com/moby/swarmkit/swarmd/dockerexec"
"github.com/moby/swarmkit/swarmd/internal/defaults"
"github.com/moby/swarmkit/v2/api"
Expand Down Expand Up @@ -171,7 +171,7 @@ var (
return err
}

client, err := engineapi.NewClientWithOpts(
client, err := engineapi.New(
engineapi.WithHost(engineAddr),
)
if err != nil {
Expand Down
63 changes: 36 additions & 27 deletions swarmd/dockerexec/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"strings"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
engineapi "github.com/docker/docker/client"
gogotypes "github.com/gogo/protobuf/types"
"github.com/moby/moby/api/types/container"
"github.com/moby/moby/api/types/events"
engineapi "github.com/moby/moby/client"
"github.com/moby/swarmkit/v2/agent/exec"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/log"
Expand Down Expand Up @@ -42,16 +41,16 @@ func newContainerAdapter(client engineapi.APIClient, nodeDescription *api.NodeDe
}, nil
}

func noopPrivilegeFn() (string, error) { return "", nil }
func noopPrivilegeFn(context.Context) (string, error) { return "", nil }

func (c *containerConfig) imagePullOptions() types.ImagePullOptions {
func (c *containerConfig) imagePullOptions() engineapi.ImagePullOptions {
var registryAuth string

if c.spec().PullOptions != nil {
registryAuth = c.spec().PullOptions.RegistryAuth
}

return types.ImagePullOptions{
return engineapi.ImagePullOptions{
// if the image needs to be pulled, the auth config will be retrieved and updated
RegistryAuth: registryAuth,
PrivilegeFunc: noopPrivilegeFn,
Expand Down Expand Up @@ -130,7 +129,7 @@ func (c *containerAdapter) createNetworks(ctx context.Context) error {

func (c *containerAdapter) removeNetworks(ctx context.Context) error {
for _, nid := range c.container.networks() {
if err := c.client.NetworkRemove(ctx, nid); err != nil {
if _, err := c.client.NetworkRemove(ctx, nid, engineapi.NetworkRemoveOptions{}); err != nil {
if isActiveEndpointError(err) {
continue
}
Expand All @@ -144,32 +143,36 @@ func (c *containerAdapter) removeNetworks(ctx context.Context) error {
}

func (c *containerAdapter) create(ctx context.Context) error {
_, err := c.client.ContainerCreate(ctx,
c.container.config(),
c.container.hostConfig(),
c.container.networkingConfig(),
nil,
c.container.name(),
)
_, err := c.client.ContainerCreate(ctx, engineapi.ContainerCreateOptions{
Config: c.container.config(),
HostConfig: c.container.hostConfig(),
NetworkingConfig: c.container.networkingConfig(),
Name: c.container.name(),
})

return err
}

func (c *containerAdapter) start(ctx context.Context) error {
// TODO(nishanttotla): Consider adding checkpoint handling later
return c.client.ContainerStart(ctx, c.container.name(), types.ContainerStartOptions{})
_, err := c.client.ContainerStart(ctx, c.container.name(), engineapi.ContainerStartOptions{})
return err
}

func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
return c.client.ContainerInspect(ctx, c.container.name())
func (c *containerAdapter) inspect(ctx context.Context) (container.InspectResponse, error) {
res, err := c.client.ContainerInspect(ctx, c.container.name(), engineapi.ContainerInspectOptions{})
if err != nil {
return container.InspectResponse{}, err
}
return res.Container, nil
}

// events issues a call to the events API and returns a channel with all
// events. The stream of events can be shutdown by cancelling the context.
//
// A chan struct{} is returned that will be closed if the event processing
// fails and needs to be restarted.
func (c *containerAdapter) events(ctx context.Context) (<-chan events.Message, <-chan struct{}, error) {
func (c *containerAdapter) events(ctx context.Context) engineapi.EventsResult {
// TODO(stevvooe): Move this to a single, global event dispatch. For
// now, we create a connection per container.
var (
Expand All @@ -180,7 +183,7 @@ func (c *containerAdapter) events(ctx context.Context) (<-chan events.Message, <
log.G(ctx).Debugf("waiting on events")
// TODO(stevvooe): For long running tasks, it is likely that we will have
// to restart this under failure.
eventCh, errCh := c.client.Events(ctx, types.EventsOptions{
res := c.client.Events(ctx, engineapi.EventsListOptions{
Since: "0",
Filters: c.container.eventFilter(),
})
Expand All @@ -190,13 +193,13 @@ func (c *containerAdapter) events(ctx context.Context) (<-chan events.Message, <

for {
select {
case msg := <-eventCh:
case msg := <-res.Messages:
select {
case eventsq <- msg:
case <-ctx.Done():
return
}
case err := <-errCh:
case err := <-res.Err:
log.G(ctx).WithError(err).Error("error from events stream")
return
case <-ctx.Done():
Expand All @@ -206,7 +209,10 @@ func (c *containerAdapter) events(ctx context.Context) (<-chan events.Message, <
}
}()

return eventsq, closed, nil
return engineapi.EventsResult{
Messages: eventsq,
Err: nil,
}
}

func (c *containerAdapter) shutdown(ctx context.Context) error {
Expand All @@ -220,18 +226,21 @@ func (c *containerAdapter) shutdown(ctx context.Context) error {
stopgraceFromProto, _ := gogotypes.DurationFromProto(spec.StopGracePeriod)
stopgraceSeconds = int(stopgraceFromProto.Seconds())
}
return c.client.ContainerStop(ctx, c.container.name(), container.StopOptions{Timeout: &stopgraceSeconds})
_, err := c.client.ContainerStop(ctx, c.container.name(), engineapi.ContainerStopOptions{Timeout: &stopgraceSeconds})
return err
}

func (c *containerAdapter) terminate(ctx context.Context) error {
return c.client.ContainerKill(ctx, c.container.name(), "")
_, err := c.client.ContainerKill(ctx, c.container.name(), engineapi.ContainerKillOptions{})
return err
}

func (c *containerAdapter) remove(ctx context.Context) error {
return c.client.ContainerRemove(ctx, c.container.name(), types.ContainerRemoveOptions{
_, err := c.client.ContainerRemove(ctx, c.container.name(), engineapi.ContainerRemoveOptions{
RemoveVolumes: true,
Force: true,
})
return err
}

func (c *containerAdapter) createVolumes(ctx context.Context) error {
Expand Down Expand Up @@ -268,7 +277,7 @@ func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscription
return nil, errors.New("logs not supported on services with TTY")
}

apiOptions := types.ContainerLogsOptions{
apiOptions := engineapi.ContainerLogsOptions{
Follow: options.Follow,
Timestamps: true,
Details: false,
Expand Down
97 changes: 53 additions & 44 deletions swarmd/dockerexec/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@ import (
"errors"
"fmt"
"net"
"net/netip"
"strconv"
"strings"
"time"

"github.com/docker/docker/api/types"
enginecontainer "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
enginemount "github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/volume"
"github.com/docker/go-connections/nat"
"github.com/docker/go-units"
gogotypes "github.com/gogo/protobuf/types"
enginecontainer "github.com/moby/moby/api/types/container"
"github.com/moby/moby/api/types/events"
enginemount "github.com/moby/moby/api/types/mount"
"github.com/moby/moby/api/types/network"
engineapi "github.com/moby/moby/client"
"github.com/moby/swarmkit/v2/agent/exec"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/api/genericresource"
Expand Down Expand Up @@ -93,12 +91,13 @@ func (c *containerConfig) image() string {
return c.spec().Image
}

func portSpec(port uint32, protocol api.PortConfig_Protocol) nat.Port {
return nat.Port(fmt.Sprintf("%d/%s", port, strings.ToLower(protocol.String())))
func portSpec(port uint32, protocol api.PortConfig_Protocol) network.Port {
p, _ := network.ParsePort(fmt.Sprintf("%d/%s", port, strings.ToLower(protocol.String())))
return p
}

func (c *containerConfig) portBindings() nat.PortMap {
portBindings := nat.PortMap{}
func (c *containerConfig) portBindings() network.PortMap {
portBindings := network.PortMap{}
if c.task.Endpoint == nil {
return portBindings
}
Expand All @@ -109,7 +108,7 @@ func (c *containerConfig) portBindings() nat.PortMap {
}

port := portSpec(portConfig.TargetPort, portConfig.Protocol)
binding := []nat.PortBinding{
binding := []network.PortBinding{
{},
}

Expand All @@ -125,17 +124,18 @@ func (c *containerConfig) portBindings() nat.PortMap {
func (c *containerConfig) isolation() enginecontainer.Isolation {
switch c.spec().Isolation {
case api.ContainerIsolationDefault:
return enginecontainer.Isolation("default")
return "default"
case api.ContainerIsolationHyperV:
return enginecontainer.Isolation("hyperv")
return "hyperv"
case api.ContainerIsolationProcess:
return enginecontainer.Isolation("process")
return "process"
default:
return ""
}
return enginecontainer.Isolation("")
}

func (c *containerConfig) exposedPorts() map[nat.Port]struct{} {
exposedPorts := make(map[nat.Port]struct{})
func (c *containerConfig) exposedPorts() network.PortSet {
exposedPorts := make(network.PortSet)
if c.task.Endpoint == nil {
return exposedPorts
}
Expand Down Expand Up @@ -427,7 +427,7 @@ func getMountMask(m *api.Mount) string {
}

// This handles the case of volumes that are defined inside a service Mount
func (c *containerConfig) volumeCreateRequest(mount *api.Mount) *volume.CreateOptions {
func (c *containerConfig) volumeCreateRequest(mount *api.Mount) *engineapi.VolumeCreateOptions {
var (
driverName string
driverOpts map[string]string
Expand All @@ -441,7 +441,7 @@ func (c *containerConfig) volumeCreateRequest(mount *api.Mount) *volume.CreateOp
}

// FIXME: do we need the ClusterVolumeSpec here?
return &volume.CreateOptions{
return &engineapi.VolumeCreateOptions{
Name: mount.Source,
Driver: driverName,
DriverOpts: driverOpts,
Expand Down Expand Up @@ -513,20 +513,20 @@ func (c *containerConfig) virtualIP(networkID string) string {
func (c *containerConfig) networkingConfig() *network.NetworkingConfig {
epConfig := make(map[string]*network.EndpointSettings)
for _, na := range c.task.Networks {
var ipv4, ipv6 string
var ipv4, ipv6 netip.Addr
for _, addr := range na.Addresses {
ip, _, err := net.ParseCIDR(addr)
prefix, err := netip.ParsePrefix(addr)
if err != nil {
continue
}

if ip.To4() != nil {
ipv4 = ip.String()
ip := prefix.Addr()
if ip.Is4() {
ipv4 = ip
continue
}

if ip.To16() != nil {
ipv6 = ip.String()
if ip.Is6() {
ipv6 = ip
}
}

Expand Down Expand Up @@ -556,39 +556,48 @@ func (c *containerConfig) networks() []string {
return networks
}

func (c *containerConfig) networkCreateOptions(name string) (types.NetworkCreate, error) {
func (c *containerConfig) networkCreateOptions(name string) (engineapi.NetworkCreateOptions, error) {
na, ok := c.networksAttachments[name]
if !ok {
return types.NetworkCreate{}, errors.New("container: unknown network referenced")
return engineapi.NetworkCreateOptions{}, errors.New("container: unknown network referenced")
}

options := types.NetworkCreate{
options := engineapi.NetworkCreateOptions{
Driver: na.Network.DriverState.Name,
IPAM: &network.IPAM{
Driver: na.Network.IPAM.Driver.Name,
},
Options: na.Network.DriverState.Options,
CheckDuplicate: true,
Options: na.Network.DriverState.Options,
}

for _, ic := range na.Network.IPAM.Configs {
c := network.IPAMConfig{
Subnet: ic.Subnet,
IPRange: ic.Range,
Gateway: ic.Gateway,
sn, err := netip.ParsePrefix(ic.Subnet)
if err != nil {
continue
}
r, err := netip.ParsePrefix(ic.Range)
if err != nil {
continue
}
gw, err := netip.ParseAddr(ic.Gateway)
if err != nil {
continue
}
options.IPAM.Config = append(options.IPAM.Config, c)
options.IPAM.Config = append(options.IPAM.Config, network.IPAMConfig{
Subnet: sn,
IPRange: r,
Gateway: gw,
})
}

return options, nil
}

func (c containerConfig) eventFilter() filters.Args {
filter := filters.NewArgs()
filter.Add("type", string(events.ContainerEventType))
filter.Add("name", c.name())
filter.Add("label", fmt.Sprintf("%v.task.id=%v", systemLabelPrefix, c.task.ID))
return filter
func (c containerConfig) eventFilter() engineapi.Filters {
return make(engineapi.Filters).
Add("type", string(events.ContainerEventType)).
Add("name", c.name()).
Add("label", fmt.Sprintf("%v.task.id=%v", systemLabelPrefix, c.task.ID))
}

func (c *containerConfig) init() *bool {
Expand Down
6 changes: 3 additions & 3 deletions swarmd/dockerexec/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"testing"
"time"

enginecontainer "github.com/docker/docker/api/types/container"
enginemount "github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/strslice"
"github.com/docker/go-units"
gogotypes "github.com/gogo/protobuf/types"
enginecontainer "github.com/moby/moby/api/types/container"
enginemount "github.com/moby/moby/api/types/mount"
"github.com/moby/moby/api/types/strslice"
"github.com/moby/swarmkit/v2/api"
)

Expand Down
Loading
Loading