Skip to content

Commit 41b501d

Browse files
committed
Add anomaly logging handlers for console and file (csv) logging
1 parent b99e073 commit 41b501d

2 files changed

Lines changed: 268 additions & 1 deletion

File tree

src/lib/anomaly_logger.effekt

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
module src/lib/anomaly_logger
2+
3+
import src/lib/anomaly_detection
4+
import src/lib/event
5+
import src/lib/csv
6+
import io/error
7+
import io/filesystem
8+
9+
/// An AnomalyDetection handler that logs only anomalies to the console
10+
def logAnomaliesToConsole() {body: () => Unit / AnomalyDetection}: Unit = {
11+
try {
12+
body()
13+
} with AnomalyDetection {
14+
def anomaly(ae: AnomalyEvaluation) = {
15+
println("Anomaly detected: " ++ ae.event.value.show() ++ " at t=" ++ ae.event.timestamp.show() ++
16+
" (score: " ++ ae.anomalyScore.show() ++ ")")
17+
resume(())
18+
}
19+
def noAnomaly(ae: AnomalyEvaluation) = {
20+
resume(())
21+
}
22+
}
23+
}
24+
25+
/// An AnomalyDetection handler that logs only anomalies to the console and rethrows the events to the caller
26+
def logAnomaliesToConsoleRethrow() {body: () => Unit / AnomalyDetection}: Unit / AnomalyDetection = {
27+
try {
28+
body()
29+
} with AnomalyDetection {
30+
def anomaly(ae: AnomalyEvaluation) = {
31+
println("Anomaly detected: " ++ ae.event.value.show() ++ " at t=" ++ ae.event.timestamp.show() ++
32+
" (score: " ++ ae.anomalyScore.show() ++ ")")
33+
do anomaly(ae)
34+
resume(())
35+
}
36+
def noAnomaly(ae: AnomalyEvaluation) = {
37+
do noAnomaly(ae)
38+
resume(())
39+
}
40+
}
41+
}
42+
43+
/// An AnomalyDetection handler that logs all events to the console
44+
def logToConsole() {body: () => Unit / AnomalyDetection}: Unit = {
45+
try {
46+
body()
47+
} with AnomalyDetection {
48+
def anomaly(ae: AnomalyEvaluation) = {
49+
println("Anomaly detected: " ++ ae.event.value.show() ++ " at t=" ++ ae.event.timestamp.show() ++
50+
" (score: " ++ ae.anomalyScore.show() ++ ")")
51+
resume(())
52+
}
53+
def noAnomaly(ae: AnomalyEvaluation) = {
54+
println("No anomaly: " ++ ae.event.value.show() ++ " at t=" ++ ae.event.timestamp.show() ++
55+
" (score: " ++ ae.anomalyScore.show() ++ ")")
56+
resume(())
57+
}
58+
}
59+
}
60+
61+
/// An AnomalyDetection handler that logs all events to the console and rethrows the events to the caller
62+
def logToConsoleRethrow() {body: () => Unit / AnomalyDetection}: Unit / AnomalyDetection = {
63+
try {
64+
body()
65+
} with AnomalyDetection {
66+
def anomaly(ae: AnomalyEvaluation) = {
67+
println("Anomaly detected: " ++ ae.event.value.show() ++ " at t=" ++ ae.event.timestamp.show() ++
68+
" (score: " ++ ae.anomalyScore.show() ++ ")")
69+
do anomaly(ae)
70+
resume(())
71+
}
72+
def noAnomaly(ae: AnomalyEvaluation) = {
73+
println("No anomaly: " ++ ae.event.value.show() ++ " at t=" ++ ae.event.timestamp.show() ++
74+
" (score: " ++ ae.anomalyScore.show() ++ ")")
75+
do noAnomaly(ae)
76+
resume(())
77+
}
78+
}
79+
}
80+
81+
/// An AnomalyDetection handler that logs all events to a CSV file
82+
def logToFileCsv(filePath: String) {body: () => Unit / AnomalyDetection}: Unit / Exception[IOError] = {
83+
var buffer: List[CsvRow] = [["timestamp", "value", "isAnomaly", "anomalyScore"]]
84+
var first = true
85+
val maxBufferSize = 1000
86+
try {
87+
body()
88+
} with AnomalyDetection {
89+
def anomaly(ae: AnomalyEvaluation) = {
90+
val row: CsvRow = [
91+
ae.event.timestamp.show(),
92+
ae.event.value.show(),
93+
"true",
94+
ae.anomalyScore.show()
95+
]
96+
buffer = Cons(row, buffer)
97+
if (buffer.size >= maxBufferSize) {
98+
// Flush buffer to file
99+
internal::saveCsvRows(buffer.reverse, filePath, first)
100+
buffer = []
101+
first = false
102+
}
103+
resume(())
104+
}
105+
def noAnomaly(ae: AnomalyEvaluation) = {
106+
val row: CsvRow = [
107+
ae.event.timestamp.show(),
108+
ae.event.value.show(),
109+
"false",
110+
ae.anomalyScore.show()
111+
]
112+
buffer = Cons(row, buffer)
113+
if (buffer.size >= maxBufferSize) {
114+
// Flush buffer to file
115+
internal::saveCsvRows(buffer.reverse, filePath, first)
116+
buffer = []
117+
first = false
118+
}
119+
resume(())
120+
}
121+
}
122+
// Flush remaining buffer to file
123+
if (not(buffer.isEmpty())) {
124+
internal::saveCsvRows(buffer.reverse, filePath, first)
125+
}
126+
}
127+
128+
/// An AnomalyDetection handler that logs only anomalies to a CSV file
129+
def logAnomaliesToFileCsv(filePath: String) {body: () => Unit / AnomalyDetection}: Unit / Exception[IOError] = {
130+
var buffer: List[CsvRow] = [["timestamp", "value", "isAnomaly", "anomalyScore"]]
131+
var first = true
132+
val maxBufferSize = 1000
133+
try {
134+
body()
135+
} with AnomalyDetection {
136+
def anomaly(ae: AnomalyEvaluation) = {
137+
val row: CsvRow = [
138+
ae.event.timestamp.show(),
139+
ae.event.value.show(),
140+
"true",
141+
ae.anomalyScore.show()
142+
]
143+
buffer = Cons(row, buffer)
144+
if (buffer.size >= maxBufferSize) {
145+
// Flush buffer to file
146+
internal::saveCsvRows(buffer.reverse, filePath, first)
147+
buffer = []
148+
first = false
149+
}
150+
resume(())
151+
}
152+
def noAnomaly(ae: AnomalyEvaluation) = {
153+
resume(())
154+
}
155+
}
156+
// Flush remaining buffer to file
157+
if (not(buffer.isEmpty())) {
158+
internal::saveCsvRows(buffer.reverse, filePath, first)
159+
}
160+
}
161+
162+
163+
namespace examples {
164+
def example1(): Unit = {
165+
with on[MissingValue].panic()
166+
with logAnomaliesToConsole()
167+
var lst: List[Event] = [
168+
Event(1, 1.0),
169+
Event(2, 3.0),
170+
Event(3, 6.0),
171+
Event(4, 0.5),
172+
Event(5, 4.0),
173+
Event(6, -1.0),
174+
Event(7, 3.0)
175+
]
176+
try {
177+
minMaxAnomalyDetector(0.0, 5.0)
178+
} with read[Event] { () =>
179+
if (lst.isEmpty()) {
180+
println("No more values to read, stopping...")
181+
resume { do stop() }
182+
} else {
183+
val first = lst.head()
184+
lst = lst.deleteAt(0)
185+
println("Providing value: " ++ first.value.show())
186+
resume { unbox first }
187+
}
188+
}
189+
}
190+
191+
def example2(): Unit = {
192+
with on[MissingValue].panic()
193+
with logToConsole()
194+
var lst: List[Event] = [
195+
Event(1, 1.0),
196+
Event(2, 3.0),
197+
Event(3, 6.0),
198+
Event(4, 0.5),
199+
Event(5, 4.0),
200+
Event(6, -1.0),
201+
Event(7, 3.0)
202+
]
203+
try {
204+
zScoreAnomalyDetector(1.0)
205+
} with read[Event] { () =>
206+
if (lst.isEmpty()) {
207+
println("No more values to read, stopping...")
208+
resume { do stop() }
209+
} else {
210+
val first = lst.head()
211+
lst = lst.deleteAt(0)
212+
println("Providing value: " ++ first.value.show())
213+
resume { unbox first }
214+
}
215+
}
216+
}
217+
218+
/// Note: This will create (or overwrite) a file "anomalies_example_log.csv" in the current working directory
219+
def example3(): Unit = {
220+
with on[MissingValue].panic()
221+
with on[IOError].panic()
222+
with logToFileCsv("anomalies_example_log.csv")
223+
with logToConsoleRethrow()
224+
var lst: List[Event] = [
225+
Event(1, 1.0),
226+
Event(2, 3.0),
227+
Event(3, 6.0),
228+
Event(4, 0.5),
229+
Event(5, 4.0),
230+
Event(6, -1.0),
231+
Event(7, 3.0)
232+
]
233+
try {
234+
zScoreAnomalyDetector(1.0)
235+
} with read[Event] { () =>
236+
if (lst.isEmpty()) {
237+
println("No more values to read, stopping...")
238+
resume { do stop() }
239+
} else {
240+
val first = lst.head()
241+
lst = lst.deleteAt(0)
242+
println("Providing value: " ++ first.value.show())
243+
resume { unbox first }
244+
}
245+
}
246+
}
247+
}
248+
249+
def main(): Unit = {
250+
examples::example3()
251+
()
252+
}
253+
254+
namespace internal {
255+
def saveCsvRows(rows: List[CsvRow], filePath: String, first: Bool): Unit / Exception[IOError] = {
256+
val line: String = collectList[String] {
257+
encodeCsv {
258+
unbuild(rows)
259+
}
260+
}.join("") ++ "\n"
261+
if (first) {
262+
writeFile(filePath, line)
263+
} else {
264+
appendFile(filePath, line)
265+
}
266+
}
267+
}

src/test.effekt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ def main() = {
1010
val testResults = [
1111
aggregation_test::testSuite(),
1212
anomaly_detection_test::testSuite(),
13-
// stream_input_test::testSuite()
13+
stream_input_test::testSuite()
1414
]
1515
val allPassed = testResults.all { res => res }
1616
val exitCode = if (allPassed) 0 else 1

0 commit comments

Comments
 (0)