Skip to content

Commit 3ede6ab

Browse files
authored
Fix sub-second queue delays in TWMQ (#99)
* Revert "log clientid, txnid, queueid, chainid (#98)" This reverts commit b5b14a5. * Revert "valkey integration (#97)" This reverts commit 6bf1f2f. * Reapply "redis tls support (#95)" (#96) This reverts commit e570bc8. * Revert "redis tls support (#95)" This reverts commit d729f5f. * Fix sub-second queue delays in TWMQ Sub-second delayed requeues in TWMQ were being truncated to 0s because queue scheduling used Duration::as_secs(). That meant values like 200ms were effectively treated as immediate retries, which could cause hot-looping at queue poll cadence. This change adds a small helper that rounds any non-zero sub-second delay up to 1s and uses it consistently in: - queue push scheduling - hook scheduling - multilane queue scheduling * bump tarpaulin timeout * safe cargo clippy changes
1 parent b5b14a5 commit 3ede6ab

36 files changed

Lines changed: 306 additions & 666 deletions

.github/workflows/coverage-twmq.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ jobs:
5353

5454
# Run coverage with tarpaulin
5555
- name: Run coverage
56-
run: cargo tarpaulin -p twmq --skip-clean --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*"
56+
run: cargo tarpaulin -p twmq --skip-clean --timeout 300 --out Xml --out Html --output-dir coverage --exclude-files "aa-core/*" --exclude-files "core/*" --exclude-files "server/*" --exclude-files "thirdweb-core/*" --exclude-files "executors/*"
5757

5858
# Upload coverage to Codecov
5959
# TODO: Uncomment once we have open-sourced the repo

Cargo.toml

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,7 @@ config = "0.15.11"
9898
aws-arn = "0.3.1"
9999

100100
# Redis
101-
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager", "cluster", "cluster-async", "tls-rustls", "tokio-rustls-comp"] }
101+
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] }
102102

103103
# Dev dependencies
104-
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }
105-
106-
# Rustls
107-
#
108-
# NOTE: rustls 0.23 requires selecting exactly one process-wide crypto provider
109-
# (features: `ring` or `aws_lc_rs` / `aws-lc-rs`). Some dependency graphs (e.g. via
110-
# redis-rs' rustls integration) can end up with *no* provider enabled, which causes a
111-
# runtime panic when building TLS client/server configs.
112-
#
113-
# We explicitly enable the `ring` provider here to make TLS work reliably.
114-
rustls = { version = "0.23.32", default-features = false, features = ["std", "ring"] }
104+
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }

core/src/chain.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,6 @@ impl RpcCredentials {
2525

2626
Ok(header_map)
2727
}
28-
29-
pub fn client_id_for_logs(&self) -> Option<&str> {
30-
match self {
31-
RpcCredentials::Thirdweb(ThirdwebAuth::ClientIdServiceKey(creds)) => {
32-
Some(&creds.client_id)
33-
}
34-
RpcCredentials::Thirdweb(ThirdwebAuth::SecretKey(_)) => None,
35-
}
36-
}
3728
}
3829

3930
pub trait Chain: Send + Sync {

executors/src/eip7702_executor/confirm.rs

Lines changed: 6 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ use crate::{
2929
},
3030
};
3131

32-
const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm";
33-
3432
// --- Job Payload ---
3533
#[derive(Serialize, Deserialize, Debug, Clone)]
3634
#[serde(rename_all = "camelCase")]
@@ -170,7 +168,7 @@ where
170168
type ErrorData = Eip7702ConfirmationError;
171169
type JobData = Eip7702ConfirmationJobData;
172170

173-
#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_CONFIRM_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))]
171+
#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))]
174172
async fn process(
175173
&self,
176174
job: &BorrowedJob<Self::JobData>,
@@ -204,14 +202,7 @@ where
204202
.await
205203
.map_err(|e| {
206204
tracing::error!(
207-
transaction_id = %job_data.transaction_id,
208-
chain_id = job_data.chain_id,
209-
client_id = job_data
210-
.rpc_credentials
211-
.client_id_for_logs()
212-
.unwrap_or("unknown"),
213-
queue_id = EIP7702_CONFIRM_QUEUE_ID,
214-
bundler_transaction_id = %job_data.bundler_transaction_id,
205+
bundler_transaction_id = job_data.bundler_transaction_id,
215206
sender_details = ?job_data.sender_details,
216207
error = ?e,
217208
"Failed to get transaction hash from bundler"
@@ -330,15 +321,7 @@ where
330321
// Send webhook
331322
if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
332323
tracing::error!(
333-
transaction_id = %job.job.data.transaction_id,
334-
chain_id = job.job.data.chain_id,
335-
client_id = job
336-
.job
337-
.data
338-
.rpc_credentials
339-
.client_id_for_logs()
340-
.unwrap_or("unknown"),
341-
queue_id = EIP7702_CONFIRM_QUEUE_ID,
324+
transaction_id = job.job.data.transaction_id,
342325
error = ?e,
343326
"Failed to queue success webhook"
344327
);
@@ -363,15 +346,7 @@ where
363346
if should_queue_webhook {
364347
if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) {
365348
tracing::error!(
366-
transaction_id = %job.job.data.transaction_id,
367-
chain_id = job.job.data.chain_id,
368-
client_id = job
369-
.job
370-
.data
371-
.rpc_credentials
372-
.client_id_for_logs()
373-
.unwrap_or("unknown"),
374-
queue_id = EIP7702_CONFIRM_QUEUE_ID,
349+
transaction_id = job.job.data.transaction_id,
375350
error = ?e,
376351
"Failed to queue nack webhook"
377352
);
@@ -395,30 +370,14 @@ where
395370
.add_remove_command(tx.pipeline(), &job.job.data.transaction_id);
396371

397372
tracing::error!(
398-
transaction_id = %job.job.data.transaction_id,
399-
chain_id = job.job.data.chain_id,
400-
client_id = job
401-
.job
402-
.data
403-
.rpc_credentials
404-
.client_id_for_logs()
405-
.unwrap_or("unknown"),
406-
queue_id = EIP7702_CONFIRM_QUEUE_ID,
373+
transaction_id = job.job.data.transaction_id,
407374
error = ?fail_data.error,
408375
"EIP-7702 confirmation job failed"
409376
);
410377

411378
if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) {
412379
tracing::error!(
413-
transaction_id = %job.job.data.transaction_id,
414-
chain_id = job.job.data.chain_id,
415-
client_id = job
416-
.job
417-
.data
418-
.rpc_credentials
419-
.client_id_for_logs()
420-
.unwrap_or("unknown"),
421-
queue_id = EIP7702_CONFIRM_QUEUE_ID,
380+
transaction_id = job.job.data.transaction_id,
422381
error = ?e,
423382
"Failed to queue fail webhook"
424383
);

executors/src/eip7702_executor/send.rs

Lines changed: 6 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ use crate::{
3636

3737
use super::confirm::{Eip7702ConfirmationHandler, Eip7702ConfirmationJobData};
3838

39-
const EIP7702_SEND_QUEUE_ID: &str = "eip7702_send";
40-
const EIP7702_CONFIRM_QUEUE_ID: &str = "eip7702_confirm";
41-
4239
// --- Job Payload ---
4340
#[derive(Serialize, Deserialize, Debug, Clone)]
4441
#[serde(rename_all = "camelCase")]
@@ -178,7 +175,7 @@ where
178175
type ErrorData = Eip7702SendError;
179176
type JobData = Eip7702SendJobData;
180177

181-
#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, chain_id = job.job.data.chain_id, client_id = ?job.job.data.rpc_credentials.client_id_for_logs(), queue_id = EIP7702_SEND_QUEUE_ID, stage = Self::stage_name(), executor = Self::executor_name()))]
178+
#[tracing::instrument(skip(self, job), fields(transaction_id = job.job.id, stage = Self::stage_name(), executor = Self::executor_name()))]
182179
async fn process(
183180
&self,
184181
job: &BorrowedJob<Self::JobData>,
@@ -389,15 +386,7 @@ where
389386

390387
if let Err(e) = tx.queue_job(confirmation_job) {
391388
tracing::error!(
392-
transaction_id = %job.job.data.transaction_id,
393-
chain_id = job.job.data.chain_id,
394-
client_id = job
395-
.job
396-
.data
397-
.rpc_credentials
398-
.client_id_for_logs()
399-
.unwrap_or("unknown"),
400-
queue_id = EIP7702_CONFIRM_QUEUE_ID,
389+
transaction_id = job.job.data.transaction_id,
401390
error = ?e,
402391
"Failed to enqueue confirmation job"
403392
);
@@ -406,15 +395,7 @@ where
406395
// Send webhook
407396
if let Err(e) = self.queue_success_webhook(job, success_data, tx) {
408397
tracing::error!(
409-
transaction_id = %job.job.data.transaction_id,
410-
chain_id = job.job.data.chain_id,
411-
client_id = job
412-
.job
413-
.data
414-
.rpc_credentials
415-
.client_id_for_logs()
416-
.unwrap_or("unknown"),
417-
queue_id = EIP7702_SEND_QUEUE_ID,
398+
transaction_id = job.job.data.transaction_id,
418399
error = ?e,
419400
"Failed to queue success webhook"
420401
);
@@ -430,15 +411,7 @@ where
430411
// Don't modify transaction registry on NACK - job will be retried
431412
if let Err(e) = self.queue_nack_webhook(job, nack_data, tx) {
432413
tracing::error!(
433-
transaction_id = %job.job.data.transaction_id,
434-
chain_id = job.job.data.chain_id,
435-
client_id = job
436-
.job
437-
.data
438-
.rpc_credentials
439-
.client_id_for_logs()
440-
.unwrap_or("unknown"),
441-
queue_id = EIP7702_SEND_QUEUE_ID,
414+
transaction_id = job.job.data.transaction_id,
442415
error = ?e,
443416
"Failed to queue nack webhook"
444417
);
@@ -456,30 +429,14 @@ where
456429
.add_remove_command(tx.pipeline(), &job.job.data.transaction_id);
457430

458431
tracing::error!(
459-
transaction_id = %job.job.data.transaction_id,
460-
chain_id = job.job.data.chain_id,
461-
client_id = job
462-
.job
463-
.data
464-
.rpc_credentials
465-
.client_id_for_logs()
466-
.unwrap_or("unknown"),
467-
queue_id = EIP7702_SEND_QUEUE_ID,
432+
transaction_id = job.job.data.transaction_id,
468433
error = ?fail_data.error,
469434
"EIP-7702 send job failed"
470435
);
471436

472437
if let Err(e) = self.queue_fail_webhook(job, fail_data, tx) {
473438
tracing::error!(
474-
transaction_id = %job.job.data.transaction_id,
475-
chain_id = job.job.data.chain_id,
476-
client_id = job
477-
.job
478-
.data
479-
.rpc_credentials
480-
.client_id_for_logs()
481-
.unwrap_or("unknown"),
482-
queue_id = EIP7702_SEND_QUEUE_ID,
439+
transaction_id = job.job.data.transaction_id,
483440
error = ?e,
484441
"Failed to queue fail webhook"
485442
);

executors/src/eoa/store/atomic.rs

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use alloy::{
44
consensus::{Signed, TypedTransaction},
55
primitives::Address,
66
};
7-
use twmq::redis::{AsyncCommands, Pipeline};
8-
use twmq::redis::cluster_async::ClusterConnection;
7+
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
98

109
use crate::{
1110
eoa::{
@@ -31,7 +30,6 @@ use crate::{
3130

3231
const MAX_RETRIES: u32 = 10;
3332
const RETRY_BASE_DELAY_MS: u64 = 10;
34-
const EOA_QUEUE_ID: &str = "eoa_executor";
3533

3634
pub trait SafeRedisTransaction: Send + Sync {
3735
type ValidationData;
@@ -45,7 +43,7 @@ pub trait SafeRedisTransaction: Send + Sync {
4543
) -> Self::OperationResult;
4644
fn validation(
4745
&self,
48-
conn: &mut ClusterConnection,
46+
conn: &mut ConnectionManager,
4947
store: &EoaExecutorStore,
5048
) -> impl Future<Output = Result<Self::ValidationData, TransactionStoreError>> + Send;
5149
fn watch_keys(&self) -> Vec<String>;
@@ -595,7 +593,7 @@ impl AtomicEoaExecutorStore {
595593
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
596594
pipeline.expire(&tx_data_key, ttl_seconds);
597595
pipeline.expire(
598-
&self.transaction_attempts_list_name(&pending_transaction.transaction_id),
596+
self.transaction_attempts_list_name(&pending_transaction.transaction_id),
599597
ttl_seconds,
600598
);
601599

@@ -614,18 +612,7 @@ impl AtomicEoaExecutorStore {
614612
&mut tx_context,
615613
webhook_queue.clone(),
616614
) {
617-
tracing::error!(
618-
transaction_id = %pending_transaction.transaction_id,
619-
chain_id = pending_transaction.user_request.chain_id,
620-
client_id = pending_transaction
621-
.user_request
622-
.rpc_credentials
623-
.client_id_for_logs()
624-
.unwrap_or("unknown"),
625-
queue_id = EOA_QUEUE_ID,
626-
"Failed to queue webhook for fail: {}",
627-
e
628-
);
615+
tracing::error!("Failed to queue webhook for fail: {}", e);
629616
}
630617
}
631618

@@ -683,7 +670,7 @@ impl AtomicEoaExecutorStore {
683670
let ttl_seconds = self.completed_transaction_ttl_seconds as i64;
684671
pipeline.expire(&tx_data_key, ttl_seconds);
685672
pipeline.expire(
686-
&self.transaction_attempts_list_name(&pending_transaction.transaction_id),
673+
self.transaction_attempts_list_name(&pending_transaction.transaction_id),
687674
ttl_seconds,
688675
);
689676
}
@@ -707,13 +694,6 @@ impl AtomicEoaExecutorStore {
707694
) {
708695
tracing::error!(
709696
transaction_id = %pending_transaction.transaction_id,
710-
chain_id = pending_transaction.user_request.chain_id,
711-
client_id = pending_transaction
712-
.user_request
713-
.rpc_credentials
714-
.client_id_for_logs()
715-
.unwrap_or("unknown"),
716-
queue_id = EOA_QUEUE_ID,
717697
error = ?e,
718698
"Failed to queue webhook for batch fail"
719699
);
@@ -835,7 +815,7 @@ impl SafeRedisTransaction for ResetNoncesTransaction<'_> {
835815

836816
async fn validation(
837817
&self,
838-
_conn: &mut ClusterConnection,
818+
_conn: &mut ConnectionManager,
839819
store: &EoaExecutorStore,
840820
) -> Result<Self::ValidationData, TransactionStoreError> {
841821
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

0 commit comments

Comments
 (0)