@@ -125,6 +125,55 @@ def logToFileCsv(filePath: String) {body: () => Unit / AnomalyDetection}: Unit /
125125 }
126126}
127127
128+ /// An AnomalyDetection handler that logs all events to a CSV file and rethrows them to the caller
129+ def logToFileCsvRethrow(filePath: String) {body: () => Unit / AnomalyDetection}: Unit / { AnomalyDetection, Exception[IOError] } = {
130+ var buffer: List[CsvRow] = [["timestamp", "value", "isAnomaly", "anomalyScore"]]
131+ var first = true
132+ val maxBufferSize = 1000
133+ try {
134+ body()
135+ } with AnomalyDetection {
136+ def anomaly(ae: AnomalyEvaluation) = {
137+ val row: CsvRow = [
138+ ae.event.timestamp.show(),
139+ ae.event.value.show(),
140+ "true",
141+ ae.anomalyScore.show()
142+ ]
143+ buffer = Cons(row, buffer)
144+ if (buffer.size >= maxBufferSize) {
145+ // Flush buffer to file
146+ internal::saveCsvRows(buffer.reverse, filePath, first)
147+ buffer = []
148+ first = false
149+ }
150+ do anomaly(ae)
151+ resume(())
152+ }
153+ def noAnomaly(ae: AnomalyEvaluation) = {
154+ val row: CsvRow = [
155+ ae.event.timestamp.show(),
156+ ae.event.value.show(),
157+ "false",
158+ ae.anomalyScore.show()
159+ ]
160+ buffer = Cons(row, buffer)
161+ if (buffer.size >= maxBufferSize) {
162+ // Flush buffer to file
163+ internal::saveCsvRows(buffer.reverse, filePath, first)
164+ buffer = []
165+ first = false
166+ }
167+ do noAnomaly(ae)
168+ resume(())
169+ }
170+ }
171+ // Flush remaining buffer to file
172+ if (not(buffer.isEmpty())) {
173+ internal::saveCsvRows(buffer.reverse, filePath, first)
174+ }
175+ }
176+
128177/// An AnomalyDetection handler that logs only anomalies to a CSV file
129178def logAnomaliesToFileCsv(filePath: String) {body: () => Unit / AnomalyDetection}: Unit / Exception[IOError] = {
130179 var buffer: List[CsvRow] = [["timestamp", "value", "isAnomaly", "anomalyScore"]]
@@ -159,6 +208,42 @@ def logAnomaliesToFileCsv(filePath: String) {body: () => Unit / AnomalyDetection
159208 }
160209}
161210
211+ /// An AnomalyDetection handler that logs only anomalies to a CSV file and rethrows them to the caller
212+ def logAnomaliesToFileCsv(filePath: String) {body: () => Unit / AnomalyDetection}: Unit / { AnomalyDetection, Exception[IOError] } = {
213+ var buffer: List[CsvRow] = [["timestamp", "value", "isAnomaly", "anomalyScore"]]
214+ var first = true
215+ val maxBufferSize = 1000
216+ try {
217+ body()
218+ } with AnomalyDetection {
219+ def anomaly(ae: AnomalyEvaluation) = {
220+ val row: CsvRow = [
221+ ae.event.timestamp.show(),
222+ ae.event.value.show(),
223+ "true",
224+ ae.anomalyScore.show()
225+ ]
226+ buffer = Cons(row, buffer)
227+ if (buffer.size >= maxBufferSize) {
228+ // Flush buffer to file
229+ internal::saveCsvRows(buffer.reverse, filePath, first)
230+ buffer = []
231+ first = false
232+ }
233+ do anomaly(ae)
234+ resume(())
235+ }
236+ def noAnomaly(ae: AnomalyEvaluation) = {
237+ do noAnomaly(ae)
238+ resume(())
239+ }
240+ }
241+ // Flush remaining buffer to file
242+ if (not(buffer.isEmpty())) {
243+ internal::saveCsvRows(buffer.reverse, filePath, first)
244+ }
245+ }
246+
162247
163248namespace examples {
164249 def example1(): Unit = {
0 commit comments