Skip to content

Commit da5e0b2

Browse files
committed
New LHC plugin to follow LHC updates available in Kafka
The updates come from the ALICE LHC DIP client. Work towards OCTRL-1049. Eventually, this plugin will also allow us to get rid of the BKP.RetrieveFillInfo call and consume Fill information without going through BKP.
1 parent a32256c commit da5e0b2

5 files changed

Lines changed: 216 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ There are two ways of interacting with AliECS:
113113
* [ECS2DCS2ECS mock server](/core/integration/README.md#ecs2dcs2ecs-mock-server)
114114
* [DD Scheduler](/core/integration/README.md#dd-scheduler)
115115
* [Kafka (legacy)](/core/integration/README.md#kafka-legacy)
116+
* [LHC](/core/integration/README.md#lhc-plugin)
116117
* [ODC](/core/integration/README.md#odc)
117118
* [Test plugin](/core/integration/README.md#test-plugin)
118119
* [Trigger](/core/integration/README.md#trigger)

cmd/o2-aliecs-core/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/AliceO2Group/Control/core/integration/dcs"
3636
"github.com/AliceO2Group/Control/core/integration/ddsched"
3737
"github.com/AliceO2Group/Control/core/integration/kafka"
38+
"github.com/AliceO2Group/Control/core/integration/lhc"
3839
"github.com/AliceO2Group/Control/core/integration/odc"
3940
"github.com/AliceO2Group/Control/core/integration/testplugin"
4041
"github.com/AliceO2Group/Control/core/integration/trg"
@@ -64,6 +65,10 @@ func init() {
6465
"kafka",
6566
"kafkaEndpoint",
6667
kafka.NewPlugin)
68+
integration.RegisterPlugin(
69+
"lhc",
70+
"kafkaEndpoints",
71+
lhc.NewPlugin)
6772
integration.RegisterPlugin(
6873
"odc",
6974
"odcEndpoint",

core/integration/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ DD scheduler plugin informs the Data Distribution software about the pool of FLP
177177

178178
See [Legacy events: Kafka plugin](/docs/kafka.md#legacy-events-kafka-plugin)
179179

180+
## LHC plugin
181+
182+
This plugin listens to Kafka messages coming from the LHC DIP Client and pushes any relevant internal notifications to the AliECS core.
183+
Its main purpose is to provide basic information about ongoing LHC activity (e.g. fill information) to affected parties and to allow AliECS to react to it (e.g. by automatically stopping a physics run when stable beams are over).
184+
180185
## ODC
181186

182187
ODC plugin communicates with the [Online Device Control (ODC)](https://github.com/FairRootGroup/ODC) instance of the ALICE experiment, which controls the event processing farm used in data taking and offline processing.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// filepath: /home/pkonopka/alice/Control/core/integration/lhc/event/lhcevent.go
2+
/*
3+
* === This file is part of ALICE O² ===
4+
*
5+
* LHC integrated service event definitions used by the LHC integration plugin.
6+
*/
7+
8+
package event
9+
10+
import (
11+
"github.com/AliceO2Group/Control/common/event"
12+
commonpb "github.com/AliceO2Group/Control/common/protos"
13+
)
14+
15+
// BeamInfo mirrors (a subset of) the information described in the proto draft.
type BeamInfo struct {
	// StableBeamsStart / StableBeamsEnd are timestamps bounding the stable
	// beams period; presumably Unix epoch — TODO confirm units (seconds vs
	// milliseconds) against the DIP client payload.
	StableBeamsStart int64 `json:"stableBeamsStart,omitempty"`
	StableBeamsEnd   int64 `json:"stableBeamsEnd,omitempty"`
	// FillNumber identifies the LHC fill this information refers to.
	FillNumber int32 `json:"fillNumber,omitempty"`
	// FillingSchemeName is the name of the LHC filling scheme for this fill.
	FillingSchemeName string `json:"fillingSchemeName,omitempty"`
	// BeamType describes the colliding species — format not visible here;
	// verify against the DIP client (e.g. "PROTON-PROTON").
	BeamType string `json:"beamType,omitempty"`
	// BeamMode is the LHC machine mode enum from the common protos.
	BeamMode commonpb.BeamMode `json:"beamMode,omitempty"`
}
24+
25+
type LhcStateChangeEvent struct {
26+
event.IntegratedServiceEventBase
27+
BeamInfo BeamInfo
28+
}
29+
30+
func (e *LhcStateChangeEvent) GetName() string {
31+
return "LHC_STATE_CHANGE_EVENT"
32+
}
33+
34+
func (e *LhcStateChangeEvent) GetBeamInfo() BeamInfo {
35+
if e == nil {
36+
return BeamInfo{}
37+
}
38+
return e.BeamInfo
39+
}

core/integration/lhc/plugin.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* LHC integration plugin: listens for BeamMode updates and notifies
5+
* the environment manager via NotifyIntegratedServiceEvent when BeamMode
6+
* (BeamMode enum change) occurs. Supports a Kafka-based client (kafka://)
7+
* or a local stub client for testing.
8+
*/
9+
10+
package lhc
11+
12+
import (
13+
"context"
14+
"encoding/json"
15+
"errors"
16+
"github.com/AliceO2Group/Control/common/event/topic"
17+
"github.com/AliceO2Group/Control/common/logger/infologger"
18+
pb "github.com/AliceO2Group/Control/common/protos"
19+
"io"
20+
"sync"
21+
"time"
22+
23+
cmnevent "github.com/AliceO2Group/Control/common/event"
24+
"github.com/AliceO2Group/Control/common/logger"
25+
"github.com/AliceO2Group/Control/common/utils/uid"
26+
"github.com/AliceO2Group/Control/core/environment"
27+
"github.com/AliceO2Group/Control/core/integration"
28+
lhcevent "github.com/AliceO2Group/Control/core/integration/lhc/event"
29+
"github.com/sirupsen/logrus"
30+
"github.com/spf13/viper"
31+
)
32+
33+
// log is the package-level logger for the LHC plugin, tagged "lhcclient".
var log = logger.New(logrus.StandardLogger(), "lhcclient")

// dipClientTopic is the Kafka topic on which the ALICE LHC DIP client
// publishes its updates.
// FIXME: "todo" is a placeholder — the real topic name must be filled in
// before this plugin can receive anything.
var dipClientTopic topic.Topic = "todo"
35+
36+
// Plugin implements integration.Plugin and listens for LHC updates.
type Plugin struct {
	// endpoint is the value passed to NewPlugin at registration time.
	// NOTE(review): GetEndpoint reads the "kafkaEndpoints" viper key instead
	// of this field — confirm which one is authoritative.
	endpoint string
	// ctx is the context the background reader loop runs against (set in Init).
	ctx context.Context
	//cancel context.CancelFunc
	//wg sync.WaitGroup
	// mu guards currentState.
	mu sync.Mutex
	// currentState holds the most recently received beam information;
	// starts as BeamMode_UNKNOWN (see NewPlugin).
	currentState *pb.BeamInfo
	// reader consumes LHC DIP client updates from Kafka; closed by Destroy.
	reader cmnevent.Reader
}
46+
47+
func NewPlugin(endpoint string) integration.Plugin {
48+
49+
return &Plugin{endpoint: endpoint, mu: sync.Mutex{}, currentState: &pb.BeamInfo{BeamMode: pb.BeamMode_UNKNOWN}}
50+
}
51+
52+
func (p *Plugin) Init(_ string) error {
53+
54+
// use a background context for reader loop; Destroy will Close the reader
55+
p.ctx = context.Background()
56+
57+
p.reader = cmnevent.NewReaderWithTopic(dipClientTopic, "", true)
58+
59+
if p.reader == nil {
60+
return errors.New("could not create a kafka reader for LHC plugin")
61+
}
62+
go p.readAndInjectLhcUpdates()
63+
64+
log.Debug("LHC plugin initialized (client started)")
65+
return nil
66+
}
67+
68+
func (p *Plugin) GetName() string { return "lhc" }
69+
func (p *Plugin) GetPrettyName() string { return "LHC (DIP/Kafka client)" }
70+
func (p *Plugin) GetEndpoint() string { return viper.GetString("kafkaEndpoints") }
71+
72+
func (p *Plugin) GetConnectionState() string {
73+
if p == nil || p.reader == nil {
74+
return "UNKNOWN"
75+
}
76+
return "READY" // Unfortunately, kafka.Reader does not provide any GetStatus method
77+
}
78+
79+
func (p *Plugin) GetData(_ []any) string {
80+
p.mu.Lock()
81+
defer p.mu.Unlock()
82+
b, _ := json.Marshal(&p.currentState) // todo see if this works like that
83+
return string(b)
84+
}
85+
86+
func (p *Plugin) GetEnvironmentsData(envIds []uid.ID) map[uid.ID]string {
87+
// there is nothing sensible we could provide here, LHC client is not environment-specific
88+
return nil
89+
}
90+
91+
func (p *Plugin) GetEnvironmentsShortData(envIds []uid.ID) map[uid.ID]string {
92+
return p.GetEnvironmentsData(envIds)
93+
}
94+
95+
func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack map[string]interface{}) {
96+
return make(map[string]interface{})
97+
}
98+
func (p *Plugin) CallStack(_ interface{}) (stack map[string]interface{}) {
99+
return make(map[string]interface{})
100+
}
101+
102+
func (p *Plugin) Destroy() error {
103+
// todo add mutex?
104+
if p == nil {
105+
return nil
106+
}
107+
108+
if p.reader != nil {
109+
err := p.reader.Close()
110+
if err != nil {
111+
return err
112+
}
113+
}
114+
return nil
115+
}
116+
117+
func (p *Plugin) readAndInjectLhcUpdates() {
118+
for {
119+
msg, err := p.reader.Next(p.ctx)
120+
if errors.Is(err, io.EOF) {
121+
// todo log healthy end of stream
122+
log.WithField(infologger.Level, infologger.IL_Support).
123+
Debug("received an EOF from Kafka reader, likely cancellation was requested, breaking")
124+
break
125+
}
126+
if err != nil {
127+
log.WithField(infologger.Level, infologger.IL_Support).
128+
WithError(err).Error("error while reading from Kafka")
129+
// in case of errors, we throttle the loop to mitigate the risk a log spam if error persists
130+
time.Sleep(time.Second * 1)
131+
continue
132+
}
133+
if msg == nil {
134+
log.WithField(infologger.Level, infologger.IL_Devel).
135+
Warn("received an empty message with no error. it's unexpected, but continuing")
136+
continue
137+
}
138+
139+
if bmEvt := msg.GetBeamModeEvent(); bmEvt != nil && bmEvt.GetBeamInfo() != nil {
140+
beamInfo := bmEvt.GetBeamInfo()
141+
// update plugin state
142+
p.mu.Lock()
143+
p.currentState = beamInfo
144+
p.mu.Unlock()
145+
146+
// convert to internal LHC event and notify environment manager
147+
go func(beamInfo *pb.BeamInfo) {
148+
envMan := environment.ManagerInstance()
149+
150+
ev := &lhcevent.LhcStateChangeEvent{
151+
IntegratedServiceEventBase: cmnevent.IntegratedServiceEventBase{ServiceName: "LHC"},
152+
BeamInfo: lhcevent.BeamInfo{
153+
BeamMode: beamInfo.GetBeamMode(),
154+
StableBeamsStart: beamInfo.GetStableBeamsStart(),
155+
StableBeamsEnd: beamInfo.GetStableBeamsEnd(),
156+
FillNumber: beamInfo.GetFillNumber(),
157+
FillingSchemeName: beamInfo.GetFillingSchemeName(),
158+
BeamType: beamInfo.GetBeamType(),
159+
},
160+
}
161+
envMan.NotifyIntegratedServiceEvent(ev)
162+
}(beamInfo)
163+
continue
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)