1+ module src/lib/event_logger
2+
3+ import src/lib/event
4+ import src/lib/csv
5+ import src/lib/anomaly_logger // only for the csv saving function (in internal namespace)
6+ import io/error
7+
8+
9+ /// An AnomalyDetection handler that logs all events to the console
10+ def logToConsole() {body: () => Unit / emit[Event]}: Unit = {
11+ try {
12+ body()
13+ } with emit[Event] { e =>
14+ println("Event: " ++ e.value.show() ++ " at t=" ++ e.timestamp.show())
15+ resume(())
16+ }
17+ }
18+
19+ /// An AnomalyDetection handler that logs all events to the console and rethrows the events to the caller
20+ def logToConsoleRethrow() {body: () => Unit / emit[Event]}: Unit / emit[Event] = {
21+ try {
22+ body()
23+ } with emit[Event] { e =>
24+ println("Event: " ++ e.value.show() ++ " at t=" ++ e.timestamp.show())
25+ do emit(e)
26+ resume(())
27+ }
28+ }
29+
30+ /// An AnomalyDetection handler that logs all events to a CSV file
31+ def logToFileCsv(filePath: String) {body: () => Unit / emit[Event]}: Unit / Exception[IOError] = {
32+ var buffer: List[CsvRow] = [["timestamp", "value"]]
33+ var first = true
34+ val maxBufferSize = 1000
35+ try {
36+ body()
37+ } with emit[Event] { e =>
38+ val row: CsvRow = [
39+ e.timestamp.show(),
40+ e.value.show()
41+ ]
42+ buffer = Cons(row, buffer)
43+ if (buffer.size >= maxBufferSize) {
44+ // Flush buffer to file
45+ internal::saveCsvRows(buffer.reverse, filePath, first)
46+ buffer = []
47+ first = false
48+ }
49+ resume(())
50+ }
51+ // Flush remaining buffer to file
52+ if (not(buffer.isEmpty())) {
53+ internal::saveCsvRows(buffer.reverse, filePath, first)
54+ }
55+ }
56+
57+ /// An AnomalyDetection handler that logs all events to a CSV file
58+ def logToFileCsvRethrow(filePath: String) {body: () => Unit / emit[Event]}: Unit / { emit[Event], Exception[IOError] } = {
59+ var buffer: List[CsvRow] = [["timestamp", "value"]]
60+ var first = true
61+ val maxBufferSize = 1000
62+ try {
63+ body()
64+ } with emit[Event] { e =>
65+ val row: CsvRow = [
66+ e.timestamp.show(),
67+ e.value.show()
68+ ]
69+ buffer = Cons(row, buffer)
70+ if (buffer.size >= maxBufferSize) {
71+ // Flush buffer to file
72+ internal::saveCsvRows(buffer.reverse, filePath, first)
73+ buffer = []
74+ first = false
75+ }
76+ do emit(e)
77+ resume(())
78+ }
79+ // Flush remaining buffer to file
80+ if (not(buffer.isEmpty())) {
81+ internal::saveCsvRows(buffer.reverse, filePath, first)
82+ }
83+ }
84+
85+
86+ namespace examples {
87+ def example1(): Unit = {
88+ with on[MissingValue].panic()
89+ with logToConsole()
90+ var lst: List[Event] = [
91+ Event(1, 1.0),
92+ Event(2, 3.0),
93+ Event(3, 6.0),
94+ Event(4, 0.5),
95+ Event(5, 4.0),
96+ Event(6, -1.0),
97+ Event(7, 3.0)
98+ ]
99+ foreach (lst) { e =>
100+ do emit(e)
101+ }
102+ }
103+
104+ /// Note: This will create (or overwrite) a file "anomalies_example_log.csv" in the current working directory
105+ def example2(): Unit = {
106+ with on[MissingValue].panic()
107+ with on[IOError].panic()
108+ with logToFileCsv("anomalies_example_log.csv")
109+ with logToConsoleRethrow()
110+ var lst: List[Event] = [
111+ Event(1, 1.0),
112+ Event(2, 3.0),
113+ Event(3, 6.0),
114+ Event(4, 0.5),
115+ Event(5, 4.0),
116+ Event(6, -1.0),
117+ Event(7, 3.0)
118+ ]
119+ foreach (lst) { e =>
120+ do emit(e)
121+ }
122+ }
123+ }
124+
125+ def main(): Unit = {
126+ examples::example2()
127+ ()
128+ }
0 commit comments