Skip to content

Commit adb5765

Browse files
authored
Merge pull request patr1ck-m#11 from patr1ck-m/improvement/data_input
Improve push to pull stream converter by setting max queue size.
2 parents cde3ddd + 84f1139 commit adb5765

2 files changed

Lines changed: 158 additions & 4 deletions

File tree

src/lib/queue.effekt

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
module src/lib/queue
2+
3+
import array
4+
5+
/**
6+
7+
Note: this is a copy of the queue.effekt from the standard library, because we need to add a size method.
8+
This lets us build more efficient stream buffers by slowing the emit down when the
9+
queue size exceeds a certain threshold.
10+
11+
*/
12+
13+
/// Mutable, automatically resizing queue.
14+
interface Queue[T] {
  // True when the queue currently holds no elements.
  def empty?(): Bool

  // Removes the element at the front and returns it; None() when the queue is empty.
  def popFront(): Option[T]

  // Removes the element at the back and returns it; None() when the queue is empty.
  def popBack(): Option[T]

  // Looks at the front element without removing it; None() when the queue is empty.
  def peekFront(): Option[T]

  // Looks at the back element without removing it; None() when the queue is empty.
  def peekBack(): Option[T]

  // Inserts `el` before the current front element, growing the queue if needed.
  def pushFront(el: T): Unit

  // Inserts `el` after the current back element, growing the queue if needed.
  def pushBack(el: T): Unit

  // Number of elements currently stored (not the allocated capacity).
  def size(): Int
}
31+
32+
33+
/// Creates an empty queue using the default initial capacity of 64 slots.
def emptyQueue[T](): Queue[T] at global = {
  emptyQueue[T](64)
}
35+
36+
/// Creates an empty queue backed by a circular array with `initialCapacity` slots.
///
/// The backing array is doubled whenever a push finds it full. An
/// `initialCapacity` below 1 is raised to 1, because doubling zero slots could
/// never make room for an element.
def emptyQueue[T](initialCapacity: Int): Queue[T] at global = {

  // With 0 slots the array could never grow (0 * 2 == 0) and the `mod`
  // arithmetic below would divide by zero, so always start with one slot.
  val startCapacity = if (initialCapacity < 1) 1 else initialCapacity

  val contents = ref(array[Option[T]](startCapacity, None()))
  val head = ref(0)     // index of the front element
  val tail = ref(0)     // index of the next free slot behind the back element
  val size = ref(0)     // number of elements currently stored
  val capacity = ref(startCapacity)

  // Reads the slot at `index` and clears it; yields None() if the index is out of bounds.
  def remove(arr: Array[Option[T]], index: Int): Option[T] = {
    with on[OutOfBounds].default { None() }
    val value = arr.get(index)
    arr.set(index, None())
    value
  }

  // Runs `p` only if the queue holds at least one element; otherwise
  // (or if `p` raises OutOfBounds) the result is None().
  def nonEmpty[T] { p: => Option[T] / Exception[OutOfBounds] }: Option[T] =
    if (size.get <= 0) None() else on[OutOfBounds].default { None() } { p() }

  // Exponential back-off: doubles the backing array when `requiredSize`
  // does not fit, moving the elements so that the front ends up at index 0.
  def resizeTo(requiredSize: Int): Unit =
    if (requiredSize <= capacity.get) () else {
      with on[OutOfBounds].ignore // should not happen

      val oldSize = capacity.get
      val newSize = capacity.get * 2
      val oldContents = contents.get
      val newContents = array[Option[T]](newSize, None())

      if (head.get < tail.get) {
        // The queue does not wrap around; all `size` elements are contiguous
        // starting at `head`, so a single copy suffices.
        copy(oldContents, head.get, newContents, 0, size.get)
      } else if (size.get > 0) {
        // The queue wraps around; copy in two segments:
        // first [head, oldSize), then [0, tail) placed right behind it.
        copy(oldContents, head.get, newContents, 0, oldSize - head.get)
        copy(oldContents, 0, newContents, oldSize - head.get, tail.get)
      }

      contents.set(newContents)
      capacity.set(newSize)
      head.set(0)
      // Exactly `size` elements were copied to the start of the new array, so
      // the next free slot is at index `size` (equal to `oldSize` when the
      // queue was full, but also correct if it was not).
      tail.set(size.get)
    }

  def queue = new Queue[T] {
    def empty?() = size.get <= 0

    def popFront() =
      nonEmpty {
        val result = contents.get.remove(head.get)
        head.set(mod(head.get + 1, capacity.get))
        size.set(size.get - 1)
        result
      }

    def popBack() =
      nonEmpty {
        // Step `tail` back onto the last element before taking it out.
        tail.set(mod(tail.get - 1 + capacity.get, capacity.get))
        val result = contents.get.remove(tail.get)
        size.set(size.get - 1)
        result
      }

    def peekFront() = nonEmpty { contents.get.get(head.get) }

    // `tail` is the next free slot, so the back element sits one slot before
    // it (wrapping around), exactly where `popBack` reads from.
    def peekBack() =
      nonEmpty { contents.get.get(mod(tail.get - 1 + capacity.get, capacity.get)) }

    def pushFront(el: T) = {
      resizeTo(size.get + 1)
      // Step `head` back (wrapping around) and store the element there.
      head.set(mod(head.get - 1 + capacity.get, capacity.get))
      size.set(size.get + 1)
      contents.get.set(head.get, Some(el))
    }

    def pushBack(el: T) = {
      resizeTo(size.get + 1)
      // Store at the free slot, then advance `tail` (wrapping around).
      contents.get.set(tail.get, Some(el))
      size.set(size.get + 1)
      tail.set(mod(tail.get + 1, capacity.get))
    }

    def size() = size.get
  }
  box queue
}
120+
121+
namespace examples {
  // Small demonstration: fills a queue from both ends, forces it to grow
  // twice, and then drains it from the front.
  def main() = {
    // start with only 4 slots so that growing is triggered quickly
    def demo = emptyQueue[Int](4)
    println(demo.empty?)
    demo.pushFront(1)
    demo.pushBack(2)
    demo.pushFront(3)
    demo.pushBack(4)
    // all 4 slots are taken now, so the next push doubles the capacity:
    demo.pushBack(5)
    demo.pushBack(6)
    demo.pushBack(7)
    demo.pushBack(8)
    // 8 slots are taken, so this push doubles the capacity once more:
    demo.pushBack(9)

    println(demo.empty?)
    // elements come out in front-to-back order
    println(demo.popFront()) // Some(3)
    println(demo.popFront()) // Some(1)
    println(demo.popFront()) // Some(2)
    println(demo.popFront()) // Some(4)
    println(demo.popFront()) // Some(5)
    println(demo.popFront()) // Some(6)
    println(demo.popFront()) // Some(7)
    println(demo.popFront()) // Some(8)
    println(demo.popFront()) // Some(9)
    println(demo.popFront()) // None()
  }
}

src/lib/stream_input.effekt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,22 @@ module src/lib/stream_input
22

33
import stream
44
import src/lib/csv
5-
import queue
5+
import src/lib/queue
66
import scanner
77
import io/error
88
import io/filesystem
99
import io/time
1010

11-
def pushToPullStream[T] {pushStr: () => Unit / { emit[T], stop } } {pullStr: () => Unit / { read[T] } } = {
11+
/// Connect a push-based stream to a pull-based stream using an internal queue which slows emits down when reaching the maximum size.
12+
def pushToPullStream[T](maxQueueSize: Int) {pushStr: () => Unit / { emit[T], stop } } {pullStr: () => Unit / { read[T] } } = {
1213
def q = emptyQueue[T]()
1314
var running = true
1415
try pushStr() with emit[T] {
1516
def emit(v: T) = {
1617
q.pushBack(v)
18+
while (q.size() >= maxQueueSize) {
19+
time::sleep(10)
20+
}
1721
resume(())
1822
}
1923
} with stop {
@@ -68,11 +72,11 @@ def csvFeedStr(csvStr: String, columnName: String, delayMs: Int): Unit / { emit[
6872
}
6973

7074
def csvFeedPull(path: String, columnName: String, delayMs: Int) {body: () => Unit / { read[Double], Exception[IOError], Exception[WrongFormat] } }: Unit / { Exception[IOError], Exception[WrongFormat] } = {
71-
pushToPullStream[Double] { csvFeed(path, columnName, delayMs) } { body() }
75+
pushToPullStream[Double](100) { csvFeed(path, columnName, delayMs) } { body() }
7276
}
7377

7478
def csvFeedStrPull(csvStr: String, columnName: String, delayMs: Int) {body: () => Unit / { read[Double], Exception[WrongFormat] } }: Unit / { Exception[WrongFormat] } = {
75-
pushToPullStream[Double] { csvFeedStr(csvStr, columnName, delayMs) } { body() }
79+
pushToPullStream[Double](100) { csvFeedStr(csvStr, columnName, delayMs) } { body() }
7680
}
7781

7882

0 commit comments

Comments
 (0)