Skip to content

Commit 7d484d7

Browse files
committed
[core] DCS client now uses operation availability values from DCS
1 parent 4ae0552 commit 7d484d7

2 files changed

Lines changed: 86 additions & 31 deletions

File tree

core/integration/dcs/plugin.go

Lines changed: 76 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,15 @@ type Plugin struct {
6363
dcsClient *RpcClient
6464
pendingEORs map[string] /*envId*/ int64
6565

66-
detectorMatrix []*dcspb.DetectorInfo
67-
detectorMatrixMu sync.RWMutex
66+
detectorMap DCSDetectorInfoMap
67+
detectorMapMu sync.RWMutex
6868
}
6969

7070
type DCSDetectors []dcspb.Detector
7171

72-
type DCSDetectorStatesMap map[dcspb.Detector]dcspb.DetectorState
72+
type DCSDetectorOpAvailabilityMap map[dcspb.Detector]dcspb.DetectorState
73+
74+
type DCSDetectorInfoMap map[dcspb.Detector]*dcspb.DetectorInfo
7375

7476
func NewPlugin(endpoint string) integration.Plugin {
7577
u, err := url.Parse(endpoint)
@@ -83,11 +85,11 @@ func NewPlugin(endpoint string) integration.Plugin {
8385
portNumber, _ := strconv.Atoi(u.Port())
8486

8587
newPlugin := &Plugin{
86-
dcsHost: u.Hostname(),
87-
dcsPort: portNumber,
88-
dcsClient: nil,
89-
pendingEORs: make(map[string]int64),
90-
detectorMatrix: make([]*dcspb.DetectorInfo, 0),
88+
dcsHost: u.Hostname(),
89+
dcsPort: portNumber,
90+
dcsClient: nil,
91+
pendingEORs: make(map[string]int64),
92+
detectorMap: make(DCSDetectorInfoMap),
9193
}
9294
return newPlugin
9395
}
@@ -121,9 +123,9 @@ func (p *Plugin) GetData(_ []any) string {
121123
outMap := make(map[string]interface{})
122124
outMap["partitions"] = p.partitionStatesForEnvs(environmentIds)
123125

124-
p.detectorMatrixMu.RLock()
125-
outMap["detectors"] = p.detectorMatrix
126-
p.detectorMatrixMu.RUnlock()
126+
p.detectorMapMu.RLock()
127+
outMap["detectors"] = p.detectorMap.ToEcsDetectors()
128+
p.detectorMapMu.RUnlock()
127129

128130
out, err := json.Marshal(outMap)
129131
if err != nil {
@@ -155,6 +157,37 @@ func (p *Plugin) partitionStatesForEnvs(envIds []uid.ID) map[uid.ID]string {
155157
return out
156158
}
157159

160+
func (p *Plugin) updateLastKnownDetectorStates(detectorMatrix []*dcspb.DetectorInfo) {
161+
p.detectorMapMu.Lock()
162+
defer p.detectorMapMu.Unlock()
163+
164+
for _, detInfo := range detectorMatrix {
165+
dcsDet := detInfo.GetDetector()
166+
if _, ok := p.detectorMap[dcsDet]; !ok {
167+
p.detectorMap[dcsDet] = detInfo
168+
} else {
169+
if detInfo.State != dcspb.DetectorState_NULL_STATE {
170+
p.detectorMap[dcsDet].State = detInfo.State
171+
}
172+
}
173+
}
174+
}
175+
176+
func (p *Plugin) updateDetectorOpAvailabilities(detectorMatrix []*dcspb.DetectorInfo) {
177+
p.detectorMapMu.Lock()
178+
defer p.detectorMapMu.Unlock()
179+
180+
for _, detInfo := range detectorMatrix {
181+
dcsDet := detInfo.GetDetector()
182+
if _, ok := p.detectorMap[dcsDet]; !ok {
183+
p.detectorMap[dcsDet] = detInfo
184+
} else {
185+
p.detectorMap[dcsDet].PfrAvailability = detInfo.PfrAvailability
186+
p.detectorMap[dcsDet].SorAvailability = detInfo.SorAvailability
187+
}
188+
}
189+
}
190+
158191
func (p *Plugin) Init(instanceId string) error {
159192
if p.dcsClient == nil {
160193
cxt, cancel := context.WithCancel(context.Background())
@@ -192,19 +225,15 @@ func (p *Plugin) Init(instanceId string) error {
192225
if ev != nil && ev.Eventtype == dcspb.EventType_HEARTBEAT {
193226
log.Trace("received DCS heartbeat event")
194227
if dm := ev.GetDetectorMatrix(); len(dm) > 0 {
195-
p.detectorMatrixMu.Lock()
196-
p.detectorMatrix = dm
197-
p.detectorMatrixMu.Unlock()
228+
p.updateDetectorOpAvailabilities(dm)
198229
}
199230
continue
200231
}
201232

202233
if ev != nil && ev.Eventtype == dcspb.EventType_STATE_CHANGE_EVENT {
203234
log.Trace("received DCS state change event")
204235
if dm := ev.GetDetectorMatrix(); len(dm) > 0 {
205-
p.detectorMatrixMu.Lock()
206-
p.detectorMatrix = dm
207-
p.detectorMatrixMu.Unlock()
236+
p.updateLastKnownDetectorStates(dm)
208237
}
209238
continue
210239
}
@@ -242,26 +271,44 @@ func (p *Plugin) ObjectStack(_ map[string]string, _ map[string]string) (stack ma
242271
return stack
243272
}
244273

245-
func (p *Plugin) getDetectorsStates(dcsDetectors DCSDetectors) DCSDetectorStatesMap {
246-
p.detectorMatrixMu.RLock()
247-
defer p.detectorMatrixMu.RUnlock()
274+
func (p *Plugin) getDetectorsPfrAvailability(dcsDetectors DCSDetectors) DCSDetectorOpAvailabilityMap {
275+
p.detectorMapMu.RLock()
276+
defer p.detectorMapMu.RUnlock()
248277

249-
detectorStatesMap := make(DCSDetectorStatesMap)
278+
availabilityMap := make(DCSDetectorOpAvailabilityMap)
250279

251280
for _, dcsDet := range dcsDetectors {
252281
if dcsDet == dcspb.Detector_DCS {
253282
continue
254283
}
255-
detectorStatesMap[dcsDet] = dcspb.DetectorState_NULL_STATE
284+
availabilityMap[dcsDet] = dcspb.DetectorState_NULL_STATE
256285

257-
for _, detInfo := range p.detectorMatrix {
258-
if detInfo.Detector == dcsDet {
259-
detectorStatesMap[dcsDet] = detInfo.State
260-
}
286+
if _, contains := p.detectorMap[dcsDet]; contains {
287+
availabilityMap[dcsDet] = p.detectorMap[dcsDet].PfrAvailability
288+
}
289+
}
290+
291+
return availabilityMap
292+
}
293+
294+
func (p *Plugin) getDetectorsSorAvailability(dcsDetectors DCSDetectors) DCSDetectorOpAvailabilityMap {
295+
p.detectorMapMu.RLock()
296+
defer p.detectorMapMu.RUnlock()
297+
298+
availabilityMap := make(DCSDetectorOpAvailabilityMap)
299+
300+
for _, dcsDet := range dcsDetectors {
301+
if dcsDet == dcspb.Detector_DCS {
302+
continue
303+
}
304+
availabilityMap[dcsDet] = dcspb.DetectorState_NULL_STATE
305+
306+
if _, contains := p.detectorMap[dcsDet]; contains {
307+
availabilityMap[dcsDet] = p.detectorMap[dcsDet].SorAvailability
261308
}
262309
}
263310

264-
return detectorStatesMap
311+
return availabilityMap
265312
}
266313

267314
func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
@@ -304,7 +351,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
304351
return
305352
}
306353

307-
knownDetectorStates := p.getDetectorsStates(dcsDetectors)
354+
knownDetectorStates := p.getDetectorsPfrAvailability(dcsDetectors)
308355
isCompatibleWithOperation, err := knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
309356
if !isCompatibleWithOperation {
310357
log.WithError(err).
@@ -692,7 +739,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
692739
return
693740
}
694741

695-
knownDetectorStates := p.getDetectorsStates(dcsDetectors)
742+
knownDetectorStates := p.getDetectorsSorAvailability(dcsDetectors)
696743
isCompatibleWithOperation, err := knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
697744
if !isCompatibleWithOperation {
698745
log.WithError(err).

core/integration/dcs/structs.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (d DCSDetectors) EcsDetectorsSlice() (sslice []string) {
6161
return
6262
}
6363

64-
func (dsm DCSDetectorStatesMap) makeDetectorsByStateMap() map[dcspb.DetectorState]DCSDetectors {
64+
func (dsm DCSDetectorOpAvailabilityMap) makeDetectorsByStateMap() map[dcspb.DetectorState]DCSDetectors {
6565
detectorsByState := make(map[dcspb.DetectorState]DCSDetectors)
6666
for det, detState := range dsm {
6767
if _, ok := detectorsByState[detState]; !ok {
@@ -73,7 +73,7 @@ func (dsm DCSDetectorStatesMap) makeDetectorsByStateMap() map[dcspb.DetectorStat
7373
}
7474

7575
// Returns true if the provided detectors are either all in conditionState or in NULL_STATE
76-
func (dsm DCSDetectorStatesMap) compatibleWithDCSOperation(conditionState dcspb.DetectorState) (bool, error) {
76+
func (dsm DCSDetectorOpAvailabilityMap) compatibleWithDCSOperation(conditionState dcspb.DetectorState) (bool, error) {
7777
detectorsByState := dsm.makeDetectorsByStateMap()
7878

7979
detectorsInConditionState, thereAreDetectorsInConditionState := detectorsByState[conditionState]
@@ -101,3 +101,11 @@ func (dsm DCSDetectorStatesMap) compatibleWithDCSOperation(conditionState dcspb.
101101
return false, fmt.Errorf("detectors are in incompatible states: %v", strings.Join(reportByState, "; "))
102102
}
103103
}
104+
105+
func (m DCSDetectorInfoMap) ToEcsDetectors() map[string]*dcspb.DetectorInfo {
106+
out := make(map[string]*dcspb.DetectorInfo)
107+
for k, v := range m {
108+
out[dcsToEcsDetector(k)] = v
109+
}
110+
return out
111+
}

0 commit comments

Comments
 (0)