Skip to content

Commit 3da3af0

Browse files
committed
[executor] Add executorId in critical log messages
1 parent 0a5e009 commit 3da3af0

4 files changed

Lines changed: 26 additions & 11 deletions

File tree

core/task/scheduler.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -627,20 +627,20 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
627627
targetExecutorId.Value = uid.New().String()
628628
log.WithField("executorId", targetExecutorId.Value).
629629
WithField("offerHost", offer.GetHostname()).
630-
WithField("level", infologger.IL_Devel).
630+
WithField("level", infologger.IL_Support).
631631
Info("received offer without executor ID, will start new executor if accepted")
632632
} else {
633633
targetExecutorId.Value = offer.ExecutorIDs[0].Value
634634
if len(offer.ExecutorIDs) == 1 {
635635
log.WithField("executorId", targetExecutorId.Value).
636636
WithField("offerHost", offer.GetHostname()).
637-
WithField("level", infologger.IL_Devel).
637+
WithField("level", infologger.IL_Support).
638638
Warn("received offer with one executor ID, will use existing executor")
639639
} else if len(offer.ExecutorIDs) > 1 {
640640
log.WithField("executorId", targetExecutorId.Value).
641641
WithField("executorIds", offer.ExecutorIDs).
642642
WithField("offerHost", offer.GetHostname()).
643-
WithField("level", infologger.IL_Devel).
643+
WithField("level", infologger.IL_Support).
644644
Warn("received offer with more than one executor ID, will use first one")
645645
}
646646
}
@@ -965,6 +965,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
965965
WithField("detector", detector).
966966
WithField("level", infologger.IL_Support).
967967
WithField("offerHost", offer.Hostname).
968+
WithField("executorId", targetExecutorId.Value).
968969
Infof("launch request sent to %s: %d tasks", offer.Hostname, n)
969970
for _, taskInfo := range taskInfosToLaunchForCurrentOffer {
970971
log.WithPrefix("scheduler").

executor/executable/task.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,10 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE
107107
log.WithField("level", infologger.IL_Support).
108108
WithField("partition", envId.String()).
109109
WithField("detector", detector).
110-
WithField("level", infologger.IL_Devel).
111-
Infof("launching task %s: %s", taskInfo.TaskID.GetValue(), rawCommand)
110+
Infof("launching task %s on executorId %s: %s",
111+
taskInfo.TaskID.GetValue(),
112+
taskInfo.GetExecutor().GetExecutorID().Value,
113+
rawCommand)
112114
} else {
113115
if err != nil {
114116
log.WithError(err).

executor/executor.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,43 +168,53 @@ func Run(cfg config.Config) {
168168
// tasks and updates are empty.
169169
subscribe := calls.Subscribe(unacknowledgedTasks(state), unacknowledgedUpdates(state))
170170

171-
log.Debug("subscribing to agent for events")
171+
log.WithField("executorId", state.executor.GetExecutorID().Value).
172+
Debug("subscribing to agent for events")
172173
// ↓ empty context ↓ we get a plain RequestFunc from the executor.Call value
173174
resp, err := subscriber.Send(context.TODO(), calls.NonStreaming(subscribe))
174175
if resp != nil {
175176
defer resp.Close()
176177
}
177178
if err == nil {
178179
log.WithField("level", infologger.IL_Support).
180+
WithField("executorId", state.executor.GetExecutorID().Value).
179181
Info("executor subscribed, ready to receive events")
180182
// We're officially connected, start decoding events
181183
err = eventLoop(state, resp, handler)
182184
// If we're out of the eventLoop, means a disconnect happened, willingly or not.
183185
disconnected = time.Now()
184-
log.Debug("event loop finished")
186+
log.WithField("level", infologger.IL_Support).
187+
WithField("executorId", state.executor.GetExecutorID().Value).
188+
Info("event loop finished")
185189
}
186190
if err != nil && err != io.EOF {
187191
log.WithField("error", err).
192+
WithField("executorId", state.executor.GetExecutorID().Value).
188193
Error("executor disconnected with error")
189194
} else {
190-
log.Info("executor disconnected")
195+
log.WithField("executorId", state.executor.GetExecutorID().Value).
196+
Info("executor disconnected")
191197
}
192198
}()
193199
if state.shouldQuit {
194-
log.Info("gracefully shutting down on request")
200+
log.WithField("executorId", state.executor.GetExecutorID().Value).
201+
Info("gracefully shutting down on request")
195202
return
196203
}
197204
// The purpose of checkpointing is to handle recovery when mesos-agent exits.
198205
if !cfg.Checkpoint {
199-
log.Info("gracefully exiting because framework checkpointing is not enabled")
206+
log.WithField("executorId", state.executor.GetExecutorID().Value).
207+
Info("gracefully exiting because framework checkpointing is not enabled")
200208
return
201209
}
202210
if time.Now().Sub(disconnected) > cfg.RecoveryTimeout {
203211
log.WithField("timeout", cfg.RecoveryTimeout).
212+
WithField("executorId", state.executor.GetExecutorID().Value).
204213
Error("failed to re-establish subscription with agent within recovery timeout, aborting")
205214
return
206215
}
207-
log.Debug("waiting for reconnect timeout")
216+
log.WithField("executorId", state.executor.GetExecutorID().Value).
217+
Debug("waiting for reconnect timeout")
208218
<-shouldReconnect // wait for some amount of time before retrying subscription
209219
}
210220
}

executor/handlers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func handleMessageEvent(state *internalState, data []byte) (err error) {
8080
"message": string(data[:]),
8181
"error": err.Error(),
8282
}).
83+
WithField("executorId", state.executor.GetExecutorID().Value).
8384
Error("no task for incoming MESSAGE")
8485
return
8586
}
@@ -168,6 +169,7 @@ func handleMessageEvent(state *internalState, data []byte) (err error) {
168169
"message": string(data[:]),
169170
"error": err.Error(),
170171
}).
172+
WithField("executorId", state.executor.GetExecutorID().Value).
171173
Error("no task for incoming MESSAGE")
172174
return
173175
}

0 commit comments

Comments
 (0)