-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathmod.rs
More file actions
558 lines (480 loc) · 20.1 KB
/
mod.rs
File metadata and controls
558 lines (480 loc) · 20.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
use alloy::consensus::Transaction;
use alloy::primitives::{Address, U256};
use alloy::providers::Provider;
use engine_core::{
chain::{Chain, ChainService},
credentials::{KmsClientCache, SigningCredential},
error::AlloyRpcErrorToEngineError,
signer::EoaSigner,
};
use engine_eip7702_core::delegated_account::DelegatedAccount;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use twmq::Queue;
use twmq::redis::cluster_async::ClusterConnection;
use twmq::{
DurableExecution, FailHookData, NackHookData, SuccessHookData,
hooks::TransactionContext,
job::{BorrowedJob, JobResult, RequeuePosition, ToJobResult},
};
use crate::eoa::authorization_cache::EoaAuthorizationCache;
use crate::eoa::store::{AtomicEoaExecutorStore, EoaExecutorStore, EoaHealth, SubmissionResult};
use crate::metrics::{
EoaMetrics, calculate_duration_seconds, current_timestamp_ms, record_eoa_job_processing_time,
};
use crate::webhook::WebhookJobHandler;
pub mod confirm;
pub mod error;
mod send;
mod transaction;
use error::{EoaExecutorWorkerError, SendContext};
// ========== SPEC-COMPLIANT CONSTANTS ==========
// NOTE(review): none of these four constants are referenced anywhere in this
// file's visible code — the handler fields `max_inflight` / `max_recycled_nonces`
// are what the worker actually reads (see their field comments). Confirm whether
// the submodules (`send`, `confirm`, …) use them or whether they are dead.
const MAX_INFLIGHT_PER_EOA: u64 = 100; // Default from spec
const MAX_RECYCLED_THRESHOLD: u64 = 50; // Circuit breaker from spec
const TARGET_TRANSACTIONS_PER_EOA: u64 = 10; // Fleet management from spec
const MIN_TRANSACTIONS_PER_EOA: u64 = 1; // Fleet management from spec
// ========== JOB DATA ==========
/// Job payload for one EOA executor run: identifies the EOA/chain pair this
/// job services and carries the signing credential deserialized from Redis.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct EoaExecutorWorkerJobData {
    /// Address of the EOA whose transaction queues this job processes.
    pub eoa_address: Address,
    /// Target chain id; together with `eoa_address` it scopes the worker lock
    /// (the lock is over EOA:CHAIN).
    pub chain_id: u64,
    /// Signing credential for this EOA. The AWS KMS client cache is re-injected
    /// into it after deserialization (see `process`).
    // NOTE(review): the name suggests it signs no-op/cancel transactions —
    // confirm actual usage in the send/confirm submodules.
    pub noop_signing_credential: SigningCredential,
}
/// Summary of a single worker pass: counters for what this run did, plus a
/// snapshot of what is still queued (consumed by `is_work_remaining` to decide
/// whether the job should requeue itself).
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct EoaExecutorWorkerResult {
    // what we did
    /// Number of transactions we recovered from borrowed state
    pub recovered_transactions: u32,
    /// Number of transactions we confirmed
    pub confirmed_transactions: u32,
    /// Number of transactions we failed due to deterministic errors
    pub failed_transactions: u32,
    /// Number of transactions we sent
    pub sent_transactions: u32,
    /// Number of transactions that got replaced in the mempool and are now pending
    pub replaced_transactions: u32,

    // what we have left (queue snapshot taken at the end of the run)
    /// Number of transactions currently in the submitted state
    pub submitted_transactions: u32,
    /// Number of transactions currently in the pending state
    pub pending_transactions: u32,
    /// Number of transactions currently in the borrowed state
    pub borrowed_transactions: u32,
    /// Number of recycled nonces
    pub recycled_nonces: u32,
}
impl EoaExecutorWorkerResult {
    /// Returns `true` when any queued work is left for this EOA: pending,
    /// borrowed, or submitted transactions, or recycled nonces awaiting reuse.
    pub fn is_work_remaining(&self) -> bool {
        let outstanding = [
            self.pending_transactions,
            self.borrowed_transactions,
            self.recycled_nonces,
            self.submitted_transactions,
        ];
        outstanding.into_iter().any(|count| count > 0)
    }
}
// ========== MAIN WORKER ==========
/// EOA Executor Worker
///
/// ## Core Workflow:
/// 1. **Acquire Lock Aggressively** - Takes over stalled workers using force acquisition. This is a lock over EOA:CHAIN
/// 2. **Crash Recovery** - Rebroadcasts borrowed transactions, handles deterministic failures
/// 3. **Confirmation Flow** - Fetches receipts, confirms transactions, handles nonce sync, requeues replaced transactions
/// 4. **Send Flow** - Processes recycled nonces first, then new transactions with in-flight budget control
/// 5. **Lock Release** - Explicit release in finally pattern as per spec
///
/// ## Key Features:
/// - **Atomic Operations**: All state transitions use Redis WATCH/MULTI/EXEC for durability
/// - **Borrowed State**: Mid-send crash recovery with atomic pending->borrowed->submitted transitions
/// - **Nonce Management**: Optimistic nonce tracking with recycled nonce priority
/// - **Error Classification**: Spec-compliant deterministic vs. possibly-sent error handling
/// - **Circuit Breakers**: Automatic recycled nonce nuking when threshold exceeded
/// - **Health Monitoring**: Balance checking with configurable thresholds
pub struct EoaExecutorJobHandler<CS>
where
    CS: ChainService + Send + Sync + 'static,
{
    /// Resolves chain ids to RPC-backed chain instances.
    pub chain_service: Arc<CS>,
    /// Queue used to enqueue webhook delivery jobs for transaction events.
    pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
    /// Cache of EIP-7702 delegation checks (used for minimal-account detection).
    pub authorization_cache: EoaAuthorizationCache,
    /// Redis cluster connection backing the executor store.
    pub redis: ClusterConnection,
    /// Optional key namespace for the Redis store.
    // NOTE(review): assumed from its use in `EoaExecutorStore::new` — confirm.
    pub namespace: Option<String>,
    /// Signer used for EOA transactions.
    pub eoa_signer: Arc<EoaSigner>,
    pub max_inflight: u64, // Note: Spec uses MAX_INFLIGHT_PER_EOA constant
    pub max_recycled_nonces: u64, // Note: Spec uses MAX_RECYCLED_THRESHOLD constant

    // EOA metrics abstraction with encapsulated configuration
    pub eoa_metrics: EoaMetrics,

    // KMS client cache for AWS KMS credentials
    pub kms_client_cache: KmsClientCache,

    // TTL for completed transactions
    pub completed_transaction_ttl_seconds: u64,
}
impl<CS> DurableExecution for EoaExecutorJobHandler<CS>
where
    CS: ChainService + Send + Sync + 'static,
{
    type Output = EoaExecutorWorkerResult;
    type ErrorData = EoaExecutorWorkerError;
    type JobData = EoaExecutorWorkerJobData;

    /// One full pass over an EOA's queues: resolve the chain, acquire the
    /// EOA:CHAIN lock, run the recovery/confirm/send workflow, always release
    /// the lock, then nack itself (back of queue) if any work remains so the
    /// job re-runs.
    #[tracing::instrument(name = "eoa_executor_worker", skip_all, fields(eoa = ?job.job.data.eoa_address, chain_id = job.job.data.chain_id))]
    async fn process(
        &self,
        job: &BorrowedJob<Self::JobData>,
    ) -> JobResult<Self::Output, Self::ErrorData> {
        let data = &job.job.data;

        // 1. GET CHAIN
        // Chain lookup failure is treated as transient: nack and retry in 10s.
        let chain = self
            .chain_service
            .get_chain(data.chain_id)
            .map_err(|e| EoaExecutorWorkerError::ChainServiceError {
                chain_id: data.chain_id,
                message: format!("Failed to get chain: {e}"),
            })
            .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?;

        // Unique per-attempt worker identity; the lease token ties it to this job lease.
        let worker_id = format!("{}:{}", uuid::Uuid::new_v4(), job.lease_token);

        // 2. CREATE SCOPED STORE (acquires lock)
        let scoped = EoaExecutorStore::new(
            self.redis.clone(),
            self.namespace.clone(),
            data.eoa_address,
            data.chain_id,
            self.completed_transaction_ttl_seconds,
        )
        .acquire_eoa_lock_aggressively(&worker_id, self.eoa_metrics.clone())
        .await
        .map_err(|e| Into::<EoaExecutorWorkerError>::into(e).handle())?;

        let delegated_account = DelegatedAccount::new(data.eoa_address, chain.clone());

        // if there's an error checking 7702 delegation here, we'll just assume it's not a minimal account for the purposes of max in flight
        let is_minimal_account = self
            .authorization_cache
            .is_minimal_account(&delegated_account, None)
            .await
            .inspect_err(|e| {
                tracing::error!(error = ?e, "Error checking 7702 delegation");
            })
            .ok()
            .unwrap_or(false);

        let chain_id = chain.chain_id();

        // Inject KMS cache into the noop signing credential (after deserialization from Redis)
        let noop_signing_credential = data
            .noop_signing_credential
            .clone()
            .with_aws_kms_cache(&self.kms_client_cache);

        let worker = EoaExecutorWorker {
            store: scoped,
            chain,
            eoa: data.eoa_address,
            chain_id: data.chain_id,
            noop_signing_credential,
            // Minimal (7702-delegated) accounts are serialized to 1 in-flight
            // transaction; chain 1628 is capped at 20; everything else uses the
            // handler-wide configuration.
            // NOTE(review): 1628 is a magic chain id with no explanation —
            // consider a named constant documenting which chain this is and why.
            max_inflight: if is_minimal_account {
                1
            } else if chain_id == 1628 {
                20
            } else {
                self.max_inflight
            },
            max_recycled_nonces: self.max_recycled_nonces,
            webhook_queue: self.webhook_queue.clone(),
            signer: self.eoa_signer.clone(),
            kms_client_cache: self.kms_client_cache.clone(),
        };

        let job_start_time = current_timestamp_ms();
        let workflow_result = worker.execute_main_workflow().await;

        // Always release lock, regardless of workflow success/failure
        if let Err(e) = worker.release_eoa_lock().await {
            tracing::error!(error = ?e, worker_id = worker_id, "Error releasing EOA lock");
        }

        // Propagate workflow error after releasing lock
        let result = workflow_result?;

        // Record EOA job processing metrics
        // NOTE(review): timing metrics are only recorded on the success path —
        // a failed workflow returns above and is never timed. Confirm intended.
        let job_end_time = current_timestamp_ms();
        let job_duration = calculate_duration_seconds(job_start_time, job_end_time);
        record_eoa_job_processing_time(data.chain_id, job_duration);

        tracing::info!(
            eoa = ?data.eoa_address,
            chain_id = data.chain_id,
            worker_id = worker_id,
            job_duration_seconds = job_duration,
            work_remaining = result.is_work_remaining(),
            result = ?result,
            "JOB_LIFECYCLE - EOA executor job completed"
        );

        // Requeue delay: poll minimal accounts more gently (2s) than regular
        // EOAs (200ms).
        let delay = if is_minimal_account {
            Some(Duration::from_secs(2))
        } else {
            Some(Duration::from_millis(200))
        };

        // Self-requeue via nack while work remains; a clean Ok ends the job.
        if result.is_work_remaining() {
            Err(EoaExecutorWorkerError::WorkRemaining { result })
                .map_err_nack(delay, RequeuePosition::Last)
        } else {
            Ok(result)
        }

        // // initiate health data if doesn't exist
        // self.get_eoa_health(&scoped, &chain)
        //     .await
        //     .map_err_nack(Some(Duration::from_secs(10)), RequeuePosition::Last)?;

        // // Execute main workflow with proper error handling
        // self.execute_main_workflow(&scoped, &chain).await
    }

    async fn on_success(
        &self,
        _job: &BorrowedJob<Self::JobData>,
        _success_data: SuccessHookData<'_, Self::Output>,
        _tx: &mut TransactionContext<'_>,
    ) {
        // Lock is already released in process() with ownership checking
    }

    async fn on_nack(
        &self,
        _job: &BorrowedJob<Self::JobData>,
        _nack_data: NackHookData<'_, Self::ErrorData>,
        _tx: &mut TransactionContext<'_>,
    ) {
        // Lock is already released in process() with ownership checking
    }

    async fn on_fail(
        &self,
        _job: &BorrowedJob<Self::JobData>,
        _fail_data: FailHookData<'_, Self::ErrorData>,
        _tx: &mut TransactionContext<'_>,
    ) {
        // Lock is already released in process() with ownership checking
    }
}
/// Per-run worker bound to a single EOA/chain pair, holding the acquired
/// store lock for the duration of the run (released via `release_eoa_lock`).
pub struct EoaExecutorWorker<C: Chain> {
    /// Lock-holding store handle; consumed when the lock is released.
    pub store: AtomicEoaExecutorStore,
    /// Chain (RPC provider) this worker operates against.
    pub chain: C,
    /// Address of the EOA being processed.
    pub eoa: Address,
    /// Chain id (set from the job data alongside `chain`).
    pub chain_id: u64,
    /// Signing credential with the AWS KMS client cache already injected.
    pub noop_signing_credential: SigningCredential,
    /// Maximum in-flight transactions for this run (1 for minimal accounts).
    pub max_inflight: u64,
    /// Recycled-nonce ceiling before the circuit breaker trips.
    // NOTE(review): not read in this file — presumably used by the send submodule.
    pub max_recycled_nonces: u64,
    /// Queue for webhook notification jobs.
    pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
    /// EOA transaction signer.
    pub signer: Arc<EoaSigner>,
    /// Shared AWS KMS client cache.
    pub kms_client_cache: KmsClientCache,
}
impl<C: Chain> EoaExecutorWorker<C> {
    /// Execute the main EOA worker workflow
    ///
    /// Runs the three phases in order — crash recovery, confirm flow, send
    /// flow — then snapshots queue counts so the caller can decide whether
    /// the job should requeue itself. Each phase logs its duration.
    async fn execute_main_workflow(
        &self,
    ) -> JobResult<EoaExecutorWorkerResult, EoaExecutorWorkerError> {
        // 1. CRASH RECOVERY
        let start_time = current_timestamp_ms();
        let recovered = self
            .recover_borrowed_state()
            .await
            .inspect_err(|e| {
                tracing::error!(error = ?e, "Error in recover_borrowed_state");
            })
            .map_err(|e| e.handle())?;
        let duration = calculate_duration_seconds(start_time, current_timestamp_ms());
        tracing::info!(
            eoa = ?self.eoa,
            chain_id = self.chain_id,
            worker_id = self.store.worker_id(),
            duration_seconds = duration,
            recovered_count = recovered,
            "JOB_LIFECYCLE - Crash recovery completed"
        );

        // 2. CONFIRM FLOW (defined in the `confirm` submodule)
        let start_time = current_timestamp_ms();
        let confirmations_report = self
            .confirm_flow()
            .await
            .inspect_err(|e| {
                tracing::error!(error = ?e, "Error in confirm flow");
            })
            .map_err(|e| e.handle())?;
        let duration = calculate_duration_seconds(start_time, current_timestamp_ms());
        tracing::info!(
            eoa = ?self.eoa,
            chain_id = self.chain_id,
            worker_id = self.store.worker_id(),
            duration_seconds = duration,
            confirmed = confirmations_report.moved_to_success,
            failed = confirmations_report.moved_to_pending,
            "JOB_LIFECYCLE - Confirm flow completed"
        );

        // 3. SEND FLOW (defined in the `send` submodule)
        let start_time = current_timestamp_ms();
        let sent = self
            .send_flow()
            .await
            .inspect_err(|e| {
                tracing::error!(error = ?e, "Error in send_flow");
            })
            .map_err(|e| e.handle())?;
        let duration = calculate_duration_seconds(start_time, current_timestamp_ms());
        tracing::info!(
            eoa = ?self.eoa,
            chain_id = self.chain_id,
            worker_id = self.store.worker_id(),
            duration_seconds = duration,
            sent_count = sent,
            "JOB_LIFECYCLE - Send flow completed"
        );

        // 4. CHECK FOR REMAINING WORK
        let counts = self
            .store
            .get_all_counts()
            .await
            .map_err(EoaExecutorWorkerError::from)
            .inspect_err(|e| {
                tracing::error!(error = ?e, "Error in get_all_counts");
            })
            .map_err(|e| e.handle())?;

        tracing::info!(
            recovered = recovered,
            confirmed = confirmations_report.moved_to_success,
            temp_failed = confirmations_report.moved_to_pending,
            replacements = confirmations_report.moved_to_pending,
            currently_submitted = counts.submitted_transactions,
            currently_pending = counts.pending_transactions,
            currently_borrowed = counts.borrowed_transactions,
            currently_recycled = counts.recycled_nonces,
            "JOB_LIFECYCLE - Check for remaining work completed"
        );

        // NOTE(review): `failed_transactions` and `replaced_transactions` are
        // both populated from the same `moved_to_pending` counter (and the log
        // above labels that counter as both `temp_failed` and `replacements`).
        // Verify against the confirm-flow report type whether one of these
        // should read a different field.
        // NOTE(review): the `as u32` casts silently truncate u64 counts — fine
        // if counts are bounded by max_inflight, but confirm.
        Ok(EoaExecutorWorkerResult {
            recovered_transactions: recovered,
            confirmed_transactions: confirmations_report.moved_to_success as u32,
            failed_transactions: confirmations_report.moved_to_pending as u32,
            sent_transactions: sent,
            replaced_transactions: confirmations_report.moved_to_pending as u32,
            submitted_transactions: counts.submitted_transactions as u32,
            pending_transactions: counts.pending_transactions as u32,
            borrowed_transactions: counts.borrowed_transactions as u32,
            recycled_nonces: counts.recycled_nonces as u32,
        })
    }

    // ========== CRASH RECOVERY ==========

    /// Rebroadcast every transaction stuck in the borrowed state (signed but
    /// never recorded as submitted — i.e. a previous worker crashed mid-send),
    /// then batch-process the send results. Returns the number of transactions
    /// processed (0 if nothing was borrowed).
    #[tracing::instrument(skip_all)]
    async fn recover_borrowed_state(&self) -> Result<u32, EoaExecutorWorkerError> {
        let borrowed_transactions = self.store.peek_borrowed_transactions().await?;
        // Resolve the peeked entries into full records carrying the signed
        // transaction payload and transaction id.
        let mut borrowed_transactions = self.store.hydrate_all(borrowed_transactions).await?;

        if borrowed_transactions.is_empty() {
            return Ok(0);
        }

        tracing::warn!(
            "Recovering {} borrowed transactions. This indicates a worker crash or system issue",
            borrowed_transactions.len()
        );

        // Sort borrowed transactions by nonce to ensure proper ordering
        borrowed_transactions.sort_by_key(|tx| tx.signed_transaction.nonce());

        // Rebroadcast all transactions in parallel
        let rebroadcast_futures: Vec<_> = borrowed_transactions
            .iter()
            .map(|borrowed| {
                let tx_envelope = borrowed.signed_transaction.clone().into();
                let nonce = borrowed.signed_transaction.nonce();
                let transaction_id = borrowed.transaction_id.clone();

                tracing::info!(
                    transaction_id = ?transaction_id,
                    nonce = nonce,
                    "Recovering borrowed transaction"
                );

                async move {
                    let send_result = self.chain.provider().send_tx_envelope(tx_envelope).await;
                    (borrowed, send_result)
                }
            })
            .collect();

        let rebroadcast_results = futures::future::join_all(rebroadcast_futures).await;

        // Convert results to SubmissionResult for batch processing
        let submission_results: Vec<SubmissionResult> = rebroadcast_results
            .into_iter()
            .map(|(borrowed, send_result)| {
                SubmissionResult::from_send_result(
                    borrowed,
                    send_result,
                    SendContext::Rebroadcast,
                    &self.chain,
                )
            })
            .collect();

        // TODO: Implement post-processing analysis for balance threshold updates and nonce resets
        // Currently we lose the granular error handling that was in the individual atomic operations.
        // Consider:
        // 1. Analyzing submission_results for specific error patterns
        // 2. Calling update_balance_threshold if needed
        // 3. Detecting nonce reset conditions
        // 4. Or move this logic into the batch processor itself

        // Process all results in one batch operation
        let report = self
            .store
            .process_borrowed_transactions(submission_results, self.webhook_queue.clone())
            .await?;

        // TODO: Handle post-processing updates here if needed
        // For now, we skip the individual error analysis that was done in the old atomic approach

        tracing::info!(
            "Recovered {} transactions: {} submitted, {} recycled, {} failed",
            report.total_processed,
            report.moved_to_submitted,
            report.moved_to_pending,
            report.failed_transactions
        );

        Ok(report.total_processed as u32)
    }

    // ========== HEALTH ACCESSOR ==========

    /// Get EOA health, initializing it if it doesn't exist
    /// This method ensures the health data is always available for the worker
    async fn get_eoa_health(&self) -> Result<EoaHealth, EoaExecutorWorkerError> {
        let store_health = self.store.get_eoa_health().await?;

        match store_health {
            Some(health) => Ok(health),
            None => {
                // Initialize with fresh data from chain
                let balance = self
                    .chain
                    .provider()
                    .get_balance(self.eoa)
                    .await
                    .map_err(|e| {
                        let engine_error = e.to_engine_error(&self.chain);
                        EoaExecutorWorkerError::RpcError {
                            message: format!(
                                "Failed to get balance during initialization: {engine_error}"
                            ),
                            inner_error: engine_error,
                        }
                    })?;

                let now = current_timestamp_ms();
                // First-run defaults: zero threshold, all activity timestamps
                // set to "now", no recorded nonce resets.
                let health = EoaHealth {
                    balance,
                    balance_threshold: U256::ZERO,
                    balance_fetched_at: now,
                    last_confirmation_at: now,
                    last_nonce_movement_at: now,
                    nonce_resets: Vec::new(),
                };

                // Save to store
                self.store.update_health_data(&health).await?;
                Ok(health)
            }
        }
    }

    /// Refresh the stored balance threshold for this EOA from the chain.
    // NOTE(review): this fetches the *current balance* and stores it as
    // `balance_threshold` (not `balance`) — confirm this "threshold := last
    // observed balance" semantic is intended and not a copy/paste slip.
    #[tracing::instrument(skip_all, fields(eoa = ?self.eoa, chain_id = self.chain.chain_id()))]
    async fn update_balance_threshold(&self) -> Result<(), EoaExecutorWorkerError> {
        let mut health = self.get_eoa_health().await?;

        tracing::info!("Updating balance threshold");
        let balance_threshold = self
            .chain
            .provider()
            .get_balance(self.eoa)
            .await
            .map_err(|e| {
                let engine_error = e.to_engine_error(&self.chain);
                EoaExecutorWorkerError::RpcError {
                    message: format!("Failed to get balance: {engine_error}"),
                    inner_error: engine_error,
                }
            })?;

        health.balance_threshold = balance_threshold;
        self.store.update_health_data(&health).await?;
        Ok(())
    }

    /// Consume the worker and release its EOA:CHAIN lock in the store.
    async fn release_eoa_lock(self) -> Result<(), EoaExecutorWorkerError> {
        self.store.release_eoa_lock().await?;
        Ok(())
    }
}