diff --git a/CHANGES.next.md b/CHANGES.next.md index 794d171b8d..0ae2e1ae22 100644 --- a/CHANGES.next.md +++ b/CHANGES.next.md @@ -283,6 +283,7 @@ - Add support for Alma Linux 8, 9, and 10 for the Azure provider. - Re-enable support for Rocky Linux 8, 9, and 10 for the Azure provider. - Add Ubuntu 26.04 support for GCP, AWS, and Azure Providers. +- Add neper_benchmark for testing highly concurrent networking workloads. ### Enhancements: @@ -463,6 +464,8 @@ creation can be retried on stock outs. - Add support for deploying VMs inside managed VM groups with `--use_managed_vm_groups`. +- Add Receive Flow Steering (RFS) support to Neper, Nginx, and Redis + benchmarks with universal --rps_sock_flow_entries and --rps_flow_cnt flags. ### Bug fixes and maintenance updates: diff --git a/perfkitbenchmarker/benchmark_sets.py b/perfkitbenchmarker/benchmark_sets.py index 854bea13c6..ab460c788a 100644 --- a/perfkitbenchmarker/benchmark_sets.py +++ b/perfkitbenchmarker/benchmark_sets.py @@ -14,7 +14,6 @@ """Benchmark set specific functions and definitions.""" - import collections import copy import itertools @@ -159,6 +158,7 @@ 'mongodb_ycsb', 'multichase', 'mxnet', + 'neper', 'netperf', 'netperf_hammerdbcli', 'object_storage_service', @@ -272,6 +272,16 @@ 'cloudsuite_web_serving', ], }, + 'networking': { + MESSAGE: 'Networking benchmark set.', + BENCHMARK_LIST: [ + 'iperf', + 'mesh_network', + 'neper', + 'netperf', + 'ping', + ], + }, } diff --git a/perfkitbenchmarker/linux_benchmarks/neper_benchmark.py b/perfkitbenchmarker/linux_benchmarks/neper_benchmark.py new file mode 100644 index 0000000000..c9e6a8b5a2 --- /dev/null +++ b/perfkitbenchmarker/linux_benchmarks/neper_benchmark.py @@ -0,0 +1,273 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Runs Neper RFS benchmark.""" + +import logging +import threading +import time +from absl import flags +from perfkitbenchmarker import background_tasks +from perfkitbenchmarker import configs +from perfkitbenchmarker import sample +from perfkitbenchmarker.linux_packages import neper +from perfkitbenchmarker.linux_packages import rfs + +FLAGS = flags.FLAGS + +# --- Benchmark Flags --- +flags.DEFINE_integer( + 'neper_num_flows', 10000, 'Total connections created by Neper.' +) +flags.DEFINE_integer( + 'neper_payload_size', 1024, 'Size of request/response payload in bytes.' +) +flags.DEFINE_integer( + 'neper_numa_node', -1, 'NUMA node for pinning. If -1, pinning is disabled.' +) +# Note: RFS global and per-queue flags are now in linux_packages/rfs.py +flags.DEFINE_integer( + 'neper_test_length', 30, 'Duration of the traffic run in seconds.' +) + +BENCHMARK_NAME = 'neper' +BENCHMARK_CONFIG = """ +neper: + description: Run Neper RFS benchmark. + vm_groups: + vm_1: + vm_spec: *default_dual_core + vm_2: + vm_spec: *default_dual_core +""" + + +def GetConfig(user_config): + return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME) + + +def GetPrimaryNIC(vm): + """Finds the primary network interface.""" + stdout, _ = vm.RemoteCommand( + "ip route show | grep default | awk '{print $5}'" + ) + return stdout.strip() + + +def PrepareVM(vm): + """Prepares a single VM for the Neper benchmark.""" + vm.Install('neper') + + primary_iface = GetPrimaryNIC(vm) + + # NUMA Validation + numa_node_file = f'/sys/class/net/{primary_iface}/device/numa_node' + stdout, _ = vm.RemoteCommand(f'cat {numa_node_file}', ignore_failure=True) + if stdout: + try: + actual_numa = int(stdout.strip()) + if FLAGS.neper_numa_node != -1 and FLAGS.neper_numa_node != actual_numa: + logging.warning( + 'NUMA node mismatch on %s: expected %s, got %s', + vm.name, + FLAGS.neper_numa_node, + actual_numa, + ) + except ValueError: + logging.warning('Could not determine NUMA node for %s', primary_iface) + + # Use standardized RFS configuration + rfs.Configure(vm) + + # Memory Safety Check: + # Neper consumes significant memory for socket buffers and tracking flows. + # We estimate ~128KB of kernel/user memory per flow. + # We warn if estimated usage exceeds 80% of total system RAM to avoid OOM. + stdout, _ = vm.RemoteCommand("grep MemTotal /proc/meminfo | awk '{print $2}'") + total_mem_kb = int(stdout.strip()) + estimated_mem_kb = FLAGS.neper_num_flows * 128 + if estimated_mem_kb > 0.8 * total_mem_kb: + logging.warning( + 'Estimated memory usage on %s exceeds 80%% of total RAM.', vm.name + ) + + # CPU Count Check + stdout, _ = vm.RemoteCommand('nproc') + if int(stdout.strip()) == 1: + logging.warning( + 'RFS on %s requires at least two cores to provide any steering' + ' benefit.', + vm.name, + ) + + +def Prepare(benchmark_spec): + """Prepares the VMs for the Neper benchmark.""" + vms = benchmark_spec.vms + background_tasks.RunThreaded(PrepareVM, vms) + + +def GetProcStat(vm): + """Reads and parses /proc/stat.""" + stdout, _ = vm.RemoteCommand("grep '^cpu ' /proc/stat") + parts = stdout.split() + # User(1), System(3), Idle(4), IOWait(5), SoftIRQ(7) + columns = [int(p) for p in parts[1:]] + total = sum(columns) + return { + 'softirq': columns[6], + 'total': total, + 'active': ( + total - columns[3] - columns[4] + ), # Active = Total - Idle - IOWait + } + + +def Run(benchmark_spec): + """Runs the Neper benchmark.""" + client_vm, server_vm = benchmark_spec.vms[:2] + + # Neper default ports + server_vm.AllowPort(12866) + server_vm.AllowPort(12867) + + pinning = ( + f'numactl --cpunodebind={FLAGS.neper_numa_node}' + if FLAGS.neper_numa_node >= 0 + else '' + ) + + # Start Server: + # Increase file descriptor limits for high flow counts, pin to NUMA node if + # requested, and set flow/payload/latency/duration parameters. + server_args = [ + 'ulimit -n 65535 &&', + pinning, + neper.GetPath(), + '-T $(nproc)', + f'-F {FLAGS.neper_num_flows}', + f'-Q {FLAGS.neper_payload_size}', + f'-R {FLAGS.neper_payload_size}', + '-p 50,90,99', + f'-l {FLAGS.neper_test_length}', + ] + server_cmd = ' '.join(filter(None, server_args)) + + def StartServer(): + server_vm.RemoteCommand(server_cmd) + + server_thread = threading.Thread(target=StartServer) + server_thread.daemon = True + server_thread.start() + + # Neper consumes significant memory for socket buffers and tracking flows. + # We wait a few seconds for flow initialization to complete. + time.sleep(10) + + # Snapshot T0: capture CPU and RFS stats before traffic begins. + client_t0 = GetProcStat(client_vm) + server_t0 = GetProcStat(server_vm) + client_rfs_t0 = rfs.GetSoftnetStat(client_vm) + server_rfs_t0 = rfs.GetSoftnetStat(server_vm) + + # Run Client: similar parameters to server, but with -c and target host. + client_args = [ + 'ulimit -n 65535 &&', + pinning, + neper.GetPath(), + '-c', + f'-H {server_vm.internal_ip}', + '-T $(nproc)', + f'-F {FLAGS.neper_num_flows}', + f'-Q {FLAGS.neper_payload_size}', + f'-R {FLAGS.neper_payload_size}', + '-p 50,90,99', + f'-l {FLAGS.neper_test_length}', + ] + client_cmd = ' '.join(filter(None, client_args)) + stdout, _ = client_vm.RemoteCommand(client_cmd) + + # Snapshot Tend: capture CPU and RFS stats after traffic finishes. + client_tend = GetProcStat(client_vm) + server_tend = GetProcStat(server_vm) + client_rfs_tend = rfs.GetSoftnetStat(client_vm) + server_rfs_tend = rfs.GetSoftnetStat(server_vm) + + # Calculate Deltas: Compute utilization percentages based on the snapshots. + samples = [] + metadata = { + 'numa_node': FLAGS.neper_numa_node, + 'num_flows': FLAGS.neper_num_flows, + 'payload_size': FLAGS.neper_payload_size, + } + metadata.update(rfs.GetMetadata()) + + def AddCpuSamples(t0, tend, prefix): + """Calculates CPU and SoftIRQ utilization and appends to samples.""" + delta_total = tend['total'] - t0['total'] + delta_active = tend['active'] - t0['active'] + delta_softirq = tend['softirq'] - t0['softirq'] + if delta_total > 0: + cpu_util = (delta_active / delta_total) * 100 + softirq_util = (delta_softirq / delta_total) * 100 + samples.append( + sample.Sample(f'{prefix}_cpu_utilization', cpu_util, '%', metadata) + ) + samples.append( + sample.Sample( + f'{prefix}_softirq_utilization', softirq_util, '%', metadata + ) + ) + + AddCpuSamples(client_t0, client_tend, 'client') + AddCpuSamples(server_t0, server_tend, 'server') + + # Add RFS verification samples + samples.append( + sample.Sample( + 'client_rps_flow_steer_delta', + client_rfs_tend - client_rfs_t0, + 'count', + metadata, + ) + ) + samples.append( + sample.Sample( + 'server_rps_flow_steer_delta', + server_rfs_tend - server_rfs_t0, + 'count', + metadata, + ) + ) + + # Parse Neper Output + for line in stdout.splitlines(): + if '=' in line: + key, value = line.split('=') + if key in ['throughput', 'latency_p50', 'latency_p99', 'latency_max']: + unit = 'Transactions/s' if key == 'throughput' else 's' + samples.append(sample.Sample(key, float(value), unit, metadata)) + + return samples + + +def CleanupVM(vm): + """Cleans up a single VM.""" + vm.RemoteCommand('sudo killall tcp_rr', ignore_failure=True) + rfs.Restore(vm) + + +def Cleanup(benchmark_spec): + """Cleans up the VMs after the Neper benchmark.""" + vms = benchmark_spec.vms + background_tasks.RunThreaded(CleanupVM, vms) diff --git a/perfkitbenchmarker/linux_benchmarks/nginx_benchmark.py b/perfkitbenchmarker/linux_benchmarks/nginx_benchmark.py index a6e57bce3a..65823ee08c 100644 --- a/perfkitbenchmarker/linux_benchmarks/nginx_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/nginx_benchmark.py @@ -27,6 +27,7 @@ from perfkitbenchmarker import configs from perfkitbenchmarker import provider_info from perfkitbenchmarker import sample +from perfkitbenchmarker.linux_packages import rfs from perfkitbenchmarker.linux_packages import wrk2 FLAGS = flags.FLAGS @@ -203,8 +204,7 @@ def _ConfigureNginxServer(server, upstream_servers): for internal_ip in upstream_server.GetInternalIPs(): server.RemoteCommand( r"sudo sed -i 's|# server |server %s|g'" - ' /etc/nginx/conf.d/loadbalance.conf' - % (idx, internal_ip) + ' /etc/nginx/conf.d/loadbalance.conf' % (idx, internal_ip) ) idx += 1 if FLAGS.nginx_use_ssl: @@ -373,13 +373,22 @@ def Prepare(benchmark_spec): background_tasks.RunThreaded( _TuneNetworkStack, clients + [server] + upstream_servers ) + background_tasks.RunThreaded( + rfs.Configure, clients + [server] + upstream_servers + ) -def RunMultiClient(clients, targets, rate, connections, duration, threads): +def RunMultiClient( + clients, targets, rate, connections, duration, threads, server=None +): """Run multiple instances of wrk2 against a single target.""" results = [] num_clients = len(clients) + # Snapshot RFS stats + client_rfs_t0 = {c: rfs.GetSoftnetStat(c) for c in clients} + server_rfs_t0 = rfs.GetSoftnetStat(server) if server else None + def _RunSingleClient(client, client_number, target): """Run wrk2 from a single client.""" client_results = list( @@ -402,6 +411,10 @@ def _RunSingleClient(client, client_number, target): args.append(((client, i, target), {})) background_tasks.RunThreaded(_RunSingleClient, args) + # Snapshot Tend RFS stats + client_rfs_tend = {c: rfs.GetSoftnetStat(c) for c in clients} + server_rfs_tend = rfs.GetSoftnetStat(server) if server else None + requests = 0 errors = 0 max_latency = 0.0 @@ -430,8 +443,31 @@ def _RunSingleClient(client, client_number, target): 'num_server_targets': len(targets), 'nginx_content_size': FLAGS.nginx_content_size, } + metadata.update(rfs.GetMetadata()) if not FLAGS.nginx_file_server_conf: metadata['caching'] = True + + # Add RFS verification samples + for client, t0 in client_rfs_t0.items(): + delta = client_rfs_tend[client] - t0 + results.append( + sample.Sample( + f'client_{client.name}_rps_flow_steer_delta', + delta, + 'count', + metadata, + ) + ) + if server_rfs_t0 is not None and server_rfs_tend is not None: + results.append( + sample.Sample( + 'server_rps_flow_steer_delta', + server_rfs_tend - server_rfs_t0, + 'count', + metadata, + ) + ) + results += [ sample.Sample('achieved_rate', requests / duration, '', metadata), sample.Sample('aggregate requests', requests, '', metadata), @@ -483,6 +519,7 @@ def Run(benchmark_spec): connections=clients[0].NumCpusForBenchmark() * 10, duration=60, threads=clients[0].NumCpusForBenchmark(), + server=server, ) # Binary search for highest RPS under the p99 latency threshold. @@ -501,6 +538,7 @@ def Run(benchmark_spec): connections=clients[0].NumCpusForBenchmark() * 10, duration=60, threads=clients[0].NumCpusForBenchmark(), + server=server, ) for result in results: if result.metric == 'p99 latency': @@ -519,7 +557,7 @@ def Run(benchmark_spec): for config in FLAGS.nginx_load_configs: rate, duration, threads, connections = list(map(int, config.split(':'))) results += RunMultiClient( - clients, targets, rate, connections, duration, threads + clients, targets, rate, connections, duration, threads, server ) return results @@ -531,4 +569,9 @@ def Cleanup(benchmark_spec): benchmark_spec: The benchmark specification. Contains all data that is required to run the benchmark. """ - del benchmark_spec + clients = benchmark_spec.vm_groups['clients'] + server = benchmark_spec.vm_groups['server'][0] + upstream_servers = benchmark_spec.vm_groups['upstream_servers'] + background_tasks.RunThreaded( + rfs.Restore, clients + [server] + upstream_servers + ) diff --git a/perfkitbenchmarker/linux_benchmarks/redis_memtier_benchmark.py b/perfkitbenchmarker/linux_benchmarks/redis_memtier_benchmark.py index bf82df5987..5065a99707 100644 --- a/perfkitbenchmarker/linux_benchmarks/redis_memtier_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/redis_memtier_benchmark.py @@ -32,6 +32,7 @@ from perfkitbenchmarker import virtual_machine from perfkitbenchmarker.linux_packages import memtier from perfkitbenchmarker.linux_packages import redis_server +from perfkitbenchmarker.linux_packages import rfs # location for top command output _TOP_OUTPUT = 'top.txt' @@ -133,7 +134,9 @@ def GetConfig(user_config: Dict[str, Any]) -> Dict[str, Any]: def PrepareSystem(bm_spec: _BenchmarkSpec) -> None: """Set system-wide parameters.""" server_vms = bm_spec.vm_groups['servers'] # for cluster mode + client_vms = bm_spec.vm_groups['clients'] background_tasks.RunThreaded(redis_server.PrepareSystem, server_vms) + background_tasks.RunThreaded(rfs.Configure, server_vms + client_vms) def InstallPackages(bm_spec: _BenchmarkSpec) -> None: @@ -211,10 +214,14 @@ def Run(bm_spec: _BenchmarkSpec) -> List[sample.Sample]: else [None] ) + benchmark_metadata = {} + benchmark_metadata.update(rfs.GetMetadata()) + maximum_total_ops_throughput = 0.0 result_with_maximum_total_ops_throughput = None all_results = [] top_results = [] + rfs_results = [] try: for io_threads in io_threads_to_sweep: @@ -222,11 +229,20 @@ def Run(bm_spec: _BenchmarkSpec) -> List[sample.Sample]: if io_threads is not None: _UpdateIOThreadsForRedisServer(bm_spec.vm_groups['servers'], io_threads) + # Snapshot RFS stats + client_rfs_t0 = {c: rfs.GetSoftnetStat(c) for c in client_vms} + server_rfs_t0 = rfs.GetSoftnetStat(server_vm) if server_vm else 0 + raw_results = memtier.RunOverAllThreadsPipelinesAndClients( client_vms, bm_spec.redis_endpoint_ip, redis_server.GetRedisPorts(server_vm), ) + + # Snapshot Tend RFS stats + client_rfs_tend = {c: rfs.GetSoftnetStat(c) for c in client_vms} + server_rfs_tend = rfs.GetSoftnetStat(server_vm) if server_vm else 0 + redis_metadata = redis_server.GetMetadata(server_vm) total_ops_throughput = 0.0 @@ -236,6 +252,27 @@ def Run(bm_spec: _BenchmarkSpec) -> List[sample.Sample]: server_result.metadata.update(redis_metadata) server_result.metadata.update(benchmark_metadata) + # Add RFS verification samples + for client, t0 in client_rfs_t0.items(): + delta = client_rfs_tend[client] - t0 + rfs_results.append( + sample.Sample( + f'client_{client.name}_rps_flow_steer_delta', + delta, + 'count', + benchmark_metadata, + ) + ) + if server_vm: + rfs_results.append( + sample.Sample( + 'server_rps_flow_steer_delta', + server_rfs_tend - server_rfs_t0, + 'count', + benchmark_metadata, + ) + ) + if total_ops_throughput > maximum_total_ops_throughput: maximum_total_ops_throughput = total_ops_throughput result_with_maximum_total_ops_throughput = raw_results @@ -257,11 +294,18 @@ def Run(bm_spec: _BenchmarkSpec) -> List[sample.Sample]: for server_vm in bm_spec.vm_groups['servers']: redis_server.VerifyRedisAof(server_vm) - return result_with_maximum_total_ops_throughput + all_results + top_results + return ( + result_with_maximum_total_ops_throughput + + all_results + + top_results + + rfs_results + ) def Cleanup(bm_spec: _BenchmarkSpec) -> None: - del bm_spec + server_vms = bm_spec.vm_groups.get('servers', []) + client_vms = bm_spec.vm_groups.get('clients', []) + background_tasks.RunThreaded(rfs.Restore, server_vms + client_vms) def _UpdateIOThreadsForRedisServer( diff --git a/perfkitbenchmarker/linux_packages/neper.py b/perfkitbenchmarker/linux_packages/neper.py new file mode 100644 index 0000000000..509f011655 --- /dev/null +++ b/perfkitbenchmarker/linux_packages/neper.py @@ -0,0 +1,45 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module containing neper installation and cleanup functions.""" + +from perfkitbenchmarker import linux_packages + +NEPER_DIR = f'{linux_packages.INSTALL_DIR}/neper' +GIT_REPO = 'https://github.com/google/neper.git' +GIT_COMMIT = 'c1419ebdd26ab934e2391da6a89c7323adb4b4fb' + + +def _Install(vm): + """Installs the neper package on the VM.""" + vm.Install('build_tools') + vm.Install('numactl') + vm.InstallPackages('autoconf automake git') + vm.RemoteCommand(f'rm -rf {NEPER_DIR} && git clone {GIT_REPO} {NEPER_DIR}') + vm.RemoteCommand(f'cd {NEPER_DIR} && git checkout {GIT_COMMIT} && make') + + +def YumInstall(vm): + """Installs the neper package on the VM for Yum-based systems.""" + _Install(vm) + + +def AptInstall(vm): + """Installs the neper package on the VM for Apt-based systems.""" + _Install(vm) + + +def GetPath(): + """Returns the path to the tcp_rr binary on the VM.""" + return f'{NEPER_DIR}/tcp_rr' diff --git a/perfkitbenchmarker/linux_packages/rfs.py b/perfkitbenchmarker/linux_packages/rfs.py new file mode 100644 index 0000000000..323698e330 --- /dev/null +++ b/perfkitbenchmarker/linux_packages/rfs.py @@ -0,0 +1,121 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Standardized Receive Flow Steering (RFS) tuning kit.""" + +import logging +from absl import flags + +FLAGS = flags.FLAGS + +flags.DEFINE_integer( + 'rps_sock_flow_entries', 0, 'Global RFS table size (rps_sock_flow_entries).' +) +flags.DEFINE_integer( + 'rps_flow_cnt', 0, 'Per-queue RFS flow count (rps_flow_cnt).' +) + +_RFS_GLOBAL_PATH = 'net.core.rps_sock_flow_entries' + + +def _GetPhysicalInterfaces(vm): + """Finds physical, non-virtual network interfaces.""" + stdout, _ = vm.RemoteCommand('find /sys/class/net -maxdepth 2 -name device') + # Expected output: /sys/class/net/eth0/device + interfaces = [] + for line in stdout.splitlines(): + if line.startswith('/sys/class/net/'): + iface = line.split('/')[4] + if iface != 'lo': + interfaces.append(iface) + return interfaces + + +def Configure(vm): + """Configures RFS on the VM and backs up current settings.""" + if not FLAGS.rps_sock_flow_entries and not FLAGS.rps_flow_cnt: + return + + # Back up original settings if not already backed up + if not hasattr(vm, 'rfs_original_settings'): + vm.rfs_original_settings = {} + + # Backup global + stdout, _ = vm.RemoteCommand(f'sysctl -n {_RFS_GLOBAL_PATH}') + vm.rfs_original_settings['global'] = stdout.strip() + + # Backup per-interface/queue + vm.rfs_original_settings['queues'] = {} + interfaces = _GetPhysicalInterfaces(vm) + for iface in interfaces: + # Find all rx-N/rps_flow_cnt files + find_cmd = f'find /sys/class/net/{iface}/queues/rx-* -name rps_flow_cnt' + stdout, _ = vm.RemoteCommand(find_cmd, ignore_failure=True) + for path in stdout.splitlines(): + val, _ = vm.RemoteCommand(f'cat {path}') + vm.rfs_original_settings['queues'][path] = val.strip() + + # Apply new settings + if FLAGS.rps_sock_flow_entries: + vm.RemoteCommand( + f'sudo sysctl -w {_RFS_GLOBAL_PATH}={FLAGS.rps_sock_flow_entries}' + ) + + if FLAGS.rps_flow_cnt: + interfaces = _GetPhysicalInterfaces(vm) + for iface in interfaces: + queues_pattern = f'/sys/class/net/{iface}/queues/rx-*/rps_flow_cnt' + vm.RemoteCommand( + f'echo {FLAGS.rps_flow_cnt} | sudo tee {queues_pattern}', + ignore_failure=True, + ) + + +def Restore(vm): + """Restores original RFS settings on the VM.""" + if not hasattr(vm, 'rfs_original_settings'): + logging.info('No RFS settings to restore on %s', vm.name) + return + + settings = vm.rfs_original_settings + + # Restore global + vm.RemoteCommand(f"sudo sysctl -w {_RFS_GLOBAL_PATH}={settings['global']}") + + # Restore per-queue + for path, val in settings['queues'].items(): + vm.RemoteCommand(f'echo {val} | sudo tee {path}', ignore_failure=True) + + del vm.rfs_original_settings + + +def GetMetadata(): + """Returns RFS metadata based on current flags.""" + metadata = {} + if FLAGS.rps_sock_flow_entries: + metadata['rps_sock_flow_entries'] = FLAGS.rps_sock_flow_entries + if FLAGS.rps_flow_cnt: + metadata['rps_flow_cnt'] = FLAGS.rps_flow_cnt + return metadata + + +def GetSoftnetStat(vm): + """Reads /proc/net/softnet_stat and returns the rps_flow_steer count (column 10).""" + # Each line is a CPU. Column 10 (0-indexed 9) is rps_flow_steer. + stdout, _ = vm.RemoteCommand('cat /proc/net/softnet_stat') + total_rps_flow_steer = 0 + for line in stdout.splitlines(): + parts = line.split() + if len(parts) >= 10: + total_rps_flow_steer += int(parts[9], 16) + return total_rps_flow_steer diff --git a/tests/linux_benchmarks/neper_benchmark_test.py b/tests/linux_benchmarks/neper_benchmark_test.py new file mode 100644 index 0000000000..e9a87a7395 --- /dev/null +++ b/tests/linux_benchmarks/neper_benchmark_test.py @@ -0,0 +1,87 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from absl import flags +import mock +from perfkitbenchmarker import benchmark_spec +from perfkitbenchmarker.linux_benchmarks import neper_benchmark + +FLAGS = flags.FLAGS +FLAGS.mark_as_parsed() + + +class NeperBenchmarkTestCase(unittest.TestCase): + + def setUp(self): + super().setUp() + self.spec = mock.MagicMock(spec=benchmark_spec.BenchmarkSpec) + self.client_vm = mock.MagicMock() + self.server_vm = mock.MagicMock() + self.spec.vms = [self.client_vm, self.server_vm] + self.client_vm.name = 'client' + self.server_vm.name = 'server' + self.server_vm.internal_ip = '10.0.0.2' + + @mock.patch('perfkitbenchmarker.linux_benchmarks.neper_benchmark.GetProcStat') + @mock.patch('perfkitbenchmarker.linux_packages.neper.GetPath') + def testRun(self, mock_get_path, mock_get_proc_stat): + mock_get_path.return_value = '/usr/local/bin/tcp_rr' + # Mock /proc/stat snapshots + mock_get_proc_stat.side_effect = [ + {'softirq': 100, 'total': 1000, 'active': 500}, # client T0 + {'softirq': 200, 'total': 2000, 'active': 800}, # server T0 + {'softirq': 150, 'total': 2000, 'active': 1000}, # client Tend + {'softirq': 300, 'total': 3000, 'active': 1300}, # server Tend + ] + + # Mock Neper output + self.client_vm.RemoteCommand.return_value = ( + ( + 'num_transactions=1000\n' + 'throughput=100.0\n' + 'latency_min=0.1\n' + 'latency_max=1.0\n' + 'latency_p50=0.5\n' + 'latency_p90=0.8\n' + 'latency_p99=0.9\n' + ), + '', + ) + + samples = neper_benchmark.Run(self.spec) + + # Verify samples + sample_names = [s.metric for s in samples] + self.assertIn('client_cpu_utilization', sample_names) + self.assertIn('client_softirq_utilization', sample_names) + self.assertIn('server_cpu_utilization', sample_names) + self.assertIn('server_softirq_utilization', sample_names) + self.assertIn('throughput', sample_names) + self.assertIn('latency_p50', sample_names) + self.assertIn('latency_p99', sample_names) + self.assertIn('latency_max', sample_names) + + # Verify deltas + # Client: DeltaTotal=1000, DeltaActive=500, DeltaSoftIRQ=50 + # CPU util = 50%, SoftIRQ util = 5% + for s in samples: + if s.metric == 'client_cpu_utilization': + self.assertEqual(s.value, 50.0) + if s.metric == 'client_softirq_utilization': + self.assertEqual(s.value, 5.0) + + +if __name__ == '__main__': + unittest.main()