Skip to content

Commit 87a8a6a

Browse files
committed
[core] Write to Kafka asynchronously
1 parent 377c7df commit 87a8a6a

1 file changed

Lines changed: 59 additions & 50 deletions

File tree

common/event/writer.go

Lines changed: 59 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -61,58 +61,67 @@ func (w *Writer) WriteEvent(e interface{}) {
6161
}
6262

6363
func (w *Writer) WriteEventWithTimestamp(e interface{}, timestamp time.Time) {
64-
var err error
65-
switch e := e.(type) {
66-
case *pb.Ev_MetaEvent_CoreStart:
67-
err = w.doWriteEvent(&pb.Event{
68-
Timestamp: timestamp.UnixMilli(),
69-
Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e},
70-
})
71-
case *pb.Ev_MetaEvent_MesosHeartbeat:
72-
err = w.doWriteEvent(&pb.Event{
73-
Timestamp: timestamp.UnixMilli(),
74-
Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e},
75-
})
76-
case *pb.Ev_MetaEvent_FrameworkEvent:
77-
err = w.doWriteEvent(&pb.Event{
78-
Timestamp: timestamp.UnixMilli(),
79-
Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e},
80-
})
81-
case *pb.Ev_TaskEvent:
82-
err = w.doWriteEvent(&pb.Event{
83-
Timestamp: timestamp.UnixMilli(),
84-
Payload: &pb.Event_TaskEvent{TaskEvent: e},
85-
})
86-
case *pb.Ev_RoleEvent:
87-
err = w.doWriteEvent(&pb.Event{
88-
Timestamp: timestamp.UnixMilli(),
89-
Payload: &pb.Event_RoleEvent{RoleEvent: e},
90-
})
91-
case *pb.Ev_EnvironmentEvent:
92-
err = w.doWriteEvent(&pb.Event{
93-
Timestamp: timestamp.UnixMilli(),
94-
Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e},
95-
})
96-
case *pb.Ev_CallEvent:
97-
err = w.doWriteEvent(&pb.Event{
98-
Timestamp: timestamp.UnixMilli(),
99-
Payload: &pb.Event_CallEvent{CallEvent: e},
100-
})
101-
case *pb.Ev_IntegratedServiceEvent:
102-
err = w.doWriteEvent(&pb.Event{
103-
Timestamp: timestamp.UnixMilli(),
104-
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
105-
})
64+
go func() {
65+
var (
66+
err error
67+
wrappedEvent *pb.Event
68+
)
10669

107-
default:
108-
err = fmt.Errorf("unsupported event type")
109-
}
110-
if err != nil {
70+
switch e := e.(type) {
71+
case *pb.Ev_MetaEvent_CoreStart:
72+
wrappedEvent = &pb.Event{
73+
Timestamp: timestamp.UnixMilli(),
74+
Payload: &pb.Event_CoreStartEvent{CoreStartEvent: e},
75+
}
76+
case *pb.Ev_MetaEvent_MesosHeartbeat:
77+
wrappedEvent = &pb.Event{
78+
Timestamp: timestamp.UnixMilli(),
79+
Payload: &pb.Event_MesosHeartbeatEvent{MesosHeartbeatEvent: e},
80+
}
81+
case *pb.Ev_MetaEvent_FrameworkEvent:
82+
wrappedEvent = &pb.Event{
83+
Timestamp: timestamp.UnixMilli(),
84+
Payload: &pb.Event_FrameworkEvent{FrameworkEvent: e},
85+
}
86+
case *pb.Ev_TaskEvent:
87+
wrappedEvent = &pb.Event{
88+
Timestamp: timestamp.UnixMilli(),
89+
Payload: &pb.Event_TaskEvent{TaskEvent: e},
90+
}
91+
case *pb.Ev_RoleEvent:
92+
wrappedEvent = &pb.Event{
93+
Timestamp: timestamp.UnixMilli(),
94+
Payload: &pb.Event_RoleEvent{RoleEvent: e},
95+
}
96+
case *pb.Ev_EnvironmentEvent:
97+
wrappedEvent = &pb.Event{
98+
Timestamp: timestamp.UnixMilli(),
99+
Payload: &pb.Event_EnvironmentEvent{EnvironmentEvent: e},
100+
}
101+
case *pb.Ev_CallEvent:
102+
wrappedEvent = &pb.Event{
103+
Timestamp: timestamp.UnixMilli(),
104+
Payload: &pb.Event_CallEvent{CallEvent: e},
105+
}
106+
case *pb.Ev_IntegratedServiceEvent:
107+
wrappedEvent = &pb.Event{
108+
Timestamp: timestamp.UnixMilli(),
109+
Payload: &pb.Event_IntegratedServiceEvent{IntegratedServiceEvent: e},
110+
}
111+
}
111112

112-
log.WithField("event", e).
113-
WithField("level", infologger.IL_Support).
114-
Error(err.Error())
115-
}
113+
if wrappedEvent == nil {
114+
err = fmt.Errorf("unsupported event type")
115+
} else {
116+
err = w.doWriteEvent(wrappedEvent)
117+
}
118+
119+
if err != nil {
120+
log.WithField("event", e).
121+
WithField("level", infologger.IL_Support).
122+
Error(err.Error())
123+
}
124+
}()
116125
}
117126

118127
func (w *Writer) doWriteEvent(e *pb.Event) error {

0 commit comments

Comments
 (0)