Skip to content

Commit a0f481c

Browse files
authored
Merge pull request patr1ck-m#8 from patr1ck-m/feature/data_input
Add stream input. Update codebase for effekt version 0.57.0
2 parents eafeb2f + 959d599 commit a0f481c

9 files changed

Lines changed: 223 additions & 13 deletions

flake.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/lib/csv.effekt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type CsvValue = List[CsvRow]
6767
def build[R] { body: => R / CsvBuilder }: (R, CsvValue) = {
6868
var csvrows: List[CsvRow] = []
6969
val r: R = try body() with CsvBuilder {
70-
def rows() = resume { {row} =>
70+
def rows[R]() = resume { {row} =>
7171
val (ret, csvrow) = buildRow {row}
7272
csvrows = Cons(csvrow, csvrows)
7373
ret
@@ -122,7 +122,7 @@ def readQuotedString(): Unit / { Scan[Char], emit[Char], Exception[WrongFormat]
122122
}
123123
}
124124

125-
def decodeRow(): Unit / {Scan[Char], RowBuilder, Exception[WrongFormat]} = exhaustively {
125+
def decodeRow(): Unit / {Scan[Char], RowBuilder, Exception[WrongFormat]} = loop {
126126
val cell = readUntil { c => c == ',' || c == '\n' || c == '"' }
127127
do cell(cell)
128128

@@ -138,7 +138,7 @@ def decodeRow(): Unit / {Scan[Char], RowBuilder, Exception[WrongFormat]} = exhau
138138
}
139139
}
140140

141-
def decodeCsv(): Unit / {Scan[Char], CsvBuilder, Exception[WrongFormat]} = exhaustively {
141+
def decodeCsv(): Unit / {Scan[Char], CsvBuilder, Exception[WrongFormat]} = loop {
142142
do peek[Char]() match {
143143
case '\n' =>
144144
// do skip[Char]()
@@ -158,7 +158,7 @@ def encodeCsv[R] { csv: => R / CsvBuilder }: R / emit[String] = {
158158
}
159159

160160
try csv() with CsvBuilder {
161-
def rows() = resume { {row} =>
161+
def rows[R]() = resume { {row} =>
162162
sep(); encodeRow {row}
163163
}
164164
}
@@ -182,7 +182,7 @@ def validate[R] { csv: => R / CsvBuilder }: R / Exception[WrongFormat] = {
182182
var rowNumber = 0
183183

184184
try csv() with CsvBuilder {
185-
def rows() = resume { {r} =>
185+
def rows[R]() = resume { {r} =>
186186
rowNumber = rowNumber + 1
187187

188188
var cells = 0
@@ -214,7 +214,7 @@ def validate[R] { csv: => R / CsvBuilder }: R / Exception[WrongFormat] = {
214214
def getColumnOf[R](columnName: String) { csv: => R / CsvBuilder }: R / {emit[String], Exception[WrongFormat]} = {
215215
var columnId = None()
216216
try csv() with CsvBuilder {
217-
def rows() = resume { {r} =>
217+
def rows[R]() = resume { {r} =>
218218
var currentColumn = 0
219219
val ret = try r() with RowBuilder {
220220
def cell(s) = {

src/lib/stream_input.effekt

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
module src/lib/stream_input
2+
3+
import stream
4+
import src/lib/csv
5+
import queue
6+
import scanner
7+
import io/error
8+
import io/filesystem
9+
import io/time
10+
11+
def pushToPullStream[T] {pushStr: () => Unit / { emit[T], stop } } {pullStr: () => Unit / { read[T] } } = {
12+
def q = emptyQueue[T]()
13+
var running = true
14+
try pushStr() with emit[T] {
15+
def emit(v: T) = {
16+
q.pushBack(v)
17+
resume(())
18+
}
19+
} with stop {
20+
running = false
21+
}
22+
try pullStr() with read[T] {
23+
while (running) {
24+
val v = q.popFront()
25+
v match {
26+
case Some(value) => {resume { unbox value }}
27+
case None() => {}
28+
}
29+
}
30+
resume { do stop() }
31+
}
32+
}
33+
34+
/// Parse string to double (not in standard library???)
35+
extern def toDouble(s: String) at {}: Double =
36+
js "parseFloat(${s})"
37+
38+
def csvFeed(path: String, columnName: String, delayMs: Int): Unit / { emit[Double], Exception[IOError], Exception[WrongFormat] } = {
39+
readFileUTF8(path) {
40+
returning::scanner[Char, Unit] {
41+
try {
42+
getColumnOf(columnName) {
43+
decodeCsv()
44+
}
45+
} with emit[String] { s =>
46+
do emit(s.toDouble)
47+
time::sleep(delayMs)
48+
resume(())
49+
}
50+
}
51+
}
52+
}
53+
54+
def csvFeedStr(csvStr: String, columnName: String, delayMs: Int): Unit / { emit[Double], Exception[WrongFormat] } = {
55+
feed(csvStr) {
56+
returning::scanner[Char, Unit] {
57+
try {
58+
getColumnOf(columnName) {
59+
decodeCsv()
60+
}
61+
} with emit[String] { s =>
62+
do emit(s.toDouble)
63+
time::sleep(delayMs)
64+
resume(())
65+
}
66+
}
67+
}
68+
}
69+
70+
def csvFeedPull(path: String, columnName: String, delayMs: Int) {body: () => Unit / { read[Double], Exception[IOError], Exception[WrongFormat] } }: Unit / { Exception[IOError], Exception[WrongFormat] } = {
71+
pushToPullStream[Double] { csvFeed(path, columnName, delayMs) } { body() }
72+
}
73+
74+
def csvFeedStrPull(csvStr: String, columnName: String, delayMs: Int) {body: () => Unit / { read[Double], Exception[WrongFormat] } }: Unit / { Exception[WrongFormat] } = {
75+
pushToPullStream[Double] { csvFeedStr(csvStr, columnName, delayMs) } { body() }
76+
}
77+
78+
79+
namespace examples {
80+
def example1(): Unit = {
81+
with on[WrongFormat].panic()
82+
83+
val csv = "name,age\nJohn,42\nJoe,10\nJolene,27"
84+
85+
try csvFeedStr(csv, "age", 500) with emit[Double] { v =>
86+
println("Received value: " ++ v.show())
87+
resume(())
88+
}
89+
}
90+
91+
def example2(): Unit = {
92+
with on[IOError].panic()
93+
with on[WrongFormat].panic()
94+
95+
try csvFeed("src/test/testdata.csv", "value", 500) with emit[Double] { v =>
96+
println("Received value: " ++ v.show())
97+
resume(())
98+
}
99+
}
100+
101+
def example3(): Unit = {
102+
with on[IOError].panic()
103+
with on[WrongFormat].panic()
104+
with boundary
105+
csvFeedPull("src/test/testdata.csv", "value", 500) {
106+
while (true) {
107+
println("Waiting to read value...")
108+
val v = do read[Double]()
109+
println("Received value: " ++ v.show())
110+
}
111+
}
112+
}
113+
114+
def main(): Unit = {
115+
example2()
116+
}
117+
}

src/test.effekt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@ import process
44
import test
55
import src/test/aggregation_test
66
import src/test/anomaly_detection_test
7+
import src/test/stream_input_test
78

89
def main() = {
910
val testResults = [
1011
aggregation_test::testSuite(),
11-
anomaly_detection_test::testSuite()
12+
anomaly_detection_test::testSuite(),
13+
// stream_input_test::testSuite()
1214
]
1315
val allPassed = testResults.all { res => res }
1416
val exitCode = if (allPassed) 0 else 1
1517
process::exit(exitCode)
18+
()
1619
}

src/test/aggregation_test.effekt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import test
44
import src/lib/aggregation
55
import src/lib/event
66
import src/test/lib
7+
import process
78

89
def eventsFromList(values: List[Double]): List[Event] = {
910
def loop(vals: List[Double], ts: Int): List[Event] = {
@@ -99,4 +100,9 @@ def testSuite() = suite("aggregation") {
99100
}
100101
}
101102
}
103+
}
104+
105+
def main() = {
106+
process::exit(if (testSuite()) 0 else 1)
107+
()
102108
}

src/test/anomaly_detection_test.effekt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import test
44
import src/lib/anomaly_detection
55
import src/lib/event
66
import src/test/lib
7+
import process
78

89
def eventsFromList(values: List[Double]): List[Event] = {
910
def loop(vals: List[Double], ts: Int): List[Event] = {
@@ -114,4 +115,9 @@ def testSuite() = suite("anomaly_detection") {
114115
anomaly_detection::zScoreAnomalyDetector(1.0)
115116
}
116117
}
118+
}
119+
120+
def main() = {
121+
process::exit(if (testSuite()) 0 else 1)
122+
()
117123
}

src/test/lib.effekt

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ def assertEmits[T](expected: List[T]) { equals: (T, T) => Bool } { program: () =
1515
assertTrue(equals(head, v), "Emitted value did not match expected value")
1616
lst = tail
1717
}
18+
resume(())
1819
}
20+
assertTrue(lst.isEmpty(), "Did not emit all expected values")
1921
}
2022

2123
def assertNoEmits[T] { program: () => Unit / emit[T] } : Unit / { Assertion } = {
@@ -31,17 +33,40 @@ def assertReads[T](inputs: List[T]) { equals: (T, T) => Bool } { program: () =>
3133
var lst = inputs
3234
assertFalse(lst.isEmpty(), "assertReads requires a non empty list of inputs")
3335
val someValue: T = lst.head() // dummy value to return when we run out of inputs (not used because of the failing assert but necessary for type checking)
36+
var isStop: Bool = false
3437
try {
3538
program()
3639
} with read[T] { () =>
37-
val r: T = lst match {
40+
lst match {
3841
case Nil() =>
39-
assertTrue(false, "Read more values than expected")
40-
someValue // to satisfy type checker
42+
if (isStop) {
43+
assertTrue(false, "Read more values than expected (stop already signaled)")
44+
} else {
45+
isStop = true
46+
resume { do stop() }
47+
}
4148
case Cons(head, tail) =>
4249
lst = tail
43-
unbox head
50+
resume { unbox head }
51+
}
52+
}
53+
}
54+
55+
def assertDoRaise[E] {block: () => Unit / Exception[E]} = {
56+
try {
57+
block()
58+
assertTrue(false, "Expected exception was not raised")
59+
} with Exception[E] {
60+
def raise(e: E, msg: String) = {
61+
}
62+
}
63+
}
64+
def assertDoNotRaise[E] {block: () => Unit / Exception[E]} = {
65+
try {
66+
block()
67+
} with Exception[E] {
68+
def raise(e: E, msg: String) = {
69+
assertTrue(false, "Expected exception not to raise but was raised")
4470
}
45-
resume { unbox r }
4671
}
4772
}

src/test/stream_input_test.effekt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
module src/test/stream_input_test
2+
3+
import test
4+
import src/lib/stream_input
5+
import src/lib/event
6+
import src/test/lib
7+
import io/error
8+
import list
9+
import process
10+
import io/filesystem
11+
12+
/// The content of the test_events.csv as a list of doubles for comparison
13+
def csvContentAsList(): List[Double] = {
14+
[10.0, 27.0, 5.5, 42.0, 14.5, 29.0, 20.0]
15+
}
16+
17+
def testSuite() = suite("stream_input_csv") {
18+
test("reads csv events push") {
19+
with assertDoNotRaise[WrongFormat]
20+
with assertDoNotRaise[IOError]
21+
assertEmits[Double](csvContentAsList()) { (a: Double, b: Double) => a == b } {
22+
stream_input::csvFeed("src/test/testdata.csv", "value", 0)
23+
}
24+
}
25+
26+
// test("raises IOError on missing file") {
27+
// with assertDoRaise[IOError]
28+
// with assertDoNotRaise[WrongFormat]
29+
// assertEmits[Double]([]) { (a: Double, b: Double) => a == b } {
30+
// stream_input::csvFeed("src/test/data/missing_file.csv", "value", 0)
31+
// }
32+
// }
33+
34+
// test("read non existing file") {
35+
// assertDoRaise[IOError] {
36+
// readFile("non_existing_file") // should raise IOError but backend error occurs
37+
// ()
38+
// }
39+
// }
40+
}
41+
42+
def main() = {
43+
process::exit(if (testSuite()) 0 else 1)
44+
()
45+
}

src/test/testdata.csv

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
value
2+
10.0
3+
27.0
4+
5.5
5+
42.0
6+
14.5
7+
29.0
8+
20.0

0 commit comments

Comments
 (0)