Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@
.idea
target
*.iml
tools/release
tools/release

*.json
*.log
124 changes: 124 additions & 0 deletions openmessaging-benchmark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# OpenMessaging Benchmark for Apache Fluss

A benchmark suite based on the [OpenMessaging Benchmark](https://openmessaging.cloud/docs/benchmarks/) (OMB) framework for measuring the performance of [Apache Fluss (Incubating)](https://github.com/apache/fluss).

## Project Structure

```
openmessaging-benchmark/
├── driver-api/ # Benchmark driver interface
├── benchmark-framework/ # Core benchmarking engine
├── driver-fluss/ # Fluss driver implementation
│ └── config/fluss.yaml # Fluss driver configuration
├── package/ # Assembly and packaging
├── bin/ # Launcher scripts
├── workloads/ # Workload definitions
├── payload/ # Pre-generated payload data files
└── etc/ # Shared build config (checkstyle, spotbugs, etc.)
```

## Prerequisites

- Java 8+ (Java 11+ recommended for Arrow format)
- Maven 3.8.6+
- A running Apache Fluss cluster

## Build

```bash
cd openmessaging-benchmark
mvn clean package -DskipTests
```

## Usage

### Local Mode

Run the benchmark on a single machine:

```bash
bin/benchmark-local \
-d driver-fluss/config/fluss.yaml \
workloads/1-topic-1-partition-1kb.yaml
```

### Distributed Mode

Start worker processes on each worker node:

```bash
bin/benchmark-worker
```

Then run the benchmark from the driver node:

```bash
bin/benchmark \
-d driver-fluss/config/fluss.yaml \
-w http://worker1:8080,http://worker2:8080 \
workloads/1-topic-16-partitions-1kb.yaml
```

### CLI Options

| Option | Description |
|--------|-------------|
| `-d, --drivers` | Driver config file(s), e.g. `driver-fluss/config/fluss.yaml` |
| `-w, --workers` | Comma-separated worker addresses for distributed mode |
| `-wf, --workers-file` | YAML file containing worker addresses |
| `-o, --output` | Output directory for JSON results |
| `-c, --csv` | Convert JSON results in a directory to CSV |

## Configuration

### Driver Configuration (`driver-fluss/config/fluss.yaml`)

| Parameter | Description | Default |
|-----------|-------------|---------|
| `bootstrapServers` | Fluss cluster address | `localhost:9123` |
| `schema` | Row schema (first field must be `long` for E2E timestamp) | `long-int-int-string-string` |
| `logFormat` | Log format: `ARROW` or `INDEXED` | `ARROW` |
| `writerAcks` | Writer acknowledgment mode | `all` |
| `writerBatchSize` | Writer batch size | `1mb` |
| `writerBufferMemory` | Writer buffer memory | `32mb` |
| `writerBatchTimeoutMs` | Writer batch timeout in milliseconds | `100` |
| `projectFields` | Consumer field projection (`all` or field indices like `0/1/2`) | `all` |
| `prefetchNum` | Consumer prefetch count | `4` |
| `fetchMaxBytes` | Consumer max fetch bytes | `16mb` |
| `clientNettyThreads` | Number of Netty client threads | `1` |

### Workload Configuration (`workloads/`)

| Parameter | Description |
|-----------|-------------|
| `topics` | Number of topics |
| `partitionsPerTopic` | Partitions per topic |
| `messageSize` | Message size in bytes |
| `producersPerTopic` | Number of producers per topic |
| `consumerPerSubscription` | Number of consumers per subscription |
| `producerRate` | Target produce rate (msg/s), `0` for auto-discovery |
| `testDurationMinutes` | Benchmark test duration |
| `warmupDurationMinutes` | Warmup duration before measurement |
| `useRandomizedPayloads` | Whether to use randomized payloads |
| `randomBytesRatio` | Ratio of random bytes in payload |

## Metrics

The benchmark collects and reports the following metrics every 10 seconds:

| Metric | Unit | Description |
|--------|------|-------------|
| Publish Rate | msg/s | Messages published per second |
| Publish Throughput | MB/s | Bytes published per second |
| Consume Rate | msg/s | Messages consumed per second |
| Consume Throughput | MB/s | Bytes consumed per second |
| Publish Latency | ms | Latency from send call to ack (avg, P50, P99, P99.9, max) |
| Publish Delay Latency | us | Delay between intended and actual send time |
| End-to-End Latency | ms | Latency from publish to consume |
| Backlog | count | Unconsumed message count |

Results are written to a JSON file (e.g. `1-topic-1-partition-1kb-Fluss-2026-04-26-20-44-27.json`) containing per-interval time series and aggregated percentiles for all metrics.

## License

Apache License, Version 2.0
147 changes: 147 additions & 0 deletions openmessaging-benchmark/benchmark-framework/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.openmessaging.benchmark</groupId>
<artifactId>messaging-benchmark</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>

<artifactId>benchmark-framework</artifactId>

<properties>
<jetty.version>9.4.42.v20210604</jetty.version>
<omb.root.dir>${project.basedir}/..</omb.root.dir>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>driver-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
<dependency>
<groupId>io.javalin</groupId>
<artifactId>javalin</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper.stats</groupId>
<artifactId>prometheus-metrics-provider</artifactId>
<version>${bookkeeper.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>3.0.0</version>
</dependency>
<!-- set jetty version to be consistent across all transitive dependencies -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
<version>2.1.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-lambda</artifactId>
<version>1.2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- MainClass in mainfest make a executable jar -->
<archive>
<manifest>
<mainClass>util.Microseer</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<goals>
<goal>single</goal>
</goals>
<!-- bind to the packaging phase -->
<phase>package</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Loading