Skip to content

Commit f5901f7

Browse files
authored
Merge pull request patr1ck-m#25 from patr1ck-m/issue/detection_handlers_use_push_streams
Refactor anomaly detection methods to use push streams as input
2 parents 80e56eb + 7ce59c1 commit f5901f7

2 files changed

Lines changed: 49 additions & 68 deletions

File tree

src/lib/anomaly_detection.effekt

Lines changed: 29 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,15 @@ interface AnomalyDetection {
4343
/// is the distance outside of the range. Otherwise it is considered normal and receives a score of `0.0`.
4444
///
4545
/// O(1) space complexity, O(1) time complexity per read
46-
def minMaxAnomalyDetector(min: Double, max: Double): Unit / { read[Event], AnomalyDetection } = {
47-
with boundary
48-
while (true) {
49-
val ev = do read[Event]()
46+
def minMaxAnomalyDetector(min: Double, max: Double) { body: () => Unit / { emit[Event] } }: Unit / { AnomalyDetection } = {
47+
try body() with emit[Event] { ev =>
5048
val value = ev.value
5149
if (value < min || value > max) {
5250
do anomaly(AnomalyEvaluation(ev, true, if (value < min) min - value else value - max))
5351
} else {
5452
do noAnomaly(AnomalyEvaluation(ev, false, 0.0))
5553
}
54+
resume(())
5655
}
5756
}
5857

@@ -64,11 +63,9 @@ def minMaxAnomalyDetector(min: Double, max: Double): Unit / { read[Event], Anoma
6463
/// and can be positive or negative depending on whether the value is above or below the mean.
6564
///
6665
/// O(1) space complexity, O(1) time complexity per read
67-
def meanThresholdAnomalyDetector(threshold: Double): Unit / { read[Event], AnomalyDetection } = {
68-
with boundary
66+
def meanThresholdAnomalyDetector(threshold: Double) { body: () => Unit / { emit[Event] } }: Unit / { AnomalyDetection } = {
6967
var count = counter::empty()
70-
while (true) {
71-
val ev = do read[Event]()
68+
try body() with emit[Event] { ev =>
7269
val value = ev.value
7370
count = count.add(value)
7471
val deviation = value - count.mean()
@@ -77,6 +74,7 @@ def meanThresholdAnomalyDetector(threshold: Double): Unit / { read[Event], Anoma
7774
} else {
7875
do noAnomaly(AnomalyEvaluation(ev, false, deviation))
7976
}
77+
resume(())
8078
}
8179
}
8280

@@ -88,11 +86,9 @@ def meanThresholdAnomalyDetector(threshold: Double): Unit / { read[Event], Anoma
8886
/// `anomalyScore` is the computed z-score and can be positive or negative.
8987
///
9088
/// O(1) space complexity, O(1) time complexity per read
91-
def zScoreAnomalyDetector(zThreshold: Double): Unit / { read[Event], AnomalyDetection } = {
92-
with boundary
89+
def zScoreAnomalyDetector(zThreshold: Double) { body: () => Unit / { emit[Event] } }: Unit / { AnomalyDetection } = {
9390
var count = counter::empty()
94-
while (true) {
95-
val ev = do read[Event]()
91+
try body() with emit[Event] { ev =>
9692
val value = ev.value
9793
count = count.add(value)
9894
val mean = count.mean()
@@ -103,33 +99,23 @@ def zScoreAnomalyDetector(zThreshold: Double): Unit / { read[Event], AnomalyDete
10399
} else {
104100
do noAnomaly(AnomalyEvaluation(ev, false, zScore))
105101
}
102+
resume(())
106103
}
107104
}
108105

109106

110107
namespace examples {
111108
def example1(): Unit = {
112109
with on[MissingValue].panic()
113-
var lst: List[Event] = [
114-
Event(1, 1.0),
115-
Event(2, 3.0),
116-
Event(3, 6.0),
117-
Event(4, 0.5),
118-
Event(5, 4.0),
119-
Event(6, -1.0),
120-
Event(7, 3.0)
121-
]
122110
try {
123-
minMaxAnomalyDetector(0.0, 5.0)
124-
} with read[Event] { () =>
125-
if (lst.isEmpty()) {
126-
println("No more values to read, stopping...")
127-
resume { do stop() }
128-
} else {
129-
val first = lst.head()
130-
lst = lst.deleteAt(0)
131-
println("Providing value: " ++ first.show())
132-
resume { unbox first }
111+
minMaxAnomalyDetector(0.0, 5.0) {
112+
do emit(Event(1, 1.0))
113+
do emit(Event(2, 3.0))
114+
do emit(Event(3, 6.0))
115+
do emit(Event(4, 0.5))
116+
do emit(Event(5, 4.0))
117+
do emit(Event(6, -1.0))
118+
do emit(Event(7, 3.0))
133119
}
134120
} with AnomalyDetection {
135121
def anomaly(ev: AnomalyEvaluation) = {
@@ -145,26 +131,15 @@ namespace examples {
145131

146132
def example2(): Unit = {
147133
with on[MissingValue].panic()
148-
var lst: List[Event] = [
149-
Event(1, 1.0),
150-
Event(2, 3.0),
151-
Event(3, 6.0),
152-
Event(4, 0.5),
153-
Event(5, 4.0),
154-
Event(6, -1.0),
155-
Event(7, 3.0)
156-
]
157134
try {
158-
zScoreAnomalyDetector(1.0)
159-
} with read[Event] { () =>
160-
if (lst.isEmpty()) {
161-
println("No more values to read, stopping...")
162-
resume { do stop() }
163-
} else {
164-
val first = lst.head()
165-
lst = lst.deleteAt(0)
166-
println("Providing value: " ++ first.show())
167-
resume { unbox first }
135+
zScoreAnomalyDetector(1.0) {
136+
do emit(Event(1, 1.0))
137+
do emit(Event(2, 3.0))
138+
do emit(Event(3, 6.0))
139+
do emit(Event(4, 0.5))
140+
do emit(Event(5, 4.0))
141+
do emit(Event(6, -1.0))
142+
do emit(Event(7, 3.0))
168143
}
169144
} with AnomalyDetection {
170145
def anomaly(ev: AnomalyEvaluation) = {
@@ -177,4 +152,8 @@ namespace examples {
177152
}
178153
}
179154
}
155+
156+
def main() = {
157+
example1()
158+
}
180159
}

src/test/anomaly_detection_test.effekt

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,12 @@ def sameEvaluation(a: AnomalyEvaluation, b: AnomalyEvaluation): Bool = {
3131
approx(a.anomalyScore, b.anomalyScore)
3232
}
3333

34-
def assertAnomalyDetection(inputs: List[Event], expected: List[AnomalyEvaluation]) { program: () => Unit / { read[Event], AnomalyDetection } } : Unit / { Assertion } = {
35-
var remainingInputs = inputs
34+
def assertAnomalyDetection(expected: List[AnomalyEvaluation]) { program: () => Unit / { AnomalyDetection } } : Unit / { Assertion } = {
3635
var remainingExpected = expected
3736

3837
try {
3938
program()
40-
} with read[Event] { () =>
41-
remainingInputs match {
42-
case Nil() =>
43-
resume { do stop() }
44-
case Cons(head, tail) =>
45-
remainingInputs = tail
46-
resume { unbox head }
47-
}
48-
} with AnomalyDetection {
39+
} with AnomalyDetection {
4940
def anomaly(ev: AnomalyEvaluation) = {
5041
remainingExpected match {
5142
case Nil() =>
@@ -68,7 +59,6 @@ def assertAnomalyDetection(inputs: List[Event], expected: List[AnomalyEvaluation
6859
}
6960
}
7061

71-
assertTrue(remainingInputs.isEmpty(), "Did not read all inputs")
7262
assertTrue(remainingExpected.isEmpty(), "Did not receive all expected detection events")
7363
}
7464

@@ -83,8 +73,12 @@ def testSuite() = suite("anomaly_detection") {
8373
AnomalyEvaluation(Event(4, 3.0), false, 0.0)
8474
]
8575

86-
assertAnomalyDetection(input, expected) {
87-
anomaly_detection::minMaxAnomalyDetector(0.0, 4.0)
76+
assertAnomalyDetection(expected) {
77+
anomaly_detection::minMaxAnomalyDetector(0.0, 4.0) {
78+
input.foreach { e =>
79+
do emit(e)
80+
}
81+
}
8882
}
8983
}
9084

@@ -97,8 +91,12 @@ def testSuite() = suite("anomaly_detection") {
9791
AnomalyEvaluation(Event(3, 2.0), false, -0.5)
9892
]
9993

100-
assertAnomalyDetection(input, expected) {
101-
anomaly_detection::meanThresholdAnomalyDetector(1.5)
94+
assertAnomalyDetection(expected) {
95+
anomaly_detection::meanThresholdAnomalyDetector(1.5) {
96+
input.foreach { e =>
97+
do emit(e)
98+
}
99+
}
102100
}
103101
}
104102

@@ -111,8 +109,12 @@ def testSuite() = suite("anomaly_detection") {
111109
AnomalyEvaluation(Event(3, 1.0), false, -0.5)
112110
]
113111

114-
assertAnomalyDetection(input, expected) {
115-
anomaly_detection::zScoreAnomalyDetector(1.0)
112+
assertAnomalyDetection(expected) {
113+
anomaly_detection::zScoreAnomalyDetector(1.0) {
114+
input.foreach { e =>
115+
do emit(e)
116+
}
117+
}
116118
}
117119
}
118120
}

0 commit comments

Comments
 (0)