@@ -27,6 +27,20 @@ def logToConsoleRethrow() {body: () => Unit / emit[Event]}: Unit / emit[Event] =
2727 }
2828}
2929
30+ /// An AnomalyDetection handler that logs all events to the console for pull-based streams
31+ def logToConsolePull() {body: () => Unit / read[Event]}: Unit / read[Event] = {
32+ with boundary
33+ try {
34+ body()
35+ } with read[Event] {
36+ val e = do read[Event]()
37+ resume {
38+ println("Event: " ++ e.value.show() ++ " at t=" ++ e.timestamp.show())
39+ e
40+ }
41+ }
42+ }
43+
3044/// An AnomalyDetection handler that logs all events to a CSV file
3145def logToFileCsv(filePath: String) {body: () => Unit / emit[Event]}: Unit / Exception[IOError] = {
3246 var buffer: List[CsvRow] = [["timestamp", "value"]]
@@ -82,6 +96,39 @@ def logToFileCsvRethrow(filePath: String) {body: () => Unit / emit[Event]}: Unit
8296 }
8397}
8498
99+ def logToFileCsvPull(filePath: String) {body: () => Unit / read[Event]}: Unit / { read[Event], Exception[IOError] } = {
100+ var buffer: List[CsvRow] = [["timestamp", "value"]]
101+ var first = true
102+ val maxBufferSize = 1000
103+ try { // Double try is required because the read in the with block throws stop when the stream ends
104+ try {
105+ body()
106+ } with read[Event] {
107+ val e = do read[Event]()
108+ resume {
109+ val row: CsvRow = [
110+ e.timestamp.show(),
111+ e.value.show()
112+ ]
113+ buffer = Cons(row, buffer)
114+ if (buffer.size >= maxBufferSize) {
115+ // Flush buffer to file
116+ internal::saveCsvRows(buffer.reverse, filePath, first)
117+ buffer = []
118+ first = false
119+ }
120+ e
121+ }
122+ }
123+ } with stop {
124+ ()
125+ }
126+ // Flush remaining buffer to file
127+ if (not(buffer.isEmpty())) {
128+ internal::saveCsvRows(buffer.reverse, filePath, first)
129+ }
130+ }
131+
85132
86133namespace examples {
87134 def example1(): Unit = {
0 commit comments