Skip to content

Commit 8989162

Browse files
justonedev1knopers8
authored andcommitted
creating copy of ExecutorInfo for each mesos offer so there is not race condition
1 parent a5b0ccf commit 8989162

2 files changed

Lines changed: 18 additions & 5 deletions

File tree

core/task/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1526,7 +1526,7 @@ func makeTaskForMesosResources(
15261526

15271527
newTaskId := taskPtr.GetTaskId()
15281528

1529-
executor := state.executor
1529+
executor := state.CopyExecutor()
15301530
executor.ExecutorID.Value = taskPtr.GetExecutorId()
15311531
envIdS := envId.String()
15321532

core/task/schedulerstate.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
9494
viper.GetFloat64("executorMemory")),
9595
viper.GetDuration("mesosJobRestartDelay"),
9696
)
97-
9897
if err != nil {
9998
return nil, err
10099
}
@@ -161,11 +160,10 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
161160
},
162161
"leave_CONNECTED": func(_ context.Context, e *fsm.Event) {
163162
log.Debug("leave_CONNECTED")
164-
165163
},
166164
"before_NEW_ENVIRONMENT": func(_ context.Context, e *fsm.Event) {
167165
log.Debug("before_NEW_ENVIRONMENT")
168-
e.Async() //transition frozen until the corresponding fsm.Transition call
166+
e.Async() // transition frozen until the corresponding fsm.Transition call
169167
},
170168
"enter_CONNECTED": func(_ context.Context, e *fsm.Event) {
171169
log.Debug("enter_CONNECTED")
@@ -208,10 +206,25 @@ func (state *schedulerState) Start(ctx context.Context) {
208206
if state.err != nil {
209207
err = state.err
210208
log.WithField("error", err.Error()).Debug("scheduler quit with error, main state machine GO_ERROR")
211-
state.sm.Event(context.Background(), "GO_ERROR", err) //TODO: use error information in GO_ERROR
209+
state.sm.Event(context.Background(), "GO_ERROR", err) // TODO: use error information in GO_ERROR
212210
} else {
213211
log.Debug("scheduler quit, no errors")
214212
state.sm.Event(context.Background(), "EXIT")
215213
}
216214
}()
217215
}
216+
217+
func (state *schedulerState) CopyExecutor() *mesos.ExecutorInfo {
218+
executorInfoCopy := &mesos.ExecutorInfo{}
219+
220+
marshaled, err := state.executor.Marshal()
221+
if err != nil {
222+
return nil
223+
}
224+
225+
err = executorInfoCopy.Unmarshal(marshaled)
226+
if err != nil {
227+
return nil
228+
}
229+
return executorInfoCopy
230+
}

0 commit comments

Comments
 (0)