Skip to content

Commit 3126de9

Browse files
committed
Change stream input methods to use events. Fix bugs with the push to pull stream conversion method. Enhance CSV handling.
1 parent a22f098 commit 3126de9

8 files changed

Lines changed: 232 additions & 218 deletions

src/lib/anomaly_detection.effekt

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ record AnomalyEvaluation (
1414
anomalyScore: Double
1515
)
1616

17+
def show(a: AnomalyEvaluation): String = {
18+
"event=" ++ a.event.show() ++
19+
", isAnomaly=" ++ a.isAnomaly.show() ++
20+
", anomalyScore=" ++ a.anomalyScore.show()
21+
}
22+
1723
/// Check equality of two AnomalyEvaluation records
1824
///
1925
/// Performs a field-by-field equality check (timestamp, value, anomaly flag, score)
@@ -122,17 +128,16 @@ namespace examples {
122128
} else {
123129
val first = lst.head()
124130
lst = lst.deleteAt(0)
125-
println("Providing value: " ++ first.value.show())
131+
println("Providing value: " ++ first.show())
126132
resume { unbox first }
127133
}
128134
} with AnomalyDetection {
129135
def anomaly(ev: AnomalyEvaluation) = {
130-
println("Anomaly detected: " ++ ev.event.value.show() ++ " at t=" ++ ev.event.timestamp.show() ++
131-
" (score: " ++ ev.anomalyScore.show() ++ ")")
136+
println("Anomaly detected: " ++ ev.show())
132137
resume(())
133138
}
134139
def noAnomaly(ev: AnomalyEvaluation) = {
135-
println("No anomaly: " ++ ev.event.value.show() ++ " at t=" ++ ev.event.timestamp.show())
140+
println("No anomaly: " ++ ev.show())
136141
resume(())
137142
}
138143
}
@@ -158,16 +163,16 @@ namespace examples {
158163
} else {
159164
val first = lst.head()
160165
lst = lst.deleteAt(0)
161-
println("Providing value: " ++ first.value.show())
166+
println("Providing value: " ++ first.show())
162167
resume { unbox first }
163168
}
164169
} with AnomalyDetection {
165170
def anomaly(ev: AnomalyEvaluation) = {
166-
println("Anomaly detected: " ++ ev.event.value.show() ++ " at t=" ++ ev.event.timestamp.show() ++ " (score: " ++ ev.anomalyScore.show() ++ ")")
171+
println("Anomaly detected: " ++ ev.show())
167172
resume(())
168173
}
169174
def noAnomaly(ev: AnomalyEvaluation) = {
170-
println("No anomaly: " ++ ev.event.value.show() ++ " at t=" ++ ev.event.timestamp.show() ++ " (score: " ++ ev.anomalyScore.show() ++ ")")
175+
println("No anomaly: " ++ ev.show())
171176
resume(())
172177
}
173178
}

src/lib/csv.effekt

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import scanner
1515
import stream
1616
import string
1717
import partial
18+
import map
1819

1920

2021
/*
@@ -48,6 +49,16 @@ def collectString{ body: => Unit / { emit[Char] } }: String = {
4849
str
4950
}
5051

52+
def contains[T](lst: List[T], v: T) {comp: (T, T) => Bool}: Bool = {
53+
var found = false
54+
lst.foreach { x =>
55+
if (comp(x, v)) {
56+
found = true
57+
}
58+
}
59+
return found
60+
}
61+
5162
/*
5263
---------------------------------------------------
5364
--- The following part is written by @jiribenes ---
@@ -238,6 +249,43 @@ def getColumnOf[R](columnName: String) { csv: => R / CsvBuilder }: R / {emit[Str
238249
}
239250
}
240251

252+
/// Not in PR, added by myself
253+
///
254+
/// Get multiple columns by their names
255+
def getColumnsOf[R](columnNames: List[String]) { csv: => R / CsvBuilder }: R / {emit[Map[String, String]], Exception[WrongFormat]} = {
256+
with on[MissingValue].panic() // should never happen
257+
val compFuncString = box {(x: String, y: String) => if (x < y) Less() else if (x > y) Greater() else Equal()}
258+
val compFuncInt = box {(x: Int, y: Int) => if (x < y) Less() else if (x > y) Greater() else Equal()}
259+
var columnIdsMap = empty[Int, String](compFuncInt)
260+
try csv() with CsvBuilder {
261+
def rows[R]() = resume { {r} =>
262+
var currentColumn = 0
263+
var resultMap: Map[String, String] = empty[String, String](compFuncString)
264+
val ret = try r() with RowBuilder {
265+
def cell(s) = {
266+
currentColumn = currentColumn + 1
267+
if (not(columnIdsMap.contains(currentColumn)) and columnNames.contains(s.trim()) {(x, y) => x == y}) {
268+
columnIdsMap = columnIdsMap.put(currentColumn, s.trim())
269+
} else if (columnIdsMap.contains(currentColumn)) {
270+
val colName = columnIdsMap.get(currentColumn).value()
271+
resultMap = map::put(resultMap, colName, s.trim())
272+
}
273+
resume(())
274+
}
275+
}
276+
if (not(resultMap.isEmpty())) {
277+
do emit(resultMap)
278+
}
279+
280+
// it's the first iteration and we haven't found a column yet!
281+
if (columnIdsMap.size() < columnNames.size()) {
282+
wrongFormat("Header doesn't have the given column name: " ++ columnNames.show())
283+
}
284+
ret
285+
}
286+
}
287+
}
288+
241289

242290
def main() = {
243291
with on[WrongFormat].report

src/lib/event.effekt

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,41 @@
11
module src/lib/event
22

3+
import src/lib/timestamp
4+
import src/lib/typeconversion
5+
36
record Event (
47
timestamp: Int,
58
value: Double
6-
)
9+
)
10+
11+
def toEvent(value: Double): Event = {
12+
Event(currentTimestampMillis(), value)
13+
}
14+
15+
def toEvent(value: Double, timestamp: Int): Event = {
16+
Event(timestamp, value)
17+
}
18+
19+
def toEvent(value: Int): Event = {
20+
Event(currentTimestampMillis(), value.toDouble)
21+
}
22+
23+
def toEvent(value: Int, timestamp: Int): Event = {
24+
Event(timestamp, value.toDouble)
25+
}
26+
27+
def toEvent(value: String): Event / Exception[WrongFormat] = {
28+
Event(currentTimestampMillis(), value.toDouble)
29+
}
30+
31+
def show(e: Event): String = {
32+
"Event(value=" ++ e.value.show() ++ ", timestamp=" ++ timeMillisToString(e.timestamp) ++ ")"
33+
}
34+
35+
def equals(a: Event, b: Event): Bool = {
36+
a.timestamp == b.timestamp && a.value == b.value
37+
}
38+
39+
def equalsIgnoreTimestamp(a: Event, b: Event): Bool = {
40+
a.value == b.value
41+
}

src/lib/queue.effekt

Lines changed: 0 additions & 150 deletions
This file was deleted.

0 commit comments

Comments
 (0)