-
Notifications
You must be signed in to change notification settings - Fork 337
Add MpscRingBuffer primitive for pre-allocated slot rings #11492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
1d12490
Add MpscRingBuffer for pre-allocated, recyclable slot rings
dougqh 16b2ec6
Add throughput benchmarks for MpscRingBuffer
dougqh e67dde7
Add RingVsQueueBenchmark for MpscRingBuffer vs MpscArrayQueue
dougqh cec6abf
Document MpscRingBuffer thread-safety contract; publish on filler throw
dougqh 9162a1e
Use AtomicLongFieldUpdater for the producer cursor
dougqh e8ead01
Pad MpscRingBuffer cursors and stride publishedSequences
dougqh 7e6f497
Add MpscRingBuffer.tryClaim(n) batch-claim API
dougqh 780a204
Add low-level tryClaimRange/slotAt/publish primitives to MpscRingBuffer
dougqh 348ca2a
Merge branch 'master' into dougqh/ring-buffer
dougqh File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
191 changes: 191 additions & 0 deletions
191
dd-trace-core/src/jmh/java/datadog/trace/util/concurrent/RingVsQueueBenchmark.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,191 @@ | ||
| package datadog.trace.util.concurrent; | ||
|
|
||
| import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
|
||
| import java.util.function.BiConsumer; | ||
| import org.jctools.queues.MpscArrayQueue; | ||
| import org.openjdk.jmh.annotations.Benchmark; | ||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||
| import org.openjdk.jmh.annotations.Fork; | ||
| import org.openjdk.jmh.annotations.Group; | ||
| import org.openjdk.jmh.annotations.GroupThreads; | ||
| import org.openjdk.jmh.annotations.Level; | ||
| import org.openjdk.jmh.annotations.Measurement; | ||
| import org.openjdk.jmh.annotations.Mode; | ||
| import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
| import org.openjdk.jmh.annotations.Param; | ||
| import org.openjdk.jmh.annotations.Scope; | ||
| import org.openjdk.jmh.annotations.Setup; | ||
| import org.openjdk.jmh.annotations.State; | ||
| import org.openjdk.jmh.annotations.TearDown; | ||
| import org.openjdk.jmh.annotations.Threads; | ||
| import org.openjdk.jmh.annotations.Warmup; | ||
| import org.openjdk.jmh.infra.Blackhole; | ||
|
|
||
| /** | ||
| * Head-to-head comparison of {@link MpscRingBuffer} (mutable pre-allocated slots) against the | ||
| * conventional approach of a jctools {@link MpscArrayQueue} with a fresh {@code Slot} allocated per | ||
| * publish. The latter is the pattern the current CSS code uses for {@code SpanSnapshot} on the | ||
| * producer side, so the delta between the two measures the actual allocation/handoff saving of the | ||
| * ring-buffer rewrite. | ||
| * | ||
| * <ul> | ||
| * <li>{@code write_*_8p} — 8 producers, background drainer keeps the structure empty so the | ||
| * measurement reflects publish cost, not full-structure drop cost. Pair-compare ring vs queue | ||
| * at matched capacity. | ||
| * <li>{@code e2e_*_8p} — JMH {@code @Group} pairing 8 producers with 1 consumer for each | ||
| * structure. End-to-end ops/s under realistic backpressure. | ||
| * </ul> | ||
| * | ||
| * <p>Run with {@code -prof gc} to also see per-op allocation rate — that's where the ring's win is | ||
| * loudest, since the queue allocates one {@code Slot} per publish and the ring allocates none. | ||
| */ | ||
| @State(Scope.Benchmark) | ||
| @Warmup(iterations = 2, time = 15, timeUnit = SECONDS) | ||
| @Measurement(iterations = 5, time = 15, timeUnit = SECONDS) | ||
| @BenchmarkMode(Mode.Throughput) | ||
| @OutputTimeUnit(SECONDS) | ||
| @Fork(value = 1) | ||
| public class RingVsQueueBenchmark { | ||
|
|
||
| /** | ||
| * Shared slot type. {@code MpscRingBuffer} pre-allocates these and the producer mutates in place; | ||
| * the queue path allocates a fresh one per publish and offers the reference. Two constructors so | ||
| * both styles read naturally. | ||
| */ | ||
| public static final class Slot { | ||
| long value; | ||
|
|
||
| Slot() {} | ||
|
|
||
| Slot(final long value) { | ||
| this.value = value; | ||
| } | ||
| } | ||
|
|
||
| // Static (non-capturing) handlers. Passing ts/bh as context lets the JIT keep these as | ||
| // singleton functions and avoid per-call lambda allocation. | ||
| private static final BiConsumer<ThreadState, Slot> RING_FILLER = | ||
| (ts, slot) -> { | ||
| slot.value = ts.counter; | ||
| ts.counter++; | ||
| }; | ||
|
|
||
| private static final BiConsumer<Blackhole, Slot> RING_CONSUMER = | ||
| (bh, slot) -> bh.consume(slot.value); | ||
|
|
||
| @Param({"1024"}) | ||
| public int capacity; | ||
|
|
||
| /** Write-side benchmark structures. Drained by a background thread so they never fill. */ | ||
| MpscRingBuffer<Slot> ring; | ||
|
|
||
| MpscArrayQueue<Slot> queue; | ||
|
|
||
| /** E2e benchmark structures. JMH drives both sides via {@code @Group}; no background drainer. */ | ||
| MpscRingBuffer<Slot> e2eRing; | ||
|
|
||
| MpscArrayQueue<Slot> e2eQueue; | ||
|
|
||
| private volatile boolean stopDrainers; | ||
| private Thread ringDrainer; | ||
| private Thread queueDrainer; | ||
|
|
||
| @Setup(Level.Trial) | ||
| public void setup() { | ||
| ring = new MpscRingBuffer<>(Slot::new, capacity); | ||
| queue = new MpscArrayQueue<>(capacity); | ||
| e2eRing = new MpscRingBuffer<>(Slot::new, capacity); | ||
| e2eQueue = new MpscArrayQueue<>(capacity); | ||
|
|
||
| stopDrainers = false; | ||
| ringDrainer = | ||
| new Thread( | ||
| () -> { | ||
| while (!stopDrainers) { | ||
| if (ring.drain((Slot s) -> {}) == 0) Thread.yield(); | ||
| } | ||
| }, | ||
| "RingVsQueueBenchmark-ringDrainer"); | ||
| ringDrainer.setDaemon(true); | ||
| ringDrainer.start(); | ||
|
|
||
| queueDrainer = | ||
| new Thread( | ||
| () -> { | ||
| while (!stopDrainers) { | ||
| Slot s = queue.poll(); | ||
| if (s == null) Thread.yield(); | ||
| } | ||
| }, | ||
| "RingVsQueueBenchmark-queueDrainer"); | ||
| queueDrainer.setDaemon(true); | ||
| queueDrainer.start(); | ||
| } | ||
|
|
||
| @TearDown(Level.Trial) | ||
| public void teardown() throws InterruptedException { | ||
| stopDrainers = true; | ||
| ringDrainer.join(5_000); | ||
| queueDrainer.join(5_000); | ||
| } | ||
|
|
||
| @State(Scope.Thread) | ||
| public static class ThreadState { | ||
| long counter; | ||
| } | ||
|
|
||
| // ============ Write-side throughput ============ | ||
|
|
||
| @Threads(8) | ||
| @Benchmark | ||
| public boolean write_ring_8p(final ThreadState ts) { | ||
| return ring.tryWrite(ts, RING_FILLER); | ||
| } | ||
|
|
||
| /** Mirrors the SpanSnapshot pattern: allocate a fresh instance per publish, offer it. */ | ||
| @Threads(8) | ||
| @Benchmark | ||
| public boolean write_queue_8p(final ThreadState ts) { | ||
| return queue.offer(new Slot(ts.counter++)); | ||
| } | ||
|
|
||
| // ============ End-to-end producer/consumer ============ | ||
|
|
||
| @Group("e2e_ring_8p") | ||
| @GroupThreads(8) | ||
| @Benchmark | ||
| public boolean e2e_ring_producer(final ThreadState ts) { | ||
| return e2eRing.tryWrite(ts, RING_FILLER); | ||
| } | ||
|
|
||
| @Group("e2e_ring_8p") | ||
| @GroupThreads(1) | ||
| @Benchmark | ||
| public int e2e_ring_consumer(final Blackhole bh) { | ||
| int drained = e2eRing.drain(bh, RING_CONSUMER); | ||
| if (drained == 0) Thread.yield(); | ||
| return drained; | ||
| } | ||
|
|
||
| @Group("e2e_queue_8p") | ||
| @GroupThreads(8) | ||
| @Benchmark | ||
| public boolean e2e_queue_producer(final ThreadState ts) { | ||
| return e2eQueue.offer(new Slot(ts.counter++)); | ||
| } | ||
|
|
||
| @Group("e2e_queue_8p") | ||
| @GroupThreads(1) | ||
| @Benchmark | ||
| public int e2e_queue_consumer(final Blackhole bh) { | ||
| int drained = 0; | ||
| Slot slot; | ||
| while ((slot = e2eQueue.poll()) != null) { | ||
| bh.consume(slot.value); | ||
| drained++; | ||
| } | ||
| if (drained == 0) Thread.yield(); | ||
| return drained; | ||
| } | ||
| } | ||
146 changes: 146 additions & 0 deletions
146
internal-api/src/jmh/java/datadog/trace/util/concurrent/MpscRingBufferBenchmark.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| package datadog.trace.util.concurrent; | ||
|
|
||
| import static java.util.concurrent.TimeUnit.SECONDS; | ||
|
|
||
| import java.util.function.BiConsumer; | ||
| import org.openjdk.jmh.annotations.Benchmark; | ||
| import org.openjdk.jmh.annotations.BenchmarkMode; | ||
| import org.openjdk.jmh.annotations.Fork; | ||
| import org.openjdk.jmh.annotations.Group; | ||
| import org.openjdk.jmh.annotations.GroupThreads; | ||
| import org.openjdk.jmh.annotations.Level; | ||
| import org.openjdk.jmh.annotations.Measurement; | ||
| import org.openjdk.jmh.annotations.Mode; | ||
| import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
| import org.openjdk.jmh.annotations.Param; | ||
| import org.openjdk.jmh.annotations.Scope; | ||
| import org.openjdk.jmh.annotations.Setup; | ||
| import org.openjdk.jmh.annotations.State; | ||
| import org.openjdk.jmh.annotations.TearDown; | ||
| import org.openjdk.jmh.annotations.Threads; | ||
| import org.openjdk.jmh.annotations.Warmup; | ||
| import org.openjdk.jmh.infra.Blackhole; | ||
|
|
||
| /** | ||
| * Throughput benchmarks for {@link MpscRingBuffer}. | ||
| * | ||
| * <ul> | ||
| * <li>{@code write_1p / write_8p / write_16p} — producer-side throughput with a background | ||
| * drainer consuming what's published. Measures the cost of one {@code tryWrite} including CAS | ||
| * contention on the producer cursor at the given thread count. | ||
| * <li>{@code e2e_8p} — JMH {@code @Group} pairing 8 producers with 1 consumer. Aggregate | ||
| * throughput reflects whichever side is the bottleneck under realistic pressure. | ||
| * </ul> | ||
| * | ||
| * <p>Run with {@code -p capacity=...} to override the default ring capacity. | ||
| */ | ||
| @State(Scope.Benchmark) | ||
| @Warmup(iterations = 2, time = 15, timeUnit = SECONDS) | ||
| @Measurement(iterations = 5, time = 15, timeUnit = SECONDS) | ||
| @BenchmarkMode(Mode.Throughput) | ||
| @OutputTimeUnit(SECONDS) | ||
| @Fork(value = 1) | ||
| public class MpscRingBufferBenchmark { | ||
|
|
||
| /** | ||
| * Static filler so the lambda is non-capturing and the JIT can hoist it past the {@code tryWrite} | ||
| * call. Context arg comes first, slot last — matches {@code TagMap.forEach} convention. | ||
| */ | ||
| private static final BiConsumer<Long, Slot> FILLER = (v, slot) -> slot.value = v; | ||
|
|
||
| /** Mutable slot. Replicates the per-publish allocation a real producer wants to avoid. */ | ||
| public static final class Slot { | ||
| long value; | ||
| } | ||
|
|
||
| @Param({"1024"}) | ||
| public int capacity; | ||
|
|
||
| /** | ||
| * Shared ring for the {@code write_*} benches. A background drainer keeps space available so | ||
| * producer benchmarks measure write throughput rather than full-ring drop throughput. | ||
| */ | ||
| MpscRingBuffer<Slot> ring; | ||
|
|
||
| private volatile boolean stopDrainer; | ||
| private Thread drainerThread; | ||
|
|
||
| /** | ||
| * Separate ring for the {@code e2e_*} group benches. JMH drives both sides directly so we don't | ||
| * want our own background drainer for those. | ||
| */ | ||
| MpscRingBuffer<Slot> e2eRing; | ||
|
|
||
| @Setup(Level.Trial) | ||
| public void setup() { | ||
| ring = new MpscRingBuffer<>(Slot::new, capacity); | ||
| e2eRing = new MpscRingBuffer<>(Slot::new, capacity); | ||
| stopDrainer = false; | ||
| drainerThread = | ||
| new Thread( | ||
| () -> { | ||
| while (!stopDrainer) { | ||
| if (ring.drain((Slot s) -> {}) == 0) Thread.yield(); | ||
| } | ||
| }, | ||
| "MpscRingBufferBenchmark-drainer"); | ||
| drainerThread.setDaemon(true); | ||
| drainerThread.start(); | ||
| } | ||
|
|
||
| @TearDown(Level.Trial) | ||
| public void teardown() throws InterruptedException { | ||
| stopDrainer = true; | ||
| drainerThread.join(5_000); | ||
| } | ||
|
|
||
| @State(Scope.Thread) | ||
| public static class ThreadState { | ||
| long counter; | ||
| } | ||
|
|
||
| // ============ Write throughput with background drainer ============ | ||
|
|
||
| @Threads(1) | ||
| @Benchmark | ||
| public boolean write_1p(ThreadState ts) { | ||
| return ring.tryWrite(ts.counter++, FILLER); | ||
| } | ||
|
|
||
| @Threads(8) | ||
| @Benchmark | ||
| public boolean write_8p(ThreadState ts) { | ||
| return ring.tryWrite(ts.counter++, FILLER); | ||
| } | ||
|
|
||
| @Threads(16) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: Does it make sense to have higher producer threads ? |
||
| @Benchmark | ||
| public boolean write_16p(ThreadState ts) { | ||
| return ring.tryWrite(ts.counter++, FILLER); | ||
| } | ||
|
|
||
| // ============ End-to-end producer/consumer pairing ============ | ||
| // | ||
| // JMH runs both methods in the same trial: 8 producer threads + 1 consumer thread. Throughput | ||
| // is reported as ops/sec aggregated across all 9 threads, but the consumer's drain count | ||
| // dwarfs the producer ops since one call processes many slots -- in practice the bottleneck | ||
| // is the producer side (CAS contention), and that's what the number reflects. | ||
|
|
||
| private static final BiConsumer<Blackhole, Slot> CONSUMER = (bh, slot) -> bh.consume(slot.value); | ||
|
|
||
| @Group("e2e_8p") | ||
| @GroupThreads(8) | ||
| @Benchmark | ||
| public boolean e2e_producer(ThreadState ts) { | ||
| return e2eRing.tryWrite(ts.counter++, FILLER); | ||
| } | ||
|
|
||
| @Group("e2e_8p") | ||
| @GroupThreads(1) | ||
| @Benchmark | ||
| public int e2e_consumer(Blackhole bh) { | ||
| int drained = e2eRing.drain(bh, CONSUMER); | ||
| if (drained == 0) Thread.yield(); | ||
| return drained; | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Do 8 seems low for producers threads, or is enough to compare both
MpscRingBufferandMpscArrayQueue?