Skip to content

Commit a5b0ccf

Browse files
committed
[core] OCTRL-920 safer concurrency in KillTasks
Parallel attempts to kill tasks were found to be the primary cause of stuck auto-environments. In particular, the channels in ackKilledTasks (and the code handling them) did not expect multiple listeners, so either one of the two kill attempts would be stuck waiting for its acknowledgment to be received, or the other side, waiting for an acknowledgment, would never get it. This would cause KillTasks to be stuck indefinitely, which blocks the main auto-environment code path.
1 parent e20e0f6 commit a5b0ccf

1 file changed

Lines changed: 12 additions & 3 deletions

File tree

core/task/manager.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ type Manager struct {
100100
schedulerState *schedulerState
101101
internalEventCh chan<- event.Event
102102
ackKilledTasks *safeAcks
103+
killTasksMu sync.Mutex // to avoid races when attempting to kill the same tasks in different goroutines
103104
}
104105

105106
func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *Manager, err error) {
@@ -1042,7 +1043,7 @@ func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
10421043
// If the task list includes locked tasks, TaskNotFoundError is returned.
10431044
func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) {
10441045
taskCanBeKilledFilter := func(t *Task) bool {
1045-
if t.IsLocked() {
1046+
if t.IsLocked() || m.ackKilledTasks.contains(t.taskId) {
10461047
return false
10471048
}
10481049
for _, id := range taskIds {
@@ -1053,20 +1054,27 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err
10531054
return false
10541055
}
10551056

1057+
if !m.killTasksMu.TryLock() {
1058+
log.WithField("level", infologger.IL_Support).Warnf("Scheduling killing tasks was delayed until another goroutine finishes doing so")
1059+
m.killTasksMu.Lock()
1060+
log.WithField("level", infologger.IL_Support).Infof("Scheduling killing tasks is resumed")
1061+
}
10561062
// TODO: use grouping instead of 2 passes of filtering for performance
10571063
toKill := m.roster.filtered(taskCanBeKilledFilter)
1058-
unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) })
10591064

10601065
if len(toKill) < len(taskIds) {
1066+
unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) })
10611067
log.WithField("taskIds", strings.Join(unkillable.GetTaskIds(), ", ")).
1062-
Debugf("some tasks cannot be physically killed (already dead?), will instead only be removed from roster")
1068+
Debugf("some tasks cannot be physically killed (already dead or being killed in another goroutine?), will instead only be removed from roster")
10631069
}
10641070

10651071
for _, id := range toKill.GetTaskIds() {
10661072
m.ackKilledTasks.addAckChannel(id)
10671073
}
10681074

10691075
killed, running, err = m.doKillTasks(toKill)
1076+
m.killTasksMu.Unlock()
1077+
10701078
for _, id := range killed.GetTaskIds() {
10711079
ack, ok := m.ackKilledTasks.getValue(id)
10721080
if ok {
@@ -1088,6 +1096,7 @@ func (m *Manager) doKillTasks(tasks Tasks) (killed Tasks, running Tasks, err err
10881096
inactiveTasks := tasks.Filtered(func(task *Task) bool {
10891097
return task.status != ACTIVE
10901098
})
1099+
10911100
// Remove from the roster the tasks which are also in the inactiveTasks list to delete
10921101
m.roster.updateTasks(m.roster.filtered(func(task *Task) bool {
10931102
return !inactiveTasks.Contains(func(t *Task) bool {

0 commit comments

Comments
 (0)