Skip to content

Commit 67cda98

Browse files
committed
[core] Emit run SOSOR/EOSOR/SOEOR/EOEOR events to aliecs.run topic
1 parent c32f70b commit 67cda98

9 files changed

Lines changed: 339 additions & 88 deletions

File tree

coconut/protos/o2control.pb.go

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

coconut/protos/o2control_grpc.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/event/writer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
132132
Timestamp: timestamp.UnixMilli(),
133133
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
134134
}
135+
case *pb.Ev_RunEvent:
136+
wrappedEvent = &pb.Event{
137+
Timestamp: timestamp.UnixMilli(),
138+
Payload: &pb.Event_RunEvent{RunEvent: e},
139+
}
135140
}
136141

137142
if wrappedEvent == nil {

common/protos/events.pb.go

Lines changed: 225 additions & 74 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/protos/events.proto

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,17 +112,28 @@ message Ev_IntegratedServiceEvent {
112112
string payload = 8; // any additional payload, depending on the integrated service; there is no schema, it can even be the raw return structure of a remote API call
113113
}
114114

115+
message Ev_RunEvent {
116+
string environmentId = 1;
117+
uint32 runNumber = 2;
118+
string state = 3;
119+
string error = 4;
120+
string transition = 5;
121+
OpStatus transitionStatus = 6;
122+
map<string, string> vars = 7;
123+
}
124+
115125
message Event {
116126
int64 timestamp = 1;
117127
reserved 2 to 10;
118-
reserved 16 to 100;
128+
reserved 17 to 100;
119129

120130
oneof Payload {
121131
Ev_EnvironmentEvent environmentEvent = 11;
122132
Ev_TaskEvent taskEvent = 12;
123133
Ev_RoleEvent roleEvent = 13;
124134
Ev_CallEvent callEvent = 14;
125135
Ev_IntegratedServiceEvent integratedServiceEvent = 15;
136+
Ev_RunEvent runEvent = 16;
126137

127138
Ev_MetaEvent_FrameworkEvent frameworkEvent = 101;
128139
Ev_MetaEvent_MesosHeartbeat mesosHeartbeatEvent = 102;

core/environment/environment.go

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,20 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
186186
env.workflow.GetVars().Set("run_number", rnString)
187187
env.workflow.GetVars().Set("runNumber", rnString)
188188

189-
runStartTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
190-
env.workflow.SetRuntimeVar("run_start_time_ms", runStartTime)
191-
env.workflow.SetRuntimeVar("run_end_time_ms", "") // we delete previous EOR
189+
runStartTime := time.Now()
190+
runStartTimeS := strconv.FormatInt(runStartTime.UnixMilli(), 10)
191+
env.workflow.SetRuntimeVar("run_start_time_ms", runStartTimeS)
192+
env.workflow.SetRuntimeVar("run_end_time_ms", "") // we delete previous SOEOR
193+
194+
the.EventWriterWithTopic(topic.Run).WriteEventWithTimestamp(&pb.Ev_RunEvent{
195+
EnvironmentId: envId.String(),
196+
RunNumber: runNumber,
197+
State: env.Sm.Current(),
198+
Error: "",
199+
Transition: e.Event,
200+
TransitionStatus: pb.OpStatus_STARTED,
201+
Vars: nil,
202+
}, runStartTime)
192203

193204
cleanupCount := 0
194205
cleanupCountS, ok := env.GlobalVars.Get("__fmq_cleanup_count")
@@ -216,17 +227,43 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
216227
} else if e.Event == "STOP_ACTIVITY" {
217228
endTime, ok := env.workflow.GetUserVars().Get("run_end_time_ms")
218229
if ok && endTime == "" {
219-
runEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
220-
env.workflow.SetRuntimeVar("run_end_time_ms", runEndTime)
230+
runEndTime := time.Now()
231+
runEndTimeS := strconv.FormatInt(runEndTime.UnixMilli(), 10)
232+
env.workflow.SetRuntimeVar("run_end_time_ms", runEndTimeS)
233+
env.workflow.SetRuntimeVar("run_start_time_ms", "") // we delete previous SOSOR
234+
235+
the.EventWriterWithTopic(topic.Run).WriteEventWithTimestamp(&pb.Ev_RunEvent{
236+
EnvironmentId: envId.String(),
237+
RunNumber: env.GetCurrentRunNumber(),
238+
State: env.Sm.Current(),
239+
Error: "",
240+
Transition: e.Event,
241+
TransitionStatus: pb.OpStatus_STARTED,
242+
Vars: nil,
243+
}, runEndTime)
244+
221245
} else {
222246
log.WithField("partition", envId.String()).
223247
Debug("O2 End time already set before before_STOP_ACTIVITY")
224248
}
225249
} else if e.Event == "GO_ERROR" {
226250
endTime, ok := env.workflow.GetUserVars().Get("run_end_time_ms")
227251
if ok && endTime == "" {
228-
runEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
229-
env.workflow.SetRuntimeVar("run_end_time_ms", runEndTime)
252+
runEndTime := time.Now()
253+
runEndTimeS := strconv.FormatInt(runEndTime.UnixMilli(), 10)
254+
env.workflow.SetRuntimeVar("run_end_time_ms", runEndTimeS)
255+
env.workflow.SetRuntimeVar("run_start_time_ms", "") // we delete previous SOSOR
256+
257+
the.EventWriterWithTopic(topic.Run).WriteEventWithTimestamp(&pb.Ev_RunEvent{
258+
EnvironmentId: envId.String(),
259+
RunNumber: env.GetCurrentRunNumber(),
260+
State: env.Sm.Current(),
261+
Error: "",
262+
Transition: e.Event,
263+
TransitionStatus: pb.OpStatus_STARTED,
264+
Vars: nil,
265+
}, runEndTime)
266+
230267
} else {
231268
log.WithField("partition", envId.String()).
232269
Debug("O2 End time already set before before_GO_ERROR")
@@ -439,7 +476,49 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
439476
Infof("auto stop transition scheduled, expected execution at %s", expected)
440477
}
441478

479+
runStartCompletionTime := time.Now()
480+
runStartCompletionTimeS := strconv.FormatInt(runStartCompletionTime.UnixMilli(), 10)
481+
env.workflow.SetRuntimeVar("run_start_completion_time_ms", runStartCompletionTimeS)
482+
env.workflow.SetRuntimeVar("run_end_completion_time_ms", "") // we delete previous EOEOR
483+
484+
runEvent := &pb.Ev_RunEvent{
485+
EnvironmentId: envId.String(),
486+
RunNumber: env.GetCurrentRunNumber(),
487+
State: env.Sm.Current(),
488+
Error: "",
489+
Transition: e.Event,
490+
TransitionStatus: pb.OpStatus_DONE_OK,
491+
Vars: nil,
492+
}
493+
if e.Err != nil {
494+
runEvent.Error = e.Err.Error()
495+
runEvent.TransitionStatus = pb.OpStatus_DONE_ERROR
496+
}
497+
498+
the.EventWriterWithTopic(topic.Run).WriteEventWithTimestamp(runEvent, runStartCompletionTime)
499+
442500
} else if e.Event == "STOP_ACTIVITY" {
501+
runEndCompletionTime := time.Now()
502+
runEndCompletionTimeS := strconv.FormatInt(runEndCompletionTime.UnixMilli(), 10)
503+
env.workflow.SetRuntimeVar("run_end_completion_time_ms", runEndCompletionTimeS)
504+
env.workflow.SetRuntimeVar("run_start_completion_time_ms", "") // we delete previous EOSOR
505+
506+
runEvent := &pb.Ev_RunEvent{
507+
EnvironmentId: envId.String(),
508+
RunNumber: env.GetCurrentRunNumber(),
509+
State: env.Sm.Current(),
510+
Error: "",
511+
Transition: e.Event,
512+
TransitionStatus: pb.OpStatus_DONE_OK,
513+
Vars: nil,
514+
}
515+
if e.Err != nil {
516+
runEvent.Error = e.Err.Error()
517+
runEvent.TransitionStatus = pb.OpStatus_DONE_ERROR
518+
}
519+
520+
the.EventWriterWithTopic(topic.Run).WriteEventWithTimestamp(runEvent, runEndCompletionTime)
521+
443522
// If the event is STOP_ACTIVITY, we remove the active run number after all hooks are done.
444523
env.workflow.GetVars().Set("last_run_number", strconv.Itoa(int(env.currentRunNumber)))
445524
env.currentRunNumber = 0

core/environment/transition_configure.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package environment
2626

2727
import (
2828
"errors"
29+
2930
"github.com/AliceO2Group/Control/core/workflow"
3031

3132
"github.com/AliceO2Group/Control/common/event"
@@ -36,7 +37,7 @@ import (
3637
func NewConfigureTransition(taskman *task.Manager) Transition {
3738
return &ConfigureTransition{
3839
baseTransition: baseTransition{
39-
name: "CONFIGURE",
40+
name: "CONFIGURE",
4041
taskman: taskman,
4142
},
4243
}
@@ -62,7 +63,7 @@ func (t ConfigureTransition) do(env *Environment) (err error) {
6263
}
6364
incomingEv := <-env.stateChangedCh
6465
// If some tasks failed to transition
65-
if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil {
66+
if tasksStateErrors := incomingEv.GetTasksStateChangedError(); tasksStateErrors != nil {
6667
return tasksStateErrors
6768
}
6869

core/protos/o2control.pb.go

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/protos/o2control_grpc.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)