Skip to content

Commit 0c827fd

Browse files
mikkolajMikołaj Bul
andauthored
Fix unsafe modification of Trampoline resources (#3)
* execute remaining tasks on resuming thread's trampoline * don't pretend to fork when using immediate trampoline execution context * fix benchmarks compilation --------- Co-authored-by: Mikołaj Bul <m.bul@avsystem.com>
1 parent a8c89db commit 0c827fd

10 files changed

Lines changed: 305 additions & 85 deletions

File tree

benchmarks/shared/src/main/scala/monix/benchmarks/ChunkedEvalFilterMapSumBenchmark.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package monix.benchmarks
1919

20-
import java.util.concurrent.TimeUnit
21-
2220
import akka.actor.ActorSystem
2321
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink => AkkaSink, Source => AkkaSource}
2422
import fs2.{Stream => FS2Stream}
@@ -28,7 +26,7 @@ import org.openjdk.jmh.annotations._
2826
import zio.stream.{Stream => ZStream}
2927
import zio.{Chunk, UIO}
3028

31-
import scala.collection.immutable.IndexedSeq
29+
import java.util.concurrent.TimeUnit
3230
import scala.concurrent.duration.Duration
3331
import scala.concurrent.{Await, Future}
3432

@@ -82,8 +80,8 @@ class ChunkedEvalFilterMapSumBenchmark {
8280
@Setup
8381
def setup(): Unit = {
8482
chunks = (1 to chunkCount).map(i => Array.fill(chunkSize)(i))
85-
fs2Chunks = chunks.map(fs2.Chunk.array)
86-
zioChunks = chunks.map(zio.Chunk.fromArray)
83+
fs2Chunks = chunks.map(fs2.Chunk.array[Int])
84+
zioChunks = chunks.map(zio.Chunk.fromArray[Int])
8785
allElements = chunks.flatten
8886
expectedSum = allElements.map(_.toLong).sum
8987
}

benchmarks/shared/src/main/scala/monix/benchmarks/ChunkedMapFilterSumBenchmark.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package monix.benchmarks
1919

20-
import java.util.concurrent.TimeUnit
21-
2220
import akka.actor.ActorSystem
2321
import akka.stream.scaladsl.{Keep, Sink => AkkaSink, Source => AkkaSource}
2422
import fs2.{Stream => FS2Stream}
@@ -29,7 +27,7 @@ import monix.reactive.observers.Subscriber
2927
import org.openjdk.jmh.annotations._
3028
import zio.stream.{Stream => ZStream}
3129

32-
import scala.collection.immutable.IndexedSeq
30+
import java.util.concurrent.TimeUnit
3331
import scala.concurrent.duration.Duration
3432
import scala.concurrent.{Await, Promise}
3533

@@ -68,8 +66,8 @@ class ChunkedMapFilterSumBenchmark {
6866
@Setup
6967
def setup(): Unit = {
7068
chunks = (1 to chunkCount).map(i => Array.fill(chunkSize)(i))
71-
fs2Chunks = chunks.map(fs2.Chunk.array)
72-
zioChunks = chunks.map(zio.Chunk.fromArray)
69+
fs2Chunks = chunks.map(fs2.Chunk.array[Int])
70+
zioChunks = chunks.map(zio.Chunk.fromArray[Int])
7371
allElements = chunks.flatten
7472
allElementsVector = allElements.toVector
7573
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (c) 2014-2021 by The Monix Project Developers.
3+
* See the project homepage at: https://monix.io
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package monix.benchmarks
19+
20+
import monix.execution.schedulers.TrampolineExecutionContext
21+
import org.openjdk.jmh.annotations._
22+
23+
import java.util.concurrent.TimeUnit
24+
import scala.concurrent.blocking
25+
26+
/** To do comparative benchmarks between versions:
27+
*
28+
* benchmarks/run-benchmark TrampolineExecutionContextBenchmark
29+
*
30+
* This will generate results in `benchmarks/results`.
31+
*
32+
* Or to run the benchmark from within SBT:
33+
*
34+
* jmh:run monix.benchmarks.TrampolineExecutionContextBenchmark
35+
* The above test will take default values as "10 iterations", "10 warm-up iterations",
36+
* "2 forks", "1 thread".
37+
*
38+
* Or to specify custom values use below format:
39+
*
40+
* jmh:run -i 20 -wi 20 -f 4 -t 2 monix.benchmarks.TrampolineExecutionContextBenchmark
41+
*
42+
* Which means "20 iterations", "20 warm-up iterations", "4 forks", "2 thread".
43+
* Please note that benchmarks should be usually executed at least in
44+
* 10 iterations (as a rule of thumb), but more is better.
45+
*/
46+
@State(Scope.Thread)
47+
@BenchmarkMode(Array(Mode.Throughput))
48+
@OutputTimeUnit(TimeUnit.SECONDS)
49+
@Measurement(iterations = 10)
50+
@Warmup(iterations = 10)
51+
@Fork(2)
52+
@Threads(1)
53+
class TrampolineExecutionContextBenchmark {
54+
@Param(Array("3000"))
55+
var size: Int = _
56+
57+
@Benchmark
58+
def immediateBlocking(): Long = {
59+
var sum = 0L
60+
val context = TrampolineExecutionContext.immediate
61+
62+
context.execute { () =>
63+
var i = size
64+
while (i > size / 2) {
65+
context.execute(() => sum += i)
66+
i -= 1
67+
}
68+
blocking {
69+
while (i > 0) {
70+
context.execute(() => sum += i)
71+
i -= 1
72+
}
73+
}
74+
}
75+
sum
76+
}
77+
}

benchmarks/shared/src/main/scala/monix/benchmarks/TrampolinedRunnableBenchmark.scala

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@ import org.openjdk.jmh.annotations._
2929
*
3030
* Or to run the benchmark from within SBT:
3131
*
32-
* jmh:run monix.benchmarks.TaskShiftBenchmark
32+
* jmh:run monix.benchmarks.TrampolinedRunnableBenchmark
3333
* The above test will take default values as "10 iterations", "10 warm-up iterations",
3434
* "2 forks", "1 thread".
3535
*
3636
* Or to specify custom values use below format:
3737
*
38-
* jmh:run -i 20 -wi 20 -f 4 -t 2 monix.benchmarks.TaskShiftBenchmark
38+
* jmh:run -i 20 -wi 20 -f 4 -t 2 monix.benchmarks.TrampolinedRunnableBenchmark
3939
*
4040
* Which means "20 iterations", "20 warm-up iterations", "4 forks", "2 thread".
4141
* Please note that benchmarks should be usually executed at least in
@@ -66,17 +66,17 @@ class TrampolinedRunnableBenchmark {
6666
sum
6767
}
6868

69-
// @Benchmark
70-
// def deep(): Long = {
71-
// var sum = 0L
72-
//
73-
// global.executeTrampolined { () =>
74-
// var i = size
75-
// while (i > 0) {
76-
// global.executeTrampolined(() => sum += i)
77-
// i += 1
78-
// }
79-
// }
80-
// sum
81-
// }
69+
@Benchmark
70+
def deep(): Long = {
71+
var sum = 0L
72+
73+
global.executeTrampolined { () =>
74+
var i = size
75+
while (i > 0) {
76+
global.executeTrampolined(() => sum += i)
77+
i -= 1
78+
}
79+
}
80+
sum
81+
}
8282
}

monix-execution/js/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package monix.execution.schedulers
1919

2020
import monix.execution.internal.Trampoline
21+
import monix.execution.internal.Trampoline.{ForkingTrampolineEC, ImmediateTrampolineEC, TrampolineEC}
2122

2223
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
2324

@@ -51,7 +52,7 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
5152
* @param underlying is the `ExecutionContext` to which the it defers
5253
* to in case real asynchronous is needed
5354
*/
54-
final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContextExecutor {
55+
final class TrampolineExecutionContext private (underlying: TrampolineEC) extends ExecutionContextExecutor {
5556

5657
private[this] val trampoline = new Trampoline
5758

@@ -69,7 +70,7 @@ object TrampolineExecutionContext {
6970
* is needed
7071
*/
7172
def apply(underlying: ExecutionContext): TrampolineExecutionContext =
72-
new TrampolineExecutionContext(underlying)
73+
new TrampolineExecutionContext(new ForkingTrampolineEC(underlying))
7374

7475
/** [[TrampolineExecutionContext]] instance that executes everything
7576
* immediately, on the current thread.
@@ -85,8 +86,5 @@ object TrampolineExecutionContext {
8586
* have no way to override it)
8687
*/
8788
val immediate: TrampolineExecutionContext =
88-
TrampolineExecutionContext(new ExecutionContext {
89-
def execute(r: Runnable): Unit = r.run()
90-
def reportFailure(e: Throwable): Unit = throw e
91-
})
89+
new TrampolineExecutionContext(ImmediateTrampolineEC)
9290
}

monix-execution/jvm/src/main/scala/monix/execution/schedulers/TrampolineExecutionContext.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
package monix.execution.schedulers
1919

2020
import monix.execution.internal.Trampoline
21+
import monix.execution.internal.Trampoline.{ForkingTrampolineEC, ImmediateTrampolineEC, TrampolineEC}
2122

22-
import scala.util.control.NonFatal
2323
import scala.concurrent.{BlockContext, ExecutionContext, ExecutionContextExecutor}
24+
import scala.util.control.NonFatal
2425

2526
/** A `scala.concurrentExecutionContext` implementation
2627
* that executes runnables immediately, on the current thread,
@@ -52,7 +53,7 @@ import scala.concurrent.{BlockContext, ExecutionContext, ExecutionContextExecuto
5253
* @param underlying is the `ExecutionContext` to which the it defers
5354
* to in case real asynchronous is needed
5455
*/
55-
final class TrampolineExecutionContext private (underlying: ExecutionContext) extends ExecutionContextExecutor {
56+
final class TrampolineExecutionContext private (underlying: TrampolineEC) extends ExecutionContextExecutor {
5657
override def execute(runnable: Runnable): Unit =
5758
TrampolineExecutionContext.trampoline.get().execute(runnable, underlying)
5859
override def reportFailure(t: Throwable): Unit =
@@ -67,7 +68,7 @@ object TrampolineExecutionContext {
6768
* is needed
6869
*/
6970
def apply(underlying: ExecutionContext): TrampolineExecutionContext =
70-
new TrampolineExecutionContext(underlying)
71+
new TrampolineExecutionContext(new ForkingTrampolineEC(underlying))
7172

7273
/** [[TrampolineExecutionContext]] instance that executes everything
7374
* immediately, on the current thread.
@@ -83,10 +84,7 @@ object TrampolineExecutionContext {
8384
* have no way to override it)
8485
*/
8586
val immediate: TrampolineExecutionContext =
86-
TrampolineExecutionContext(new ExecutionContext {
87-
def execute(r: Runnable): Unit = r.run()
88-
def reportFailure(e: Throwable): Unit = throw e
89-
})
87+
new TrampolineExecutionContext(ImmediateTrampolineEC)
9088

9189
/** Returns the `localContext`, allowing us to bypass calling
9290
* `BlockContext.withBlockContext`, as an optimization trick.
@@ -124,8 +122,8 @@ object TrampolineExecutionContext {
124122
new JVMNormalTrampoline()
125123
}
126124

127-
private final class JVMOptimalTrampoline extends Trampoline {
128-
override def startLoop(runnable: Runnable, ec: ExecutionContext): Unit = {
125+
private final class JVMOptimalTrampoline extends Trampoline(fallbackTrampoline = Some(() => trampoline.get())) {
126+
override def startLoop(runnable: Runnable, ec: TrampolineEC): Unit = {
129127
val parentContext = localContext.get()
130128
localContext.set(trampolineContext(parentContext, ec))
131129
try {
@@ -136,8 +134,8 @@ object TrampolineExecutionContext {
136134
}
137135
}
138136

139-
private class JVMNormalTrampoline extends Trampoline {
140-
override def startLoop(runnable: Runnable, ec: ExecutionContext): Unit = {
137+
private class JVMNormalTrampoline extends Trampoline(fallbackTrampoline = Some(() => trampoline.get())) {
138+
override def startLoop(runnable: Runnable, ec: TrampolineEC): Unit = {
141139
BlockContext.withBlockContext(trampolineContext(BlockContext.current, ec)) {
142140
super.startLoop(runnable, ec)
143141
}

0 commit comments

Comments
 (0)