@@ -21,6 +21,7 @@ import minitest.TestSuite
2121import monix .execution .ExecutionModel .{AlwaysAsyncExecution , Default => DefaultExecutionModel }
2222import monix .execution .cancelables .SingleAssignCancelable
2323import monix .execution .exceptions .DummyException
24+ import monix .execution .schedulers .ExecutorSchedulerSuite .TestException
2425import monix .execution .{Cancelable , Scheduler , UncaughtExceptionReporter }
2526
2627import java .util .concurrent .{CountDownLatch , Executors , TimeUnit , TimeoutException }
@@ -152,108 +153,77 @@ abstract class ExecutorSchedulerSuite extends TestSuite[SchedulerService] { self
152153 }
153154
154155 test(" reports errors on execute" ) { scheduler =>
155- val latch = new CountDownLatch (1 )
156- self.synchronized {
157- lastReportedFailure = null
158- lastReportedFailureLatch = latch
159- }
156+ val latch = setupReporterLatch()
160157
161158 try {
162- val ex = DummyException (" dummy" )
163-
164- scheduler.execute(new Runnable {
165- override def run () =
166- throw ex
167- })
159+ scheduler.execute(() => throw TestException )
168160
169- assert(latch.await(15 , TimeUnit .MINUTES ), " lastReportedFailureLatch.await" )
170- self.synchronized (assertEquals(lastReportedFailure, ex))
161+ assertTestExceptionCaught(latch)
171162 } finally {
172- self.synchronized {
173- lastReportedFailure = null
174- lastReportedFailureLatch = null
175- }
163+ clearReporterLatch()
176164 }
177165 }
178166
179167 test(" reports errors on scheduleOnce" ) { scheduler =>
180- val latch = new CountDownLatch (1 )
181- self.synchronized {
182- lastReportedFailure = null
183- lastReportedFailureLatch = latch
184- }
185-
186- try {
187- val ex = DummyException (" dummy" )
168+ testScheduledErrorReporting(
169+ scheduleFailure = () => scheduler.scheduleOnce(1 .milli)(throw TestException )
170+ )
171+ }
188172
189- scheduler.scheduleOnce(
190- 1 ,
191- TimeUnit . MILLISECONDS ,
192- () => throw ex,
193- )
173+ test( " reports errors on scheduleAtFixedRate " ) { scheduler =>
174+ testScheduledErrorReporting(
175+ scheduleFailure = () => scheduler.scheduleAtFixedRate( 0 .seconds, 1 .second)( throw TestException )
176+ )
177+ }
194178
195- assert(latch.await(15 , TimeUnit .MINUTES ), " lastReportedFailureLatch.await" )
196- self.synchronized (assertEquals(lastReportedFailure, ex))
197- } finally {
198- self.synchronized {
199- lastReportedFailure = null
200- lastReportedFailureLatch = null
201- }
202- }
179+ test(" reports errors on scheduleWithFixedDelay" ) { scheduler =>
180+ testScheduledErrorReporting(
181+ scheduleFailure = () => scheduler.scheduleWithFixedDelay(0 .seconds, 1 .second)(throw TestException ),
182+ )
203183 }
204184
205- test(" reports errors on scheduleAtFixedRate" ) { scheduler =>
206- val latch = new CountDownLatch (1 )
207- self.synchronized {
208- lastReportedFailure = null
209- lastReportedFailureLatch = latch
210- }
185+ private def testScheduledErrorReporting (scheduleFailure : () => Cancelable ): Unit = {
186+ val latch = setupReporterLatch()
211187
212- val ex = DummyException (" dummy" )
213- val schedule = scheduler.scheduleAtFixedRate(0 .seconds, 1 .second) {
214- throw ex
215- }
188+ val schedule = scheduleFailure()
216189
217190 try {
218- assert(latch.await(15 , TimeUnit .MINUTES ), " lastReportedFailureLatch.await" )
219- self.synchronized (assertEquals(lastReportedFailure, ex))
191+ assertTestExceptionCaught(latch)
220192 } finally {
221193 schedule.cancel()
222- self.synchronized {
223- lastReportedFailure = null
224- lastReportedFailureLatch = null
225- }
194+ clearReporterLatch()
226195 }
227196 }
228197
229- test( " reports errors on scheduleWithFixedDelay " ) { scheduler =>
198+ private def setupReporterLatch () = {
230199 val latch = new CountDownLatch (1 )
231200 self.synchronized {
232201 lastReportedFailure = null
233202 lastReportedFailureLatch = latch
234203 }
204+ latch
205+ }
235206
236- val ex = DummyException ( " dummy " )
237- val schedule = scheduler.scheduleWithFixedDelay( 0 .seconds, 1 .second) {
238- throw ex
239- }
207+ private def assertTestExceptionCaught ( latch : CountDownLatch ) : Unit = {
208+ assert(latch.await( 15 , TimeUnit . MINUTES ), " lastReportedFailureLatch.await " )
209+ self. synchronized (assertEquals(lastReportedFailure, TestException ))
210+ }
240211
241- try {
242- assert(latch.await(15 , TimeUnit .MINUTES ), " lastReportedFailureLatch.await" )
243- self.synchronized (assertEquals(lastReportedFailure, ex))
244- } finally {
245- schedule.cancel()
246- self.synchronized {
247- lastReportedFailure = null
248- lastReportedFailureLatch = null
249- }
212+ private def clearReporterLatch (): Unit = {
213+ self.synchronized {
214+ lastReportedFailure = null
215+ lastReportedFailureLatch = null
250216 }
251217 }
252218
253219 def runnableAction (f : => Unit ): Runnable =
254220 new Runnable { def run () = f }
255221}
256222
223+ object ExecutorSchedulerSuite {
224+ private val TestException = DummyException (" dummy" )
225+ }
226+
257227object ComputationSchedulerSuite extends ExecutorSchedulerSuite {
258228 def setup (): SchedulerService =
259229 monix.execution.Scheduler
0 commit comments