Skip to content

Commit 78894d7

Browse files
committed
[core] Add support for dcs_pfr/sor_grace_period variables
1 parent 13e0556 commit 78894d7

1 file changed

Lines changed: 98 additions & 2 deletions

File tree

core/integration/dcs/plugin.go

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,54 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
402402
return
403403
}
404404

405+
// We acquire a grace period during which we hope that DCS will become compatible with the operation.
406+
// During this period we'll re-check our internal state (as reported by DCS) for op compatibility once per second,
407+
// and if we don't get a compatible state within the grace period, we declare the operation failed.
408+
pfrGracePeriod := time.Duration(0)
409+
pfrGracePeriodS, ok := varStack["dcs_pfr_grace_period"]
410+
if ok {
411+
pfrGracePeriod, err = time.ParseDuration(pfrGracePeriodS)
412+
if err != nil {
413+
log.WithError(err).
414+
WithField("level", infologger.IL_Ops).
415+
WithField("partition", envId).
416+
WithField("call", "PrepareForRun").
417+
Warnf("cannot parse DCS PFR grace period, assuming 0 seconds")
418+
}
419+
} else {
420+
log.WithField("level", infologger.IL_Ops).
421+
WithField("partition", envId).
422+
WithField("call", "PrepareForRun").
423+
Info("DCS PFR grace period not set, defaulting to 0 seconds")
424+
}
425+
426+
pfrGraceTimeout := time.Now().Add(pfrGracePeriod)
427+
isCompatibleWithOperation := false
428+
405429
knownDetectorStates := p.getDetectorsPfrAvailability(dcsDetectors)
406-
isCompatibleWithOperation, err := knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
430+
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
431+
432+
for {
433+
if isCompatibleWithOperation {
434+
break
435+
} else {
436+
log.WithField("level", infologger.IL_Ops).
437+
WithField("partition", envId).
438+
WithField("call", "PrepareForRun").
439+
WithField("grace_period", pfrGracePeriod.String()).
440+
WithField("remaining_grace_period", pfrGraceTimeout.Sub(time.Now()).String()).
441+
Infof("waiting for DCS operation readiness: %s", err.Error())
442+
time.Sleep(1 * time.Second)
443+
}
444+
445+
if time.Now().Before(pfrGraceTimeout) {
446+
knownDetectorStates = p.getDetectorsPfrAvailability(dcsDetectors)
447+
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_PFR_AVAILABLE)
448+
} else {
449+
break
450+
}
451+
}
452+
407453
if !isCompatibleWithOperation {
408454
log.WithError(err).
409455
WithField("level", infologger.IL_Ops).
@@ -422,6 +468,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
422468
Warnf("cannot determine PFR readiness: %s", err.Error())
423469
}
424470

471+
// By now the DCS must be in a compatible state, so we proceed with gathering params for the operation
472+
425473
log.WithField("partition", envId).
426474
WithField("level", infologger.IL_Ops).
427475
Infof("performing DCS PFR for detectors: %s", strings.Join(dcsDetectors.EcsDetectorsSlice(), " "))
@@ -790,8 +838,54 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
790838
return
791839
}
792840

841+
// We acquire a grace period during which we hope that DCS will become compatible with the operation.
842+
// During this period we'll re-check our internal state (as reported by DCS) for op compatibility once per second,
843+
// and if we don't get a compatible state within the grace period, we declare the operation failed.
844+
sorGracePeriod := time.Duration(0)
845+
sorGracePeriodS, ok := varStack["dcs_sor_grace_period"]
846+
if ok {
847+
sorGracePeriod, err = time.ParseDuration(sorGracePeriodS)
848+
if err != nil {
849+
log.WithError(err).
850+
WithField("level", infologger.IL_Ops).
851+
WithField("partition", envId).
852+
WithField("call", "StartOfRun").
853+
Warnf("cannot parse DCS SOR grace period, assuming 0 seconds")
854+
}
855+
} else {
856+
log.WithField("level", infologger.IL_Ops).
857+
WithField("partition", envId).
858+
WithField("call", "StartOfRun").
859+
Info("DCS SOR grace period not set, defaulting to 0 seconds")
860+
}
861+
862+
sorGraceTimeout := time.Now().Add(sorGracePeriod)
863+
isCompatibleWithOperation := false
864+
793865
knownDetectorStates := p.getDetectorsSorAvailability(dcsDetectors)
794-
isCompatibleWithOperation, err := knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
866+
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
867+
868+
for {
869+
if isCompatibleWithOperation {
870+
break
871+
} else {
872+
log.WithField("level", infologger.IL_Ops).
873+
WithField("partition", envId).
874+
WithField("call", "StartOfRun").
875+
WithField("grace_period", sorGracePeriod.String()).
876+
WithField("remaining_grace_period", sorGraceTimeout.Sub(time.Now()).String()).
877+
Infof("waiting for DCS operation readiness: %s", err.Error())
878+
time.Sleep(1 * time.Second)
879+
}
880+
881+
if time.Now().Before(sorGraceTimeout) {
882+
knownDetectorStates = p.getDetectorsSorAvailability(dcsDetectors)
883+
isCompatibleWithOperation, err = knownDetectorStates.compatibleWithDCSOperation(dcspb.DetectorState_SOR_AVAILABLE)
884+
} else {
885+
break
886+
}
887+
}
888+
795889
if !isCompatibleWithOperation {
796890
log.WithError(err).
797891
WithField("level", infologger.IL_Ops).
@@ -810,6 +904,8 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
810904
Warnf("cannot determine SOR readiness: %s", err.Error())
811905
}
812906

907+
// By now the DCS must be in a compatible state, so we proceed with gathering params for the operation
908+
813909
log.WithField("partition", envId).
814910
WithField("level", infologger.IL_Ops).
815911
WithField("run", runNumber64).

0 commit comments

Comments
 (0)