diff --git a/.env.example b/.env.example index 45d0bb69..fdb4af68 100644 --- a/.env.example +++ b/.env.example @@ -18,4 +18,12 @@ TIPS_AUDIT_S3_CONFIG_TYPE=manual TIPS_AUDIT_S3_ENDPOINT=http://localhost:7000 TIPS_AUDIT_S3_REGION=us-east-1 TIPS_AUDIT_S3_ACCESS_KEY_ID=minioadmin -TIPS_AUDIT_S3_SECRET_ACCESS_KEY=minioadmin \ No newline at end of file +TIPS_AUDIT_S3_SECRET_ACCESS_KEY=minioadmin + +# Maintenance +TIPS_MAINTENANCE_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres +TIPS_MAINTENANCE_RPC_NODE=http://localhost:2222 +TIPS_MAINTENANCE_KAFKA_BROKERS=localhost:9092 +TIPS_MAINTENANCE_KAFKA_TOPIC=tips-audit +TIPS_MAINTENANCE_POLL_INTERVAL_MS=250 +TIPS_MAINTENANCE_LOG_LEVEL=info \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 4bccb7a9..9c3376ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -381,6 +381,17 @@ dependencies = [ "wasmtimer", ] +[[package]] +name = "alloy-rpc-types" +version = "1.0.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c079797bbda28d6a5a2e89bcbf788bf85b4ae2a4f0e57eed9e2d66637fe78c58" +dependencies = [ + "alloy-primitives", + "alloy-serde", + "serde", +] + [[package]] name = "alloy-rpc-types-admin" version = "1.0.33" @@ -7074,6 +7085,27 @@ dependencies = [ "url", ] +[[package]] +name = "tips-maintenance" +version = "0.1.0" +dependencies = [ + "alloy-primitives", + "alloy-provider", + "alloy-rpc-types", + "anyhow", + "clap", + "dotenvy", + "op-alloy-network", + "rdkafka", + "serde_json", + "tips-audit", + "tips-datastore", + "tokio", + "tracing", + "tracing-subscriber 0.3.20", + "url", +] + [[package]] name = "tokio" version = "1.47.1" diff --git a/Cargo.toml b/Cargo.toml index 5a784e60..e0dd4b03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,11 @@ [workspace] -members = ["crates/datastore", "crates/audit", "crates/ingress"] +members = ["crates/datastore", "crates/audit", "crates/ingress", "crates/maintenance"] resolver = "2" [workspace.dependencies] tips-datastore = 
{ path = "crates/datastore" } tips-audit = { path = "crates/audit" } +tips-maintenance = { path = "crates/maintenance" } # Reth diff --git a/README.md b/README.md index 0904de9c..cdf290c3 100644 --- a/README.md +++ b/README.md @@ -21,3 +21,6 @@ Event streaming and archival system that: ### 🔌 Ingress (`crates/ingress`) The main entry point that provides a JSON-RPC API for receiving transactions and bundles. + +### 🔨 Maintenance (`crates/maintenance`) +A service that maintains the health of the TIPS DataStore, by removing stale or included bundles. diff --git a/crates/maintenance/Cargo.toml b/crates/maintenance/Cargo.toml new file mode 100644 index 00000000..9d7eec66 --- /dev/null +++ b/crates/maintenance/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "tips-maintenance" +version = "0.1.0" +edition = "2024" + +[[bin]] +name = "tips-maintenance" +path = "src/main.rs" + +[dependencies] +tips-datastore.workspace = true +tips-audit.workspace = true +alloy-provider.workspace = true +alloy-primitives.workspace = true +alloy-rpc-types.workspace = true +op-alloy-network.workspace = true +tokio.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +anyhow.workspace = true +clap.workspace = true +dotenvy.workspace = true +rdkafka.workspace = true +serde_json.workspace = true +url.workspace = true \ No newline at end of file diff --git a/crates/maintenance/src/main.rs b/crates/maintenance/src/main.rs new file mode 100644 index 00000000..a9c54607 --- /dev/null +++ b/crates/maintenance/src/main.rs @@ -0,0 +1,188 @@ +use alloy_provider::network::TransactionResponse; +use alloy_provider::network::primitives::BlockTransactions; +use alloy_provider::{Provider, ProviderBuilder, RootProvider}; +use anyhow::Result; +use clap::Parser; +use op_alloy_network::Optimism; +use rdkafka::ClientConfig; +use rdkafka::producer::FutureProducer; +use std::time::Duration; +use tips_audit::{KafkaMempoolEventPublisher, MempoolEvent, MempoolEventPublisher}; +use 
tips_datastore::{BundleDatastore, PostgresDatastore};
+use tokio::time::sleep;
+use tracing::{error, info, warn};
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+use url::Url;
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Args {
+    #[arg(long, env = "TIPS_MAINTENANCE_RPC_NODE")]
+    node_url: Url,
+
+    #[arg(long, env = "TIPS_MAINTENANCE_KAFKA_BROKERS")]
+    kafka_brokers: String,
+
+    #[arg(
+        long,
+        env = "TIPS_MAINTENANCE_KAFKA_TOPIC",
+        default_value = "mempool-events"
+    )]
+    kafka_topic: String,
+
+    #[arg(long, env = "TIPS_MAINTENANCE_DATABASE_URL")]
+    database_url: String,
+
+    #[arg(long, env = "TIPS_MAINTENANCE_POLL_INTERVAL_MS", default_value = "250")]
+    poll_interval: u64,
+
+    #[arg(long, env = "TIPS_MAINTENANCE_LOG_LEVEL", default_value = "info")]
+    log_level: String,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    dotenvy::dotenv().ok();
+
+    let args = Args::parse();
+
+    let log_level = match args.log_level.to_lowercase().as_str() {
+        "trace" => tracing::Level::TRACE,
+        "debug" => tracing::Level::DEBUG,
+        "info" => tracing::Level::INFO,
+        "warn" => tracing::Level::WARN,
+        "error" => tracing::Level::ERROR,
+        _ => {
+            warn!(
+                "Invalid log level '{}', defaulting to 'info'",
+                args.log_level
+            );
+            tracing::Level::INFO
+        }
+    };
+
+    tracing_subscriber::registry()
+        .with(
+            tracing_subscriber::EnvFilter::try_from_default_env()
+                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level.to_string())),
+        )
+        .with(tracing_subscriber::fmt::layer())
+        .init();
+
+    info!("Starting maintenance service");
+
+    let provider: RootProvider<Optimism> = ProviderBuilder::new()
+        .disable_recommended_fillers()
+        .network::<Optimism>()
+        .connect_http(args.node_url);
+
+    let datastore = PostgresDatastore::connect(args.database_url).await?;
+
+    let kafka_producer: FutureProducer = ClientConfig::new()
+        .set("bootstrap.servers", &args.kafka_brokers)
+        .set("message.timeout.ms", "5000")
+        .create()?;
+
+    let publisher =
KafkaMempoolEventPublisher::new(kafka_producer, args.kafka_topic);
+
+    let mut last_processed_block: Option<u64> = None;
+
+    loop {
+        match process_new_blocks(&provider, &datastore, &publisher, &mut last_processed_block).await
+        {
+            Ok(_) => {}
+            Err(e) => {
+                error!(message = "Error processing blocks", error=%e);
+            }
+        }
+
+        sleep(Duration::from_millis(args.poll_interval)).await;
+    }
+}
+
+async fn process_new_blocks(
+    provider: &impl Provider<Optimism>,
+    datastore: &PostgresDatastore,
+    publisher: &KafkaMempoolEventPublisher,
+    last_processed_block: &mut Option<u64>,
+) -> Result<()> {
+    let latest_block_number = provider.get_block_number().await?;
+
+    let start_block = last_processed_block
+        .map(|n| n + 1)
+        .unwrap_or(latest_block_number);
+
+    if start_block > latest_block_number {
+        return Ok(());
+    }
+
+    info!(message = "Processing blocks", from=%start_block, to=%latest_block_number);
+
+    for block_number in start_block..=latest_block_number {
+        match process_block(provider, datastore, publisher, block_number).await {
+            Ok(_) => {
+                info!(message = "Successfully processed block", block=%block_number);
+                *last_processed_block = Some(block_number);
+            }
+            Err(e) => {
+                return Err(e);
+            }
+        }
+    }
+
+    Ok(())
+}
+
+async fn process_block(
+    provider: &impl Provider<Optimism>,
+    datastore: &PostgresDatastore,
+    publisher: &KafkaMempoolEventPublisher,
+    block_number: u64,
+) -> Result<()> {
+    let block = provider
+        .get_block_by_number(block_number.into())
+        .full()
+        .await?
+ .ok_or_else(|| anyhow::anyhow!("Block {} not found", block_number))?; + + let block_hash = block.header.hash; + + let transactions = match &block.transactions { + BlockTransactions::Full(txs) => txs, + BlockTransactions::Hashes(_) => { + return Err(anyhow::anyhow!( + "Block transactions returned as hashes only, expected full transactions" + )); + } + BlockTransactions::Uncle => { + return Err(anyhow::anyhow!("Block contains uncle transactions")); + } + }; + + for tx in transactions { + let tx_hash = tx.tx_hash(); + info!(message = "Processing transaction", tx_hash=%tx_hash); + + match datastore.find_bundle_by_transaction_hash(tx_hash).await? { + Some(bundle_id) => { + info!(message = "Found bundle for transaction", bundle_id=%bundle_id, tx_hash=%tx_hash); + + let event = MempoolEvent::BlockIncluded { + bundle_id, + block_number, + block_hash, + }; + + publisher.publish(event).await?; + datastore.remove_bundle(bundle_id).await?; + + info!(message = "Removed bundle for transaction", bundle_id=%bundle_id, tx_hash=%tx_hash); + } + None => { + error!(message = "Transaction not part of tracked bundle", tx_hash=%tx_hash); + } + } + } + + Ok(()) +} diff --git a/justfile b/justfile index 507c421a..52d48326 100644 --- a/justfile +++ b/justfile @@ -29,4 +29,7 @@ audit: cargo run --bin tips-audit ingress: - cargo run --bin tips-ingress \ No newline at end of file + cargo run --bin tips-ingress + +maintenance: + cargo run --bin tips-maintenance \ No newline at end of file