Skip to content

Commit 0da8604

Browse files
justonedev1teo
authored and committed
fix error handling in case of no resources error
1 parent 153a87c commit 0da8604

3 files changed

Lines changed: 46 additions & 21 deletions

File tree

core/environment/transition_deploy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func (t DeployTransition) do(env *Environment) (err error) {
275275
WithField("partition", env.Id().String()).
276276
WithField("timeout", deploymentTimeout).
277277
WithField("detector", detector).
278-
Error("role failed to deploy within timeout")
278+
Error("role failed to deploy because of timeout")
279279
undeployableTaskRoles = append(undeployableTaskRoles, role.GetPath())
280280
} else if roleStatus != task.ACTIVE {
281281
detector, ok := role.GetUserVars().Get("detector")
@@ -288,7 +288,7 @@ func (t DeployTransition) do(env *Environment) (err error) {
288288
WithField("partition", env.Id().String()).
289289
WithField("timeout", deploymentTimeout).
290290
WithField("detector", detector).
291-
Error("role failed to deploy within timeout")
291+
Error("role failed to deploy because of timeout")
292292
inactiveTaskRoles = append(inactiveTaskRoles, role.GetPath())
293293
}
294294
})

core/task/manager.go

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,14 @@ func (m *Manager) RefreshClasses(taskClassesRequired []string) (err error) {
326326
return
327327
}
328328

329+
// prefix will be prepended before name of descriptor
330+
func logDescriptors(prefix string, logFunc func(format string, args ...interface{}), descriptos Descriptors) {
331+
for _, desc := range descriptos {
332+
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
333+
logFunc("%s%s", prefix, printname)
334+
}
335+
}
336+
329337
func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err error) {
330338

331339
/*
@@ -341,6 +349,10 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
341349
4) start the tasks in tasksToRun
342350
5) ensure that all of them reach a CONFIGURED state
343351
*/
352+
353+
// TODO: switch to this logger with envId everywhere
354+
logWithId := log.WithField("partition", envId)
355+
344356
claimableTasks := m.roster.filtered(func(task *Task) bool {
345357
return task.IsClaimable()
346358
})
@@ -503,6 +515,17 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
503515
// The request object is used to pass the tasks to deploy and the outcome
504516
// channel to the deployment routine.
505517

518+
// reset all variables before each deployment attempt
519+
deploymentSuccess = true
520+
undeployedDescriptors = make(Descriptors, 0)
521+
undeployableDescriptors = make(Descriptors, 0)
522+
undeployedNonCriticalDescriptors = make(Descriptors, 0)
523+
undeployedCriticalDescriptors = make(Descriptors, 0)
524+
undeployableNonCriticalDescriptors = make(Descriptors, 0)
525+
undeployableCriticalDescriptors = make(Descriptors, 0)
526+
527+
deployedTasks = make(DeploymentMap)
528+
506529
outcomeCh := make(chan ResourceOffersOutcome)
507530
m.tasksToDeploy <- &ResourceOffersDeploymentRequest{
508531
tasksToDeploy: tasksToRun,
@@ -531,45 +554,30 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
531554
undeployedDescriptors = roOutcome.undeployed
532555
undeployableDescriptors = roOutcome.undeployable
533556

534-
log.WithField("tasks", deployedTasks).
535-
WithField("partition", envId).
557+
logWithId.WithField("tasks", deployedTasks).
536558
Debugf("resourceOffers is done, %d new tasks running", len(deployedTasks))
537559

538560
if len(deployedTasks) != len(tasksToRun) {
539561
// ↑ Not all roles could be deployed. If some were critical,
540562
// we cannot proceed with running this environment. Either way,
541563
// we keep the roles running since they might be useful in the future.
542-
log.WithField("partition", envId).
543-
Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))
564+
logWithId.Errorf("environment deployment failure: %d tasks requested for deployment, but %d deployed", len(tasksToRun), len(deployedTasks))
544565

545566
for _, desc := range undeployedDescriptors {
546567
if desc.TaskRole.GetTaskTraits().Critical == true {
547568
deploymentSuccess = false
548569
undeployedCriticalDescriptors = append(undeployedCriticalDescriptors, desc)
549-
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
550-
log.WithField("partition", envId).
551-
Errorf("critical task deployment failure: %s", printname)
552570
} else {
553571
undeployedNonCriticalDescriptors = append(undeployedNonCriticalDescriptors, desc)
554-
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
555-
log.WithField("partition", envId).
556-
Warnf("non-critical task deployment failure: %s", printname)
557572
}
558573
}
559574

560575
for _, desc := range undeployableDescriptors {
561576
if desc.TaskRole.GetTaskTraits().Critical == true {
562577
deploymentSuccess = false
563578
undeployableCriticalDescriptors = append(undeployableCriticalDescriptors, desc)
564-
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
565-
log.WithField("partition", envId).
566-
Errorf("critical task deployment impossible: %s", printname)
567-
go desc.TaskRole.UpdateStatus(UNDEPLOYABLE)
568579
} else {
569580
undeployableNonCriticalDescriptors = append(undeployableNonCriticalDescriptors, desc)
570-
printname := fmt.Sprintf("%s->%s", desc.TaskRole.GetPath(), desc.TaskClassName)
571-
log.WithField("partition", envId).
572-
Warnf("non-critical task deployment impossible: %s", printname)
573581
}
574582
}
575583
}
@@ -587,6 +595,22 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
587595
}
588596
break DEPLOYMENT_ATTEMPTS_LOOP
589597
}
598+
599+
log.WithField("partition", envId).Errorf("Deployment failed %d/%d attempts. Check messages in IL to figure out why. Retrying...", attemptCount+1, MAX_ATTEMPTS_PER_DEPLOY_REQUEST)
600+
time.Sleep(time.Second * SLEEP_LENGTH_BETWEEN_PER_DEPLOY_REQUESTS)
601+
}
602+
}
603+
604+
logDescriptors("critical task deployment impossible: ", logWithId.Errorf, undeployableCriticalDescriptors)
605+
logDescriptors("critical task deployment failure: ", logWithId.Errorf, undeployedCriticalDescriptors)
606+
607+
logDescriptors("non-critical task deployment failure: ", logWithId.Warningf, undeployedNonCriticalDescriptors)
608+
logDescriptors("non-critical task deployment impossible: ", logWithId.Warningf, undeployableNonCriticalDescriptors)
609+
610+
// After retries notify environment about failed critical tasks
611+
for _, desc := range undeployableDescriptors {
612+
if desc.TaskRole.GetTaskTraits().Critical == true {
613+
desc.TaskRole.UpdateStatus(UNDEPLOYABLE)
590614
}
591615
}
592616

core/task/schedulerstate.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ import (
4949
)
5050

5151
const (
52-
MAX_CONCURRENT_DEPLOY_REQUESTS = 100
53-
MAX_ATTEMPTS_PER_DEPLOY_REQUEST = 3
52+
MAX_CONCURRENT_DEPLOY_REQUESTS = 100
53+
MAX_ATTEMPTS_PER_DEPLOY_REQUEST = 3
54+
SLEEP_LENGTH_BETWEEN_PER_DEPLOY_REQUESTS = 1 // in seconds
5455
)
5556

5657
type schedulerState struct {

0 commit comments

Comments
 (0)