
Commit afa17e6

feat: signing and firing real ethereum txs (#2)
* wip
* oop
* signer
* does this fix
* guh
* reps
* wut
* Revert "wut" This reverts commit 2b4ca56.
* oh spawn more lol
* hm
* Update main.rs
* Reapply "wut" This reverts commit 699b70d.
* Revert "Reapply "wut"" This reverts commit c8879ce.
* Update worker.rs
* async tx gen worker
* wuh
* Revert "wuh" This reverts commit 3c8f5d6.
* Revert "async tx gen worker" This reverts commit aa1c1a7.
* wuh2
* fix async?
* full threads
* ratio, sync again
* Update main.rs
* Update main.rs
* try: cachepad
* Revert "try: cachepad" This reverts commit 947b586.
* LAWG
* breh
* breh2
* Update main.rs
* lawg
* brh
* bind
* disable pinning for a sec
* feat: current thread tokio
* Update main.rs
* bruh oops
* pin thread
* curious
* just pinned connection workers?
* no pinning
* yes pin
* no pin all
* pin
* Update main.rs
* disable again
* blarg
* blarg
* Update main.rs
* lawgs
* Update queue.rs
* Update queue.rs
* grah
* feat: clean
* Update network.rs
* reorg workers
* feat: ensure each worker gets a core
* comments
* blah
* proper json body
* rm log
* feat: proper decoder on testserver
* Update network.rs
* Update tx_queue.rs
* Update tx_queue.rs
* try cooked ratio
* Update main.rs
* disable thread pinning
* pinning
* actual reth?
* log?
* lawg
* bruh
* log ranges
* Update workers.rs
* todo
* tehehe
* Update network.rs
* Update network.rs
* Update network.rs
* fml
* Update network.rs
* Update tx_gen.rs
* Update network.rs
* new signer each time hehe
* Update tx_gen.rs
* DEBUG way fewer connections, SLEEP
* Revert "DEBUG way fewer connections, SLEEP" This reverts commit 19cc477.
* anvil key
* blarg
* blarg
* 0 gwei?
* todo
* feat: more granular worker specification
* feat: use MnemonicBuilder
* 1 wei
* Update tx_gen.rs
* Update tx_gen.rs
* fix
* pop tx from front
* rm: annoying log
* Update tx_queue.rs
* time req
* Revert "time req" This reverts commit c63d5c5.
* MOAR CONNECTIONS
* 100k connections?!?!
* 20k connections
* 50k connections?!
* 30k connections
* testserver match port
* bruh
* Update tx_queue.rs
* 20k conn
* upgrade actix max connections
* 100k connections
* extra detail?
* document epehemral ports
* Update network.rs
* Update main.rs
* Update main.rs
* Update main.rs
* Revert "Update main.rs" This reverts commit 7eaefce.
* req async?
* Update network.rs
* undo async reqs
* feat: batching
* lawg
* rm measure
* Update network.rs
* proper rps measure with batching
* Update network.rs
1 parent c95fc76 commit afa17e6

12 files changed

Lines changed: 4607 additions & 647 deletions


Cargo.lock

Lines changed: 4109 additions & 552 deletions
Some generated files are not rendered by default.

crescendo/Cargo.toml

Lines changed: 6 additions & 1 deletion
@@ -14,4 +14,9 @@ tokio-metrics = "0.4.3"
 rlimit = "0.10.2"
 num_cpus = "1.17.0"
 crossbeam-utils = "0.8.21"
-mimalloc = "0.1.47"
+mimalloc = "0.1.47"
+alloy = { version = "1", features = ["genesis", "signers"] }
+alloy-evm = "0.13"
+alloy-consensus = { version = "1", features = ["secp256k1"] }
+alloy-signer-local = { version = "1", features = ["mnemonic"] }
+core_affinity = "0.8.3"
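The new alloy crates cover the signing path referenced in the commit log ("signer", "feat: use MnemonicBuilder", "anvil key", "1 wei"). A minimal sketch of how they might fit together, assuming Anvil's well-known dev mnemonic and its default chain ID; the recipient, gas values, and the `alloy::eips` import path are illustrative assumptions, not taken from this diff:

use alloy::eips::eip2718::Encodable2718; // Assumption: alloy's `eips` module is enabled.
use alloy::primitives::{Address, TxKind, U256};
use alloy::signers::SignerSync;
use alloy_consensus::{SignableTransaction, TxEnvelope, TxLegacy};
use alloy_signer_local::{coins_bip39::English, MnemonicBuilder};

// Anvil's well-known dev mnemonic (the "anvil key" in the log above).
const MNEMONIC: &str = "test test test test test test test test test test test junk";

fn sign_raw_tx(nonce: u64) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Derive account 0 from the mnemonic.
    let signer = MnemonicBuilder::<English>::default().phrase(MNEMONIC).build()?;

    // A 1-wei transfer at a minimal gas price ("1 wei", "0 gwei?" in the log).
    let tx = TxLegacy {
        chain_id: Some(31337), // Anvil's default; the real target chain isn't shown in this diff.
        nonce,
        gas_price: 1,
        gas_limit: 21_000,
        to: TxKind::Call(Address::ZERO), // Illustrative recipient.
        value: U256::from(1),
        input: Default::default(),
    };

    // Sign the legacy signature hash, wrap it into an envelope, and encode
    // to the raw bytes that eth_sendRawTransaction expects.
    let signature = signer.sign_hash_sync(&tx.signature_hash())?;
    let envelope = TxEnvelope::Legacy(tx.into_signed(signature));
    Ok(envelope.encoded_2718())
}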

crescendo/src/main.rs

Lines changed: 58 additions & 26 deletions
@@ -2,50 +2,82 @@ use std::future::pending;
 use std::thread;
 use std::time::Duration;
 
+use core_affinity;
 use mimalloc::MiMalloc;
-use stats::STATS;
 
-mod stats;
+mod network_stats;
+mod tx_queue;
 mod utils;
-mod worker;
+mod workers;
+
+use crate::network_stats::NETWORK_STATS;
+use crate::tx_queue::TX_QUEUE;
+use crate::workers::{DesireType, WorkerType};
 
 #[global_allocator]
 // Increases RPS by ~5.5% at the time of
 // writing. ~3.3% faster than jemalloc.
 static GLOBAL: MiMalloc = MiMalloc;
 
-const TOTAL_CONNECTIONS: u64 = 4096;
-const TARGET_URL: &str = "http://127.0.0.1:8080";
+// TODO: Configurable CLI args.
+const TOTAL_CONNECTIONS: u64 = 20_000; // Limited by the number of ephemeral ports available on the system.
+const THREAD_PINNING: bool = true;
+const TARGET_URL: &str = "http://127.0.0.1:8545";
 
-#[tokio::main(worker_threads = 1)]
+#[tokio::main(flavor = "current_thread")]
 async fn main() {
-    let num_threads = num_cpus::get() as u64;
-    let connections_per_thread = TOTAL_CONNECTIONS / num_threads;
-
     if let Err(err) = utils::increase_nofile_limit(TOTAL_CONNECTIONS * 10) {
-        println!("Failed to increase file descriptor limit: {err}.");
+        println!("[!] Failed to increase file descriptor limit: {err}.");
     }
 
-    println!(
-        "Running {} threads with {} connections each against {}...",
-        num_threads, connections_per_thread, TARGET_URL
+    let mut core_ids = core_affinity::get_core_ids().unwrap();
+    println!("[*] Detected {} effective cores.", core_ids.len());
+
+    // Pin the tokio runtime to a core (if enabled).
+    utils::maybe_pin_thread(core_ids.pop().unwrap(), THREAD_PINNING);
+
+    // Given our desired breakdown of workers, translate this into actual numbers of workers to spawn.
+    let (workers, worker_counts) = workers::assign_workers(
+        core_ids, // Doesn't include the main runtime core.
+        vec![(WorkerType::TxGen, DesireType::Exact(20)), (WorkerType::Network, DesireType::Percentage(1.0))],
+        THREAD_PINNING, // Only log core ranges if thread pinning is actually enabled.
     );
 
-    // Spawn all worker threads.
-    for _ in 0..num_threads {
-        thread::spawn(move || {
-            let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
-            rt.block_on(async {
-                for _ in 0..connections_per_thread {
-                    tokio::spawn(worker::connection_worker(TARGET_URL));
-                }
-                pending::<()>().await; // Keep the runtime alive forever.
-            });
-        });
+    let connections_per_network_worker = TOTAL_CONNECTIONS / worker_counts[&WorkerType::Network];
+    println!("[*] Connections per network worker: {}", connections_per_network_worker);
+
+    // TODO: Having the assign_workers function do this would be cleaner; also give ids to the network workers.
+    let mut tx_gen_worker_id = 0;
+
+    // Spawn the workers, pinning them to the appropriate cores if enabled.
+    for (core_id, worker_type) in workers {
+        match worker_type {
+            WorkerType::TxGen => {
+                thread::spawn(move || {
+                    utils::maybe_pin_thread(core_id, THREAD_PINNING);
+                    workers::tx_gen_worker(tx_gen_worker_id);
+                });
+                tx_gen_worker_id += 1;
+            }
+            WorkerType::Network => {
+                thread::spawn(move || {
+                    utils::maybe_pin_thread(core_id, THREAD_PINNING);
+                    let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
+
+                    rt.block_on(async {
+                        for _ in 0..connections_per_network_worker {
+                            tokio::spawn(workers::network_worker(TARGET_URL));
+                        }
+                        pending::<()>().await; // Keep the runtime alive forever.
+                    });
+                });
+            }
+        }
     }
 
-    // Start stats reporter.
-    tokio::spawn(STATS.start_reporter(Duration::from_secs(1)))
+    // Start reporters.
+    tokio::spawn(TX_QUEUE.start_reporter(Duration::from_secs(1)));
+    tokio::spawn(NETWORK_STATS.start_reporter(Duration::from_secs(1)))
        .await // Keep the main thread alive forever.
        .unwrap();
 }
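workers.rs itself isn't rendered in this scrape, so the shape of `assign_workers` has to be inferred from the call site above. A hypothetical reconstruction of the assignment logic — everything beyond the names `WorkerType`, `DesireType`, and `assign_workers` is a guess, not the author's code:

use std::collections::HashMap;

use core_affinity::CoreId;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum WorkerType {
    TxGen,
    Network,
}

pub enum DesireType {
    Exact(u64),      // Claim exactly this many cores.
    Percentage(f64), // Claim this fraction of the cores that remain.
}

// Hypothetical sketch: hand out cores in order, Exact requests first,
// then Percentage of whatever remains (Percentage(1.0) = all remaining).
pub fn assign_workers(
    mut core_ids: Vec<CoreId>,
    desires: Vec<(WorkerType, DesireType)>,
    log_ranges: bool,
) -> (Vec<(CoreId, WorkerType)>, HashMap<WorkerType, u64>) {
    let mut assignments = Vec::new();
    let mut counts: HashMap<WorkerType, u64> = HashMap::new();

    for (worker_type, desire) in desires {
        let count = match desire {
            DesireType::Exact(n) => (n as usize).min(core_ids.len()),
            DesireType::Percentage(p) => ((core_ids.len() as f64) * p).round() as usize,
        };

        let claimed: Vec<CoreId> = core_ids.drain(..count).collect();
        if log_ranges {
            // "log ranges" in the commit log; format_ranges lives in utils.rs below.
            let ids: Vec<usize> = claimed.iter().map(|c| c.id).collect();
            println!("[*] {:?} workers on cores: {}", worker_type, crate::utils::format_ranges(&ids));
        }

        *counts.entry(worker_type).or_default() += claimed.len() as u64;
        assignments.extend(claimed.into_iter().map(|core| (core, worker_type)));
    }

    (assignments, counts)
}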
crescendo/src/{stats.rs → network_stats.rs}

Lines changed: 9 additions & 9 deletions
@@ -4,7 +4,7 @@ use std::time::Duration;
 use crossbeam_utils::CachePadded;
 use thousands::Separable;
 
-pub struct Stats {
+pub struct NetworkStats {
     requests: AtomicU64,
     errors: AtomicU64,
 }
@@ -13,16 +13,16 @@ pub struct Stats {
 // other frequently accessed memory can occur. To mitigate, we pad stats
 // to the length of a full cache line to avoid conflict. This is measured
 // to increase RPS by >10% in the release profile at the time of writing.
-pub static STATS: CachePadded<Stats> =
-    CachePadded::new(Stats { requests: AtomicU64::new(0), errors: AtomicU64::new(0) });
+pub static NETWORK_STATS: CachePadded<NetworkStats> =
+    CachePadded::new(NetworkStats { requests: AtomicU64::new(0), errors: AtomicU64::new(0) });
 
-impl Stats {
-    pub fn inc_requests(&self) {
-        self.requests.fetch_add(1, Ordering::Relaxed);
+impl NetworkStats {
+    pub fn inc_requests_by(&self, count: usize) {
+        self.requests.fetch_add(count as u64, Ordering::Relaxed);
     }
 
-    pub fn inc_errors(&self) {
-        self.errors.fetch_add(1, Ordering::Relaxed);
+    pub fn inc_errors_by(&self, count: usize) {
+        self.errors.fetch_add(count as u64, Ordering::Relaxed);
     }
 
     pub async fn start_reporter(&self, measurement_interval: Duration) {
@@ -37,7 +37,7 @@ impl Stats {
         let rps = requests - last_requests;
         let eps = errors - last_errors;
         println!(
-            "RPS: {}, EPS: {}, Total requests: {}, Total errors: {}",
+            "[*] RPS: {}, EPS: {}, Total requests: {}, Total errors: {}",
            rps.separate_with_commas(),
            eps.separate_with_commas(),
            requests.separate_with_commas(),

crescendo/src/tx_queue.rs

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+use std::collections::VecDeque;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Mutex;
+
+use thousands::Separable;
+
+pub struct TxQueue {
+    // TODO: RwLock? Natively concurrent deque?
+    queue: Mutex<VecDeque<Vec<u8>>>,
+    total_added: AtomicU64,
+}
+
+pub static TX_QUEUE: TxQueue = TxQueue { queue: Mutex::new(VecDeque::new()), total_added: AtomicU64::new(0) };
+
+impl TxQueue {
+    pub fn push_tx(&self, tx: Vec<u8>) {
+        self.total_added.fetch_add(1, Ordering::Relaxed);
+        self.queue.lock().unwrap().push_back(tx);
+    }
+
+    pub fn queue_len(&self) -> usize {
+        self.queue.lock().map(|q| q.len()).unwrap_or(0)
+    }
+
+    pub fn pop_at_most(&self, max_count: usize) -> Option<Vec<Vec<u8>>> {
+        let mut queue = self.queue.lock().ok()?;
+
+        let count = max_count.min(queue.len());
+        if count == 0 {
+            return None;
+        }
+
+        // TODO: Is drain more or less efficient than repeated pop_front?
+        // It's important to pop from the front here, otherwise the node
+        // gets confused seeing a bunch of txs with incredibly high nonces
+        // before it sees any of the lower ones. It's possible this issue
+        // could still emerge at high enough RPS, but we haven't seen it yet.
+        Some(queue.drain(..count).collect())
+    }
+
+    pub async fn start_reporter(&self, measurement_interval: std::time::Duration) {
+        let mut last_total_added = 0u64;
+        let mut last_queue_len = 0usize;
+        let mut interval = tokio::time::interval(measurement_interval);
+        interval.tick().await;
+        loop {
+            interval.tick().await;
+            let current_total_added = self.total_added.load(Ordering::Relaxed);
+            let current_queue_len = self.queue_len();
+            let added_per_second = current_total_added - last_total_added;
+            let queue_growth = current_queue_len.saturating_sub(last_queue_len);
+            println!(
+                "[*] TxQueue +/s: {}, TxQueue Δ/s: {}, Current length: {}",
+                added_per_second.separate_with_commas(),
+                queue_growth.separate_with_commas(),
+                current_queue_len.separate_with_commas()
+            );
+            last_total_added = current_total_added;
+            last_queue_len = current_queue_len;
+        }
+    }
+}
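The batch-oriented `pop_at_most` pairs with the new `inc_requests_by`/`inc_errors_by` counters above ("feat: batching", "proper rps measure with batching"). network.rs isn't shown in this diff, but a connection's send loop might consume the queue roughly like this — `send_batch`, `drain_and_send_loop`, and the batch size are placeholders, not the author's actual code:

use crate::network_stats::NETWORK_STATS;
use crate::tx_queue::TX_QUEUE;

// Stub for whatever JSON-RPC submission the real network worker performs.
async fn send_batch(_raw_txs: Vec<Vec<u8>>) -> Result<(), ()> {
    Ok(())
}

// Hypothetical send loop for a single connection.
async fn drain_and_send_loop() {
    const MAX_BATCH: usize = 100; // Illustrative batch size; the real value isn't shown in this diff.
    loop {
        match TX_QUEUE.pop_at_most(MAX_BATCH) {
            Some(batch) => {
                let count = batch.len();
                match send_batch(batch).await {
                    // Count the whole batch at once, matching the inc_*_by counters.
                    Ok(()) => NETWORK_STATS.inc_requests_by(count),
                    Err(()) => NETWORK_STATS.inc_errors_by(count),
                }
            }
            // Queue is empty; yield so we don't spin on the mutex.
            None => tokio::task::yield_now().await,
        }
    }
}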

crescendo/src/utils.rs

Lines changed: 52 additions & 3 deletions
@@ -1,20 +1,69 @@
 use std::io;
 
+use core_affinity::CoreId;
 use rlimit::Resource;
 
+/// Increase the file descriptor limit to the given minimum.
+///
+/// Panics if the hard limit is too low, otherwise tries to increase.
 pub fn increase_nofile_limit(min_limit: u64) -> io::Result<u64> {
     let (soft, hard) = Resource::NOFILE.get()?;
-    println!("At startup, file descriptor limit: soft = {soft}, hard = {hard}");
+    println!("[*] At startup, file descriptor limit: soft = {soft}, hard = {hard}");
 
     if hard < min_limit {
-        panic!("File descriptor hard limit is too low. Please increase it to at least {}.", min_limit);
+        panic!("[!] File descriptor hard limit is too low. Please increase it to at least {}.", min_limit);
     }
 
     if soft != hard {
         Resource::NOFILE.set(hard, hard)?; // Just max things out to give us plenty of overhead.
         let (soft, hard) = Resource::NOFILE.get()?;
-        println!("After increasing file descriptor limit: soft = {soft}, hard = {hard}");
+        println!("[+] After increasing file descriptor limit: soft = {soft}, hard = {hard}");
     }
 
     Ok(soft)
 }
+
+/// Pin the current thread to the given core ID if enabled.
+///
+/// Panics if the thread fails to pin.
+pub fn maybe_pin_thread(core_id: CoreId, enable_thread_pinning: bool) {
+    if !enable_thread_pinning {
+        return;
+    }
+
+    if !core_affinity::set_for_current(core_id) {
+        panic!("[!] Failed to pin thread to core {}.", core_id.id);
+    }
+}
+
+/// Format a sorted list of numbers into a range string (e.g., "1-3, 5, 7-9").
+pub fn format_ranges(nums: &[usize]) -> String {
+    if nums.is_empty() {
+        return String::new();
+    }
+
+    let mut ranges = Vec::new();
+    let mut i = 0;
+
+    while i < nums.len() {
+        let start = nums[i];
+        let mut end = start;
+
+        // Find the end of the consecutive run.
+        while i + 1 < nums.len() && nums[i + 1] == nums[i] + 1 {
+            i += 1;
+            end = nums[i];
+        }
+
+        // Format this range.
+        if start == end {
+            ranges.push(start.to_string());
+        } else {
+            ranges.push(format!("{}-{}", start, end));
+        }
+
+        i += 1;
+    }
+
+    ranges.join(", ")
+}
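The doc comment's example doubles as a quick check of `format_ranges`:

#[cfg(test)]
mod tests {
    use super::format_ranges;

    #[test]
    fn formats_mixed_ranges() {
        // The example from the doc comment above.
        assert_eq!(format_ranges(&[1, 2, 3, 5, 7, 8, 9]), "1-3, 5, 7-9");
        assert_eq!(format_ranges(&[]), "");
        assert_eq!(format_ranges(&[4]), "4");
    }
}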

crescendo/src/worker.rs

Lines changed: 0 additions & 42 deletions
This file was deleted.
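The connection logic from worker.rs moved into the workers module (workers.rs/network.rs, neither rendered in this scrape). The "proper json body" and "feat: batching" commits point at standard JSON-RPC; a batched eth_sendRawTransaction payload could be built roughly like this — the function and its shape are an assumption, not the author's wire format:

use alloy::primitives::hex;

// Sketch: build a JSON-RPC batch body for eth_sendRawTransaction from raw tx bytes.
fn batch_body(raw_txs: &[Vec<u8>]) -> String {
    let calls: Vec<String> = raw_txs
        .iter()
        .enumerate()
        .map(|(id, raw)| {
            format!(
                r#"{{"jsonrpc":"2.0","id":{},"method":"eth_sendRawTransaction","params":["0x{}"]}}"#,
                id,
                hex::encode(raw)
            )
        })
        .collect();
    format!("[{}]", calls.join(","))
}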
