|
18 | 18 | package monix.execution.schedulers |
19 | 19 |
|
20 | 20 | import minitest.TestSuite |
21 | | -import monix.execution.ExecutionModel.{AlwaysAsyncExecution, Default => DefaultExecutionModel} |
| 21 | +import monix.execution.ExecutionModel.{ AlwaysAsyncExecution, Default => DefaultExecutionModel } |
22 | 22 | import monix.execution.cancelables.SingleAssignCancelable |
23 | 23 | import monix.execution.exceptions.DummyException |
24 | | -import monix.execution.{Cancelable, Scheduler, UncaughtExceptionReporter} |
| 24 | +import monix.execution.schedulers.ExecutorSchedulerSuite.TestException |
| 25 | +import monix.execution.{ Cancelable, Scheduler, UncaughtExceptionReporter } |
25 | 26 |
|
26 | | -import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException} |
| 27 | +import java.util.concurrent.{ CountDownLatch, Executors, TimeUnit, TimeoutException } |
27 | 28 | import scala.concurrent.duration._ |
28 | | -import scala.concurrent.{Await, Promise, blocking} |
| 29 | +import scala.concurrent.{ blocking, Await, Promise } |
29 | 30 |
|
30 | 31 | abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self => |
31 | 32 | var lastReportedFailure = null: Throwable |
@@ -152,108 +153,77 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self |
152 | 153 | } |
153 | 154 |
|
154 | 155 | test("reports errors on execute") { scheduler => |
155 | | - val latch = new CountDownLatch(1) |
156 | | - self.synchronized { |
157 | | - lastReportedFailure = null |
158 | | - lastReportedFailureLatch = latch |
159 | | - } |
| 156 | + val latch: CountDownLatch = setupReporterLatch() |
160 | 157 |
|
161 | 158 | try { |
162 | | - val ex = DummyException("dummy") |
163 | | - |
164 | | - scheduler.execute(new Runnable { |
165 | | - override def run() = |
166 | | - throw ex |
167 | | - }) |
| 159 | + scheduler.execute(() => throw TestException) |
168 | 160 |
|
169 | | - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") |
170 | | - self.synchronized(assertEquals(lastReportedFailure, ex)) |
| 161 | + assertTestExceptionCaught(latch) |
171 | 162 | } finally { |
172 | | - self.synchronized { |
173 | | - lastReportedFailure = null |
174 | | - lastReportedFailureLatch = null |
175 | | - } |
| 163 | + clearReporterLatch() |
176 | 164 | } |
177 | 165 | } |
178 | 166 |
|
179 | 167 | test("reports errors on scheduleOnce") { scheduler => |
180 | | - val latch = new CountDownLatch(1) |
181 | | - self.synchronized { |
182 | | - lastReportedFailure = null |
183 | | - lastReportedFailureLatch = latch |
184 | | - } |
185 | | - |
186 | | - try { |
187 | | - val ex = DummyException("dummy") |
| 168 | + testScheduledErrorReporting( |
| 169 | + scheduleFailure = () => scheduler.scheduleOnce(1.milli)(throw TestException) |
| 170 | + ) |
| 171 | + } |
188 | 172 |
|
189 | | - scheduler.scheduleOnce( |
190 | | - 1, |
191 | | - TimeUnit.MILLISECONDS, |
192 | | - () => throw ex, |
193 | | - ) |
| 173 | + test("reports errors on scheduleAtFixedRate") { scheduler => |
| 174 | + testScheduledErrorReporting( |
| 175 | + scheduleFailure = () => scheduler.scheduleAtFixedRate(0.seconds, 1.second)(throw TestException) |
| 176 | + ) |
| 177 | + } |
194 | 178 |
|
195 | | - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") |
196 | | - self.synchronized(assertEquals(lastReportedFailure, ex)) |
197 | | - } finally { |
198 | | - self.synchronized { |
199 | | - lastReportedFailure = null |
200 | | - lastReportedFailureLatch = null |
201 | | - } |
202 | | - } |
| 179 | + test("reports errors on scheduleWithFixedDelay") { scheduler => |
| 180 | + testScheduledErrorReporting( |
| 181 | + scheduleFailure = () => scheduler.scheduleWithFixedDelay(0.seconds, 1.second)(throw TestException), |
| 182 | + ) |
203 | 183 | } |
204 | 184 |
|
205 | | - test("reports errors on scheduleAtFixedRate") { scheduler => |
206 | | - val latch = new CountDownLatch(1) |
207 | | - self.synchronized { |
208 | | - lastReportedFailure = null |
209 | | - lastReportedFailureLatch = latch |
210 | | - } |
| 185 | + private def testScheduledErrorReporting(scheduleFailure: () => Cancelable): Unit = { |
| 186 | + val latch = setupReporterLatch() |
211 | 187 |
|
212 | | - val ex = DummyException("dummy") |
213 | | - val schedule = scheduler.scheduleAtFixedRate(0.seconds, 1.second) { |
214 | | - throw ex |
215 | | - } |
| 188 | + val schedule = scheduleFailure() |
216 | 189 |
|
217 | 190 | try { |
218 | | - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") |
219 | | - self.synchronized(assertEquals(lastReportedFailure, ex)) |
| 191 | + assertTestExceptionCaught(latch) |
220 | 192 | } finally { |
221 | 193 | schedule.cancel() |
222 | | - self.synchronized { |
223 | | - lastReportedFailure = null |
224 | | - lastReportedFailureLatch = null |
225 | | - } |
| 194 | + clearReporterLatch() |
226 | 195 | } |
227 | 196 | } |
228 | 197 |
|
229 | | - test("reports errors on scheduleWithFixedDelay") { scheduler => |
| 198 | + private def setupReporterLatch() = { |
230 | 199 | val latch = new CountDownLatch(1) |
231 | 200 | self.synchronized { |
232 | 201 | lastReportedFailure = null |
233 | 202 | lastReportedFailureLatch = latch |
234 | 203 | } |
| 204 | + latch |
| 205 | + } |
235 | 206 |
|
236 | | - val ex = DummyException("dummy") |
237 | | - val schedule = scheduler.scheduleWithFixedDelay(0.seconds, 1.second) { |
238 | | - throw ex |
239 | | - } |
| 207 | + private def assertTestExceptionCaught(latch: CountDownLatch): Unit = { |
| 208 | + assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") |
| 209 | + self.synchronized(assertEquals(lastReportedFailure, TestException)) |
| 210 | + } |
240 | 211 |
|
241 | | - try { |
242 | | - assert(latch.await(15, TimeUnit.MINUTES), "lastReportedFailureLatch.await") |
243 | | - self.synchronized(assertEquals(lastReportedFailure, ex)) |
244 | | - } finally { |
245 | | - schedule.cancel() |
246 | | - self.synchronized { |
247 | | - lastReportedFailure = null |
248 | | - lastReportedFailureLatch = null |
249 | | - } |
| 212 | + private def clearReporterLatch(): Unit = { |
| 213 | + self.synchronized { |
| 214 | + lastReportedFailure = null |
| 215 | + lastReportedFailureLatch = null |
250 | 216 | } |
251 | 217 | } |
252 | 218 |
|
253 | 219 | def runnableAction(f: => Unit): Runnable = |
254 | 220 | new Runnable { def run() = f } |
255 | 221 | } |
256 | 222 |
|
| 223 | +object ExecutorSchedulerSuite { |
| 224 | + private val TestException = DummyException("dummy") |
| 225 | +} |
| 226 | + |
257 | 227 | object ComputationSchedulerSuite extends ExecutorSchedulerSuite { |
258 | 228 | def setup(): SchedulerService = |
259 | 229 | monix.execution.Scheduler |
|
0 commit comments