@@ -90,6 +90,121 @@ def testSuite() = suite("aggregation") {
9090 }
9191 }
9292 }
93+
94+ // ==================
95+ // Edge Case Tests
96+ // ==================
97+
98+ test("aggregate single element using min") {
99+ val input = eventsFromList([42.0])
100+ val expectedOutput = eventsFromList([42.0])
101+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
102+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
103+ aggregation::aggregateMin()
104+ }
105+ }
106+ }
107+
108+ test("aggregate single element using max") {
109+ val input = eventsFromList([42.0])
110+ val expectedOutput = eventsFromList([42.0])
111+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
112+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
113+ aggregation::aggregateMax()
114+ }
115+ }
116+ }
117+
118+ test("aggregate single element using mean") {
119+ val input = eventsFromList([42.0])
120+ val expectedOutput = eventsFromList([42.0])
121+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
122+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
123+ aggregation::aggregateMean()
124+ }
125+ }
126+ }
127+
128+ test("aggregate single element using median") {
129+ val input = eventsFromList([42.0])
130+ val expectedOutput = eventsFromList([42.0])
131+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
132+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
133+ aggregation::aggregateMedian()
134+ }
135+ }
136+ }
137+
138+ test("aggregate with negative values using min") {
139+ val input = eventsFromList([-5.0, -3.0, -8.0, -1.0])
140+ val expectedOutput = eventsFromList([-5.0, -5.0, -8.0, -8.0])
141+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
142+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
143+ aggregation::aggregateMin()
144+ }
145+ }
146+ }
147+
148+ test("aggregate with negative values using max") {
149+ val input = eventsFromList([-5.0, -3.0, -8.0, -1.0])
150+ val expectedOutput = eventsFromList([-5.0, -3.0, -3.0, -1.0])
151+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
152+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
153+ aggregation::aggregateMax()
154+ }
155+ }
156+ }
157+
158+ test("aggregate with negative values using mean") {
159+ val input = eventsFromList([-5.0, -3.0, -7.0, -1.0])
160+ val expectedOutput = eventsFromList([-5.0, -4.0, -5.0, -4.0])
161+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
162+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
163+ aggregation::aggregateMean()
164+ }
165+ }
166+ }
167+
168+ test("aggregate with negative values using median") {
169+ val input = eventsFromList([-5.0, -3.0, -8.0, -1.0])
170+ val expectedOutput = eventsFromList([-5.0, -4.0, -5.0, -4.0])
171+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
172+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
173+ aggregation::aggregateMedian()
174+ }
175+ }
176+ }
177+
178+ test("windowed min with window size 1") {
179+ val input = eventsFromList([3.0, 1.0, 4.0, 1.0, 5.0])
180+ val expectedOutput = eventsFromList([3.0, 1.0, 4.0, 1.0, 5.0])
181+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
182+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
183+ aggregation::aggregateMinWindow(1)
184+ }
185+ }
186+ }
187+
188+ test("windowed max with window size 1") {
189+ val input = eventsFromList([3.0, 1.0, 4.0, 1.0, 5.0])
190+ val expectedOutput = eventsFromList([3.0, 1.0, 4.0, 1.0, 5.0])
191+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
192+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
193+ aggregation::aggregateMaxWindow(1)
194+ }
195+ }
196+ }
197+
198+ test("windowed mean with window larger than input") {
199+ val input = eventsFromList([2.0, 4.0, 6.0])
200+ // Window size 10, but only 3 elements, so it computes mean of all available
201+ val expectedOutput = eventsFromList([2.0, 3.0, 4.0])
202+ assertEmits[Event]( expectedOutput ) { (a, b) => sameEvent(a, b) } {
203+ assertReads[Event](input) { (a, b) => sameEvent(a, b) } {
204+ aggregation::aggregateMeanWindow(10)
205+ }
206+ }
207+ }
93208}
94209
95210def main() = {
0 commit comments