Skip to content

Commit b65bf15

Browse files
perf: avoid lock contention in tx queue (#7)
* wip: avoid hogging lock * feat: tx_gen worker batching * clippy lints * Update tx_gen.rs
1 parent c930544 commit b65bf15

9 files changed

Lines changed: 47 additions & 28 deletions

File tree

configs/default.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ token_contract_address = "0x2000000000000000000000000000000000000001"
2020
recipient_distribution_factor = 20 # 1/20 of accounts receive transfers.
2121
max_transfer_amount = 10
2222

23+
batch_size = 1000 # Number of transactions to generate before pushing to queue.
24+
2325
[rate_limiting]
2426
initial_ratelimit = 100 # txs/s
2527

crescendo/src/bin/generate_genesis_alloc.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
1818
const NUM_ACCOUNTS: u32 = 50_000;
1919
const MNEMONIC: &str = "test test test test test test test test test test test junk";
2020

21-
println!("Generating {} accounts...", NUM_ACCOUNTS);
21+
println!("Generating {NUM_ACCOUNTS} accounts...");
2222

2323
let genesis_alloc: BTreeMap<String, AccountBalance> = (0..NUM_ACCOUNTS)
2424
.into_par_iter()
@@ -27,17 +27,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
2727
let signer =
2828
MnemonicBuilder::<English>::default().phrase(MNEMONIC).index(worker_id).unwrap().build().unwrap();
2929

30-
let address = secret_key_to_address(&signer.credential());
30+
let address = secret_key_to_address(signer.credential());
3131

32-
(format!("{:?}", address), AccountBalance { balance: "0xD3C21BCECCEDA1000000".to_string() })
32+
(format!("{address:?}"), AccountBalance { balance: "0xD3C21BCECCEDA1000000".to_string() })
3333
})
3434
.collect();
3535

3636
let output_path = Path::new("genesis-alloc.json");
3737
let json = serde_json::to_string_pretty(&genesis_alloc)?;
3838
fs::write(output_path, json)?;
3939

40-
println!("\nSuccessfully generated {} accounts!", NUM_ACCOUNTS);
40+
println!("\nSuccessfully generated {NUM_ACCOUNTS} accounts!");
4141
println!("Accounts saved to: {}", output_path.display());
4242

4343
Ok(())

crescendo/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ pub struct TxGenWorkerConfig {
8282
pub token_contract_address: String,
8383
pub recipient_distribution_factor: u32,
8484
pub max_transfer_amount: u64,
85+
86+
pub batch_size: u32,
8587
}
8688

8789
#[derive(Debug, Clone, Serialize, Deserialize)]

crescendo/src/main.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::thread;
44
use std::time::Duration;
55

66
use clap::Parser;
7-
use core_affinity;
87
use mimalloc::MiMalloc;
98

109
mod config;
@@ -70,7 +69,7 @@ async fn main() {
7069

7170
let connections_per_network_worker =
7271
config::get().network_worker.total_connections / worker_counts[&WorkerType::Network];
73-
println!("[*] Connections per network worker: {}", connections_per_network_worker);
72+
println!("[*] Connections per network worker: {connections_per_network_worker}");
7473

7574
// TODO: Having the assign_workers function do this would be cleaner.
7675
let mut tx_gen_worker_id = 0;

crescendo/src/tx_queue.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,25 +37,35 @@ impl TxQueue {
3737
pub static TX_QUEUE: std::sync::LazyLock<TxQueue> = std::sync::LazyLock::new(TxQueue::new);
3838

3939
impl TxQueue {
40-
pub fn push_tx(&self, tx: Vec<u8>) {
41-
self.total_added.fetch_add(1, Ordering::Relaxed);
42-
self.queue.lock().push_back(tx);
40+
pub fn push_txs(&self, txs: Vec<Vec<u8>>) {
41+
self.total_added.fetch_add(txs.len() as u64, Ordering::Relaxed);
42+
self.queue.lock().extend(txs);
4343
}
4444

4545
pub fn queue_len(&self) -> usize {
4646
self.queue.lock().len()
4747
}
4848

4949
pub async fn pop_at_most(&self, max_count: usize) -> Option<Vec<Vec<u8>>> {
50-
let mut queue = self.queue.lock();
51-
let allowed = (0..queue.len().min(max_count)).take_while(|_| self.rate_limiter.try_wait().is_ok()).count();
50+
// Assume the queue has sufficient items for now.
51+
let allowed = (0..max_count).take_while(|_| self.rate_limiter.try_wait().is_ok()).count();
5252
if allowed == 0 {
5353
return None;
54+
}
55+
56+
// Scope to release lock asap.
57+
let drained = {
58+
let mut queue = self.queue.lock();
59+
let to_drain = allowed.min(queue.len());
60+
if to_drain == 0 {
61+
return None;
62+
}
63+
queue.drain(..to_drain).collect::<Vec<_>>()
5464
};
5565

56-
self.total_popped.fetch_add(allowed as u64, Ordering::Relaxed);
66+
self.total_popped.fetch_add(drained.len() as u64, Ordering::Relaxed);
5767

58-
Some(queue.drain(..allowed).collect())
68+
Some(drained)
5969
}
6070

6171
pub async fn start_reporter(&self, measurement_interval: std::time::Duration) {

crescendo/src/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub fn increase_nofile_limit(min_limit: u64) -> io::Result<u64> {
1313
println!("[*] At startup, file descriptor limit: soft = {soft}, hard = {hard}");
1414

1515
if hard < min_limit {
16-
panic!("[!] File descriptor hard limit is too low. Please increase it to at least {}.", min_limit);
16+
panic!("[!] File descriptor hard limit is too low. Please increase it to at least {min_limit}.");
1717
}
1818

1919
if soft != hard {
@@ -61,7 +61,7 @@ pub fn format_ranges(nums: &[usize]) -> String {
6161
if start == end {
6262
ranges.push(start.to_string());
6363
} else {
64-
ranges.push(format!("{}-{}", start, end));
64+
ranges.push(format!("{start}-{end}"));
6565
}
6666

6767
i += 1;

crescendo/src/workers.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub fn assign_workers(
5757
if let Some(core_id) = core_ids.pop() {
5858
result.push((core_id, worker_type));
5959
*worker_counts.entry(worker_type).or_insert(0) += 1;
60-
worker_cores.entry(worker_type).or_insert_with(Vec::new).push(core_id);
60+
worker_cores.entry(worker_type).or_default().push(core_id);
6161
remaining_cores -= 1;
6262
}
6363
}
@@ -74,7 +74,7 @@ pub fn assign_workers(
7474
if let Some(core_id) = core_ids.pop() {
7575
result.push((core_id, *worker_type));
7676
*worker_counts.entry(*worker_type).or_insert(0) += 1;
77-
worker_cores.entry(*worker_type).or_insert_with(Vec::new).push(core_id);
77+
worker_cores.entry(*worker_type).or_default().push(core_id);
7878
remaining_cores -= 1;
7979
}
8080
}
@@ -87,26 +87,26 @@ pub fn assign_workers(
8787
if let Some(core_id) = core_ids.pop() {
8888
result.push((core_id, worker_type));
8989
*worker_counts.entry(worker_type).or_insert(0) += 1;
90-
worker_cores.entry(worker_type).or_insert_with(Vec::new).push(core_id);
90+
worker_cores.entry(worker_type).or_default().push(core_id);
9191
}
9292
}
9393
}
9494

95-
println!("[+] Spawning {} workers:", total_starting_cores);
95+
println!("[+] Spawning {total_starting_cores} workers:");
9696
for (worker_type, count) in worker_counts.clone() {
9797
if log_core_ranges {
9898
if let Some(cores) = worker_cores.get(&worker_type) {
9999
let mut core_ids: Vec<usize> = cores.iter().map(|c| c.id).collect();
100100
core_ids.sort();
101101

102102
let core_str = match core_ids.as_slice() {
103-
[single] => format!("core {}", single),
103+
[single] => format!("core {single}"),
104104
ids => format!("cores {}", format_ranges(ids)),
105105
};
106-
println!("- {:?}: {} ({})", worker_type, count, core_str);
106+
println!("- {worker_type:?}: {count} ({core_str})");
107107
}
108108
} else {
109-
println!("- {:?}: {}", worker_type, count);
109+
println!("- {worker_type:?}: {count}");
110110
}
111111
}
112112

crescendo/src/workers/network.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,19 +82,19 @@ pub async fn network_worker(worker_id: usize) {
8282
NETWORK_STATS.inc_requests_by(txs.len() - error_count);
8383
}
8484
Err(e) => {
85-
eprintln!("[!] Failed to read response body: {:?}", e);
85+
eprintln!("[!] Failed to read response body: {e:?}");
8686
NETWORK_STATS.inc_errors_by(txs.len());
8787
tokio::time::sleep(Duration::from_millis(config.error_sleep_ms)).await;
8888
}
8989
}
9090
} else {
91-
println!("[!] Request did not have OK status: {:?}", res);
91+
println!("[!] Request did not have OK status: {res:?}");
9292
NETWORK_STATS.inc_errors_by(txs.len());
9393
tokio::time::sleep(Duration::from_millis(100)).await;
9494
}
9595
}
9696
Err(e) => {
97-
eprintln!("[!] Request failed: {:?}", e);
97+
eprintln!("[!] Request failed: {e:?}");
9898
NETWORK_STATS.inc_errors_by(txs.len());
9999
tokio::time::sleep(Duration::from_millis(100)).await;
100100
}

crescendo/src/workers/tx_gen.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ sol! {
4444
}
4545

4646
pub fn tx_gen_worker(_worker_id: u32) {
47+
let config = &config::get().tx_gen_worker;
48+
4749
let mut rng = rand::rng();
50+
let mut tx_batch = Vec::with_capacity(config.batch_size as usize);
4851

4952
loop {
50-
let config = &config::get().tx_gen_worker;
51-
5253
let account_index = rng.random_range(0..config.num_accounts); // Account we'll be sending from.
5354

5455
// Get and increment nonce atomically.
@@ -83,7 +84,12 @@ pub fn tx_gen_worker(_worker_id: u32) {
8384
},
8485
);
8586

86-
TX_QUEUE.push_tx(tx);
87+
tx_batch.push(tx);
88+
89+
// Once we've accumulated batch_size transactions, drain them all to the queue.
90+
if tx_batch.len() >= config.batch_size as usize {
91+
TX_QUEUE.push_txs(std::mem::take(&mut tx_batch));
92+
}
8793
}
8894
}
8995

0 commit comments

Comments (0)