Skip to content

End to End IOT Platform benchmark that can handle upto 2 million rows/sec#8

Open
hyperscaledesignhub wants to merge 4 commits into
apache:mainfrom
hyperscaledesignhub:e2e-benchmark-aws
Open

End to End IOT Platform benchmark that can handle upto 2 million rows/sec#8
hyperscaledesignhub wants to merge 4 commits into
apache:mainfrom
hyperscaledesignhub:e2e-benchmark-aws

Conversation

@hyperscaledesignhub
Copy link
Copy Markdown

Purpose

Linked issue: NA

This pull request is for creating end to end fluss benchmark for IOT case where complete realtime IOT system that is inserting 2 million rows/sec into apache fluss and flink job is consuming and aggregating the data from fluss in realtime

Brief change log

complete end to end system. Which can be deployed in AWS and create the platform and stress the platform with 2 million rows/sec

Tests

NA

API and Format

NA

Documentation

NA

securityContext:
runAsUser: 0 # Run as root to allow writing to /etc/hosts
command:
- /app/entrypoint.sh
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This image copies the wrapper to /opt/flink/bin/entrypoint.sh and the jar to /opt/flink/usrlib/fluss-flink-realtime-demo.jar, but this Job still executes /app/entrypoint.sh with /app/fluss-flink-realtime-demo.jar. In the deploy_k8s_jobs.sh flow the producer pod may fails immediately with no such file or directory before Java starts; k8s-flink-aggregator-job.yaml has the same mismatch.

# limitations under the License.
#

#!/bin/bash
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Dockerfile uses exec-form ENTRYPOINT ["/opt/flink/bin/entrypoint.sh"], this file needs #! in its first two bytes. With the ASF header before the shebang, running the image without overriding command returns exec format error instead of launching Java.

long currentTotal = totalSent.sum();
metrics.recordWrite();

if (currentTotal % 10 == 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the default --rate 200000, this logs about 20,000 INFO lines per second per producer instance because every tenth record emits a message. With slf4j-simple writing synchronously to stdout, that I/O dominates CPU and makes the benchmark throughput numbers unusable. I think we can change this to a time interval logging, or change to a DEBUG log level by default.

long currentTime = System.currentTimeMillis();
long total = totalRecords.sum();
long elapsedSeconds = (currentTime - startTime.get()) / 1000;
long windowRecords = lastStatsRecords.get();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateStats() stores the cumulative record count, but /metrics divides that total by only the time since the last update. After the first stats window, fluss_producer_records_per_second_window keeps drifting upward even if the actual short-term rate is flat or falling. This needs to use a delta count for the current window.

System.out.println("Buckets: " + bucketCount);
System.out.println("========================================");

if (bucketCount == 48) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helper exits non-zero for anything other than 48 buckets, but the high-infra scripts and docs in this branch create sensor_readings with 128 buckets by default. Running check-table-buckets.sh against the documented deployment will therefore flag a healthy table as broken.


SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Navigate to project root (3 levels up from this script)
WORKDIR=$(cd "${SCRIPT_DIR}/../../.." && pwd)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From e2e-iot/fluss_flink_realtime, ../../.. resolves to the parent of fluss-benchmarks, so this script looks for demos/demo/fluss_flink_realtime_demo outside the repository. In the current layout that path does not exist, so run_kind_demo.sh cannot build or find the jar before it starts the cluster; test-local.sh and e2e-iot/high-infra/k8s/jobs/check-table-buckets.sh still use the same stale path.

### Step 1: Build the JAR

```bash
cd /Users/vijayabhaskarv/IOT/FLUSS
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many places still using the speicifc directory /Users/vijayabhaskarv/, could you change that to a more general directory like ~/

## Prerequisites

1. **Maven** - For building the JAR
2. **Fluss 0.8.0** - Extracted to `demos/demo/deploy_local_kind_fluss/fluss-0.8.0-incubating/`
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fluss have official released 0.9, could you upgrade it to 0.9.0?

@wuchong
Copy link
Copy Markdown
Member

wuchong commented Mar 19, 2026

Thanks for the giant contribution and sorry for the late reviewing. I left some comments below.

Comment on lines +118 to +122
System.out.println("Starting JDBC Flink IoT Consumer with AVRO Support and 1-Minute Aggregation...");
System.out.println("Pulsar URL: " + pulsarUrl);
System.out.println("Pulsar Admin URL: " + pulsarAdminUrl);
System.out.println("Consuming Topic: " + baseTopicName + " (all partitions)");
System.out.println("ClickHouse URL: " + clickhouseUrl);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this class actually being used? I haven't seen any references to it, and our benchmarks don't seem to include any joint tests with Pulsar or ClickHouse.

#

apiVersion: v2
appVersion: 0.8.0-incubating
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this helm charts is copied from apache/fluss project. Can we just to reuse the helm charts to avoid duplicate work?

@wuchong
Copy link
Copy Markdown
Member

wuchong commented Mar 30, 2026

Hi @hyperscaledesignhub , do you have time to update this? Otherwise, I can help it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants