
Commit 153a87c

justonedev1 authored and teo committed
Revert "use two channel to communicate mesos REVIVE"
This reverts commit e99d419.
1 parent e619a77 · commit 153a87c

3 files changed: 41 additions & 35 deletions
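
This revert folds the scheduler's offer-revival handshake back onto a single channel: commit e99d419 had split the round trip across a trigger channel (`reviveOffersTrg`) and a completion channel (`reviveOffersDone`); after the revert, both the trigger and the acknowledgement travel on `reviveOffersTrg`. A minimal, self-contained sketch of the restored pattern; the sleep is a stand-in for the real `doReviveOffers`, and nothing beyond the channel name comes from the diff:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	reviveOffersTrg := make(chan struct{}) // unbuffered, as in NewScheduler

	// Scheduler side, as in runSchedulerController: wait for a trigger,
	// revive offers, then acknowledge on the same channel.
	go func() {
		for {
			<-reviveOffersTrg                 // wait for a revive request
			time.Sleep(50 * time.Millisecond) // stand-in for doReviveOffers
			reviveOffersTrg <- struct{}{}     // signal completion back
		}
	}()

	// Caller side, as in acquireTasks:
	reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
	<-reviveOffersTrg             // we only continue when it's done
	fmt.Println("offers revived, deployment can proceed")
}
```

Because the channel is unbuffered, each leg of the exchange is a rendezvous: the caller's send blocks until the scheduler loop receives, and the caller's receive blocks until the loop has done the work and sent back.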


core/task/manager.go

Lines changed: 19 additions & 15 deletions
```diff
@@ -91,9 +91,8 @@ type Manager struct {
 
 	tasksToDeploy chan<- *ResourceOffersDeploymentRequest
 
-	reviveOffersTrg  chan struct{}
-	reviveOffersDone chan struct{}
-	cq               *controlcommands.CommandQueue
+	reviveOffersTrg chan struct{}
+	cq              *controlcommands.CommandQueue
 
 	tasksLaunched int
 	tasksFinished int
@@ -142,7 +141,6 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
 	taskman.cq = taskman.schedulerState.commandqueue
 	taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
 	taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
-	taskman.reviveOffersDone = taskman.schedulerState.reviveOffersDone
 	taskman.ackKilledTasks = newAcks()
 
 	schedState.setupCli()
@@ -158,8 +156,7 @@ func (m *Manager) newTaskForMesosOffer(
 	offer *mesos.Offer,
 	descriptor *Descriptor,
 	localBindMap channel.BindMap,
-	executorId mesos.ExecutorID,
-) (t *Task) {
+	executorId mesos.ExecutorID) (t *Task) {
 	newId := uid.New().String()
 	t = &Task{
 		name: fmt.Sprintf("%s#%s", descriptor.TaskClassName, newId),
@@ -200,8 +197,8 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
 		if err != nil {
 			return
 		}
-		repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] // get IRepo pointer from RepoManager
-		if repo == nil { // should never end up here
+		repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] //get IRepo pointer from RepoManager
+		if repo == nil { //should never end up here
 			return nil, errors.New("getTaskClassList: repo not found for " + taskClass)
 		}
 
@@ -226,6 +223,7 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
 			taskInfo := strings.Split(taskPath, "/tasks/")
 			if len(taskInfo) == 1 {
 				taskFilename = taskInfo[0]
+
 			} else {
 				taskFilename = taskInfo[1]
 			}
@@ -282,7 +280,7 @@ func (m *Manager) removeInactiveClasses() {
 	return
 }
 
-func (m *Manager) RemoveReposClasses(repoPath string) { // Currently unused
+func (m *Manager) RemoveReposClasses(repoPath string) { //Currently unused
 	utils.EnsureTrailingSlash(&repoPath)
 
 	_ = m.classes.Do(func(classMap *map[string]*taskclass.Class) error {
@@ -329,6 +327,7 @@ func (m *Manager) RefreshClasses(taskClassesRequired []string) (err error) {
 }
 
 func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err error) {
+
 	/*
 		Here's what's gonna happen:
 		1) check if any tasks are already in Roster, whether they are already locked
@@ -517,7 +516,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 	timeReviveOffers := time.Now()
 	timeDeployMu := time.Now()
 	m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
-	<-m.reviveOffersDone            // we only continue when it's done
+	<-m.reviveOffersTrg             // we only continue when it's done
 	utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
 		log.WithField("tasksToRun", len(tasksToRun)).
 			WithField("partition", envId))
@@ -598,7 +597,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 		// can't lock some of them, so we must roll back and keep them
 		// unlocked in the roster.
 		var deployedTaskIds []string
-		for taskPtr := range deployedTasks {
+		for taskPtr, _ := range deployedTasks {
 			taskPtr.SetParent(nil)
 			deployedTaskIds = append(deployedTaskIds, taskPtr.taskId)
 		}
@@ -613,11 +612,11 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 	}
 
 	// Finally, we write to the roster. Point of no return!
-	for taskPtr := range deployedTasks {
+	for taskPtr, _ := range deployedTasks {
 		m.roster.append(taskPtr)
 	}
 	if deploymentSuccess {
-		for taskPtr := range deployedTasks {
+		for taskPtr, _ := range deployedTasks {
 			taskPtr.GetParent().SetTask(taskPtr)
 		}
 		for taskPtr, descriptor := range tasksAlreadyRunning {
@@ -630,6 +629,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 }
 
 func (m *Manager) releaseTasks(envId uid.ID, tasks Tasks) error {
+
 	taskReleaseErrors := make(map[string]error)
 	taskIdsReleased := make([]string, 0)
 
@@ -686,7 +686,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
 		taskPath := task.GetParentRolePath()
 		for inbChName, endpoint := range task.GetLocalBindMap() {
 			var bindMapKey string
-			if strings.HasPrefix(inbChName, "::") { // global channel alias
+			if strings.HasPrefix(inbChName, "::") { //global channel alias
 				bindMapKey = inbChName
 
 				// deduplication
@@ -785,6 +785,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
 func (m *Manager) transitionTasks(envId uid.ID, tasks Tasks, src string, event string, dest string, commonArgs controlcommands.PropertyMap) error {
 	notify := make(chan controlcommands.MesosCommandResponse)
 	receivers, err := tasks.GetMesosCommandTargets()
+
 	if err != nil {
 		return err
 	}
@@ -869,6 +870,7 @@ func (m *Manager) TriggerHooks(envId uid.ID, tasks Tasks) error {
 
 	notify := make(chan controlcommands.MesosCommandResponse)
 	receivers, err := tasks.GetMesosCommandTargets()
+
 	if err != nil {
 		return err
 	}
@@ -933,6 +935,7 @@ func (m *Manager) GetTask(id string) *Task {
 }
 
 func (m *Manager) updateTaskState(taskId string, state string) {
+
 	taskPtr := m.roster.getByTaskId(taskId)
 	if taskPtr == nil {
 		log.WithField("taskId", taskId).
@@ -986,7 +989,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
 	}
 	if ack, ok := m.ackKilledTasks.getValue(taskId); ok {
 		ack <- struct{}{}
-		// close(ack) // It can even be left open?
+		//close(ack) // It can even be left open?
 	}
 
 	return
@@ -1027,6 +1030,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
 
 // Kill all tasks outside an environment (all unlocked tasks)
 func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
+
 	toKill := m.roster.filtered(func(t *Task) bool {
 		return !t.IsLocked()
 	})
```
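
One caveat worth flagging (my observation; the commit message gives no rationale): with a single shared channel, a second concurrent caller of `acquireTasks` can have its trigger send matched against the first caller's completion receive, so the first caller returns before revival is done and the second trigger never reaches the scheduler loop. The two-channel layout being reverted here cannot pair those operations, which is presumably why e99d419 split the channels. A contrast sketch of that reverted shape, with `doReviveOffers` stubbed by a sleep:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Two-channel layout from the reverted commit e99d419: trigger and
	// acknowledgement travel on separate channels, so a caller blocked on
	// <-done can never accidentally consume another caller's trigger.
	trg := make(chan struct{})
	done := make(chan struct{})

	go func() {
		for {
			<-trg                             // wait for a revive request
			time.Sleep(50 * time.Millisecond) // stand-in for doReviveOffers
			done <- struct{}{}                // acknowledge on the dedicated channel
		}
	}()

	trg <- struct{}{} // signal scheduler to revive offers
	<-done            // we only continue when it's done
	fmt.Println("offers revived")
}
```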

core/task/scheduler.go

Lines changed: 17 additions & 15 deletions
```diff
@@ -84,8 +84,7 @@ var schedEventsCh = make(chan scheduler.Event_Type)
 
 func runSchedulerController(ctx context.Context,
 	state *schedulerState,
-	fidStore store.Singleton,
-) error {
+	fidStore store.Singleton) error {
 	// Set up communication from controller to state machine.
 	go func() {
 		for {
@@ -104,7 +103,7 @@ func runSchedulerController(ctx context.Context,
 		for {
 			<-state.reviveOffersTrg
 			doReviveOffers(ctx, state)
-			state.reviveOffersDone <- struct{}{}
+			state.reviveOffersTrg <- struct{}{}
 		}
 	}()
 
@@ -273,6 +272,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
 	// only one entry in the list, we signal back to commandqueue
 	// otherwise, we log and ignore.
 	return func(ctx context.Context, e *scheduler.Event) (err error) {
+
 		mesosMessage := e.GetMessage()
 		if mesosMessage == nil {
 			err = errors.New("message handler got bad MESSAGE")
@@ -336,7 +336,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
 				return
 			}
 			state.taskman.internalEventCh <- ev
-			// state.handleDeviceEvent(ev)
+			//state.handleDeviceEvent(ev)
 		} else {
 			log.WithFields(logrus.Fields{
 				"type": incomingEvent.Type.String(),
@@ -437,7 +437,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 		timeResourceOffersCall := time.Now()
 		var (
 			offers                 = e.GetOffers().GetOffers()
-			callOption             = calls.RefuseSeconds(time.Second) // calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
+			callOption             = calls.RefuseSeconds(time.Second) //calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
 			tasksLaunchedThisCycle = 0
 			offersDeclined         = 0
 		)
@@ -613,7 +613,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 		var offerWaitGroup sync.WaitGroup
 		offerWaitGroup.Add(len(offers))
 
-		for offerIndex := range offers {
+		for offerIndex, _ := range offers {
 			go func(offerIndex int) {
 				defer offerWaitGroup.Done()
 
@@ -1013,6 +1013,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 					WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)).
 					WithField("offers", len(offers)).
 					WithField("offerHost", offer.Hostname))
+
 			}(offerIndex) // end for offer closure
 		} // end for _, offer := range offers
 		offerWaitGroup.Wait()
@@ -1093,7 +1094,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 		machinesUsedSlice := func(machines map[string]struct{}) []string { // StringSet to StringSlice
 			out := make([]string, len(machines))
 			i := 0
-			for k := range machines {
+			for k, _ := range machines {
 				out[i] = k
 				i++
 			}
@@ -1183,7 +1184,6 @@ func (state *schedulerState) statusUpdate() events.HandlerFunc {
 // have set through ACCEPT or DECLINE calls, in the hope that Mesos then sends us new resource offers.
 // This should generally run when we have received a TASK_FINISHED for some tasks, and we have more
 // tasks to run.
-
 func (state *schedulerState) tryReviveOffers(ctx context.Context) {
 	// limit the rate at which we request offer revival
 	select {
@@ -1274,7 +1274,7 @@ func logAllEvents() eventrules.Rule {
 			}
 		}
 		offerIds := make([]string, len(off))
-		for i := range off {
+		for i, _ := range off {
 			offerIds[i] = off[i].GetID().Value
 		}
 		fields["offerIds"] = strings.Join(offerIds, ",")
@@ -1335,6 +1335,7 @@ func makeTaskForMesosResources(
 	descriptorDetector string,
 	offerIDsToDecline map[mesos.OfferID]struct{},
 ) (*Task, *mesos.TaskInfo) {
+
 	bindMap := make(channel.BindMap)
 	for _, ch := range wants.InboundChannels {
 		if ch.Addressing == channel.IPC {
@@ -1367,7 +1368,7 @@ func makeTaskForMesosResources(
 		Attributes: offer.Attributes,
 		Hostname:   offer.Hostname,
 	}
-	state.taskman.AgentCache.Update(agentForCache) // thread safe
+	state.taskman.AgentCache.Update(agentForCache) //thread safe
 	machinesUsed[offer.Hostname] = struct{}{}
 
 	taskPtr := state.taskman.newTaskForMesosOffer(offer, descriptor, bindMap, targetExecutorId)
@@ -1565,11 +1566,12 @@ func makeTaskForMesosResources(
 	ldLibPath, ok := agentForCache.Attributes.Get("executor_env_LD_LIBRARY_PATH")
 	mesosTaskInfo.Executor.Command.Environment = &mesos.Environment{}
 	if ok {
-		mesosTaskInfo.Executor.Command.Environment.Variables = append(mesosTaskInfo.Executor.Command.Environment.Variables,
-			mesos.Environment_Variable{
-				Name:  "LD_LIBRARY_PATH",
-				Value: proto.String(ldLibPath),
-			})
+		mesosTaskInfo.Executor.Command.Environment.Variables =
+			append(mesosTaskInfo.Executor.Command.Environment.Variables,
+				mesos.Environment_Variable{
+					Name:  "LD_LIBRARY_PATH",
+					Value: proto.String(ldLibPath),
+				})
 	}
 
 	return taskPtr, &mesosTaskInfo
```
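
Several hunks above reinstate the `for k, _ := range` form in place of `for k := range`. The two are semantically identical; the blank identifier is noise that linters such as staticcheck suggest dropping. A tiny illustration with a hypothetical map:

```go
package main

import "fmt"

func main() {
	offers := map[string]struct{}{"offer-1": {}, "offer-2": {}}

	// Form reinstated by this revert: explicit blank identifier.
	for id, _ := range offers {
		fmt.Println(id)
	}

	// Equivalent shorter form that the reverted commit had used.
	for id := range offers {
		fmt.Println(id)
	}
}
```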

core/task/schedulerstate.go

Lines changed: 5 additions & 5 deletions
```diff
@@ -69,9 +69,8 @@ type schedulerState struct {
 	reviveTokens  <-chan struct{}
 	tasksToDeploy chan *ResourceOffersDeploymentRequest
 
-	reviveOffersTrg  chan struct{}
-	reviveOffersDone chan struct{}
-	random           *rand.Rand
+	reviveOffersTrg chan struct{}
+	random          *rand.Rand
 
 	// shouldn't change at runtime, so thread safe:
 	role string
@@ -107,6 +106,8 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
 
 	tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS)
 
+	reviveOffersTrg := make(chan struct{})
+
 	state := &schedulerState{
 		taskman:  taskman,
 		fidStore: fidStore,
@@ -116,8 +117,7 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
 			viper.GetDuration("mesosReviveWait"),
 			nil),
 		tasksToDeploy: tasksToDeploy,
-		reviveOffersTrg:  make(chan struct{}),
-		reviveOffersDone: make(chan struct{}),
+		reviveOffersTrg: reviveOffersTrg,
 		wantsTaskResources: mesos.Resources{},
 		executor:           executorInfo,
 		metricsAPI:         metricsAPI,
```
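
The struct literal now receives the `reviveOffersTrg` channel built a few lines earlier instead of constructing it inline; behavior is unchanged. Note that `make(chan struct{})` with no capacity yields an unbuffered channel, which is what the handshake relies on: every send blocks until the other side is at a matching receive. A quick demonstration of that rendezvous:

```go
package main

import "fmt"

func main() {
	sig := make(chan struct{}) // unbuffered: a send needs a ready receiver

	go func() {
		fmt.Println("scheduler: reviving offers")
		sig <- struct{}{} // blocks here until main executes <-sig
	}()

	<-sig // blocks until the goroutine sends
	fmt.Println("caller: revival acknowledged")
}
```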
