Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ loop:
if t.DesiredState < api.TaskStateAssigned || t.DesiredState > api.TaskStateCompleted {
continue
}
// Completed tasks should not consume node reservations. This is
// particularly important for replicated-job services, where tasks may
// remain with DesiredState=Completed after they finished.
if t.Status.State >= api.TaskStateCompleted {
continue
}

// Ensure that the node still satisfies placement constraints.
// NOTE: If the task is associacted with a service then we must use the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,82 @@ import (
"github.com/stretchr/testify/require"
)

func TestRejectNoncompliantTasksIgnoresCompletedJobTasksInReservations(t *testing.T) {
s := store.NewMemoryStore(nil)
require.NotNil(t, s)
t.Cleanup(func() { _ = s.Close() })

node := &api.Node{
ID: "node1",
Spec: api.NodeSpec{
Availability: api.NodeAvailabilityActive,
},
Description: &api.NodeDescription{
Resources: &api.Resources{
MemoryBytes: 1024,
},
},
}

// A live task that fits if completed job tasks are ignored.
runningTask := &api.Task{
ID: "running1",
NodeID: node.ID,
ServiceID: "svc1",
DesiredState: api.TaskStateRunning,
Status: api.TaskStatus{
State: api.TaskStateRunning,
},
Spec: api.TaskSpec{
Resources: &api.ResourceRequirements{
Reservations: &api.Resources{
MemoryBytes: 700,
},
},
},
}

// A completed replicated-job task that should not consume reservations.
completedJobTask := &api.Task{
ID: "job1",
NodeID: node.ID,
ServiceID: "jobsvc",
DesiredState: api.TaskStateCompleted,
Status: api.TaskStatus{
State: api.TaskStateCompleted,
},
Spec: api.TaskSpec{
Resources: &api.ResourceRequirements{
Reservations: &api.Resources{
MemoryBytes: 700,
},
},
},
}

require.NoError(t, s.Update(func(tx store.Tx) error {
if err := store.CreateNode(tx, node); err != nil {
return err
}
if err := store.CreateTask(tx, runningTask); err != nil {
return err
}
if err := store.CreateTask(tx, completedJobTask); err != nil {
return err
}
return nil
}))

ce := New(s)
ce.rejectNoncompliantTasks(node)

s.View(func(tx store.ReadTx) {
got := store.GetTask(tx, runningTask.ID)
require.NotNil(t, got)
assert.NotEqual(t, api.TaskStateRejected, got.Status.State, "running task unexpectedly rejected; completed job tasks should not count toward reservations")
})
}

func TestConstraintEnforcer(t *testing.T) {
nodes := []*api.Node{
// this node starts as a worker, but then is changed to a manager.
Expand Down
Loading