diff --git a/.github/workflows/backend-ci.yml b/.github/workflows/backend-ci.yml index 49d57a8..6c0ace9 100644 --- a/.github/workflows/backend-ci.yml +++ b/.github/workflows/backend-ci.yml @@ -63,7 +63,9 @@ jobs: - name: Run database migrations run: | - PGPASSWORD=utility_secret psql -h localhost -U utility -d utility_test -f src/time_series/compress.sql 2>&1 || true + PGPASSWORD=utility_secret psql -h localhost -U utility -d utility_test -f src/time_series/compress.sql + PGPASSWORD=utility_secret psql -h localhost -U utility -d utility_test -f src/soroban/sync.sql + PGPASSWORD=utility_secret psql -h localhost -U utility -d utility_test -f src/settlement/schema.sql env: PGPASSWORD: utility_secret diff --git a/docker-compose.yml b/docker-compose.yml index 56fe363..431d59d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,7 @@ services: volumes: - pgdata:/var/lib/postgresql/data - ./src/time_series/compress.sql:/docker-entrypoint-initdb.d/01-compress.sql + - ./src/settlement/schema.sql:/docker-entrypoint-initdb.d/02-settlement.sql healthcheck: test: ["CMD-SHELL", "pg_isready -U utility -d utility_backend"] interval: 10s diff --git a/src/api/handlers.rs b/src/api/handlers.rs index 662c348..4bdc301 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -4,6 +4,9 @@ use axum::{ use ed25519_dalek::VerifyingKey; use hex; use serde::{Deserialize, Serialize}; + +use crate::api::AppState; + use sqlx::{Pool, Postgres}; use std::sync::Arc; @@ -11,7 +14,6 @@ use crate::api::metrics; use crate::api::middleware::DynamicRateLimiter; use crate::gateway::crypto::global_registry; use crate::gateway::lock::{ActiveLock, AdvisoryLock}; -use crate::soroban::sequencer::NonceSequencer; use crate::tariffs::engine::{global_tariff_engine, TariffContext, TariffExplanation}; use crate::time_series::analytics::{global_engine, DiagnosticReport}; use crate::time_series::compression::CompressionStatus; @@ -47,10 +49,8 @@ pub struct GridNonceStatus { pub high_water_mark: u64, } -pub async fn nonce_status( - State(sequencer): State>, -) -> Json> { - let marks = sequencer.get_all_grid_high_water_marks(); +pub async fn nonce_status(State(state): State) -> Json> { + let marks = state.sequencer.get_all_grid_high_water_marks(); let statuses: Vec = marks .into_iter() .map(|(grid_id, hwm)| GridNonceStatus { @@ -136,8 +136,43 @@ pub async fn submit_reading( } } -pub async fn settle_account(Json(_body): Json) -> Json<&'static str> { - Json("settlement initiated") +pub async fn settle_account( + State(state): State, + Json(body): Json, +) -> Result, StatusCode> { + let rpc_url = + std::env::var("SOROBAN_RPC_URL").unwrap_or_else(|_| "http://localhost:8000".into()); + let finalizer = crate::settlement::finalizer::Finalizer::new( + state.pool.clone(), + rpc_url, + state.breaker.clone(), + ); + let mint_queue = crate::settlement::mint_queue::MintQueue::new(state.pool); + + // In a real scenario, we'd get readings from the database. + // Here we simulate a batch for the requested meter and a generic resource type (e.g. water). + let batch_id = format!("batch-{}", uuid::Uuid::new_v4()); + let resource_type = "water"; // Example + let readings = vec![(chrono::Utc::now(), body.resource_units)]; + + let engine = crate::tariffs::engine::TariffEngine::new(vec![]); // Default tariff + + engine + .evaluate_and_finalize( + &batch_id, + resource_type, + &readings, + &finalizer, + &mint_queue, + &body.destination_wallet, + ) + .await + .map_err(|e| { + tracing::error!(error = %e, "settlement failed"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + Ok(Json("settlement completed")) } pub async fn get_diagnostics( diff --git a/src/api/mod.rs b/src/api/mod.rs index 09386c9..2641268 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,9 +1,11 @@ use crate::api::middleware::DynamicRateLimiter; use crate::gateway::lock::AdvisoryLock; +use crate::soroban::rpc::CircuitBreaker; use crate::soroban::sequencer::NonceSequencer; use axum::extract::FromRef; use sqlx::{Pool, Postgres}; use std::sync::Arc; +use tokio::sync::Mutex; pub mod alloc_tracker; pub mod handlers; @@ -14,8 +16,9 @@ pub mod router; #[derive(Clone)] pub struct AppState { pub sequencer: Arc, - pub db_pool: Pool, + pub pool: Pool, pub advisory_lock: Arc, + pub breaker: Arc>, pub rate_limiter: Arc, } @@ -27,7 +30,7 @@ impl FromRef for Arc { impl FromRef for Pool { fn from_ref(state: &AppState) -> Self { - state.db_pool.clone() + state.pool.clone() } } @@ -37,6 +40,12 @@ impl FromRef for Arc { } } +impl FromRef for Arc> { + fn from_ref(state: &AppState) -> Self { + state.breaker.clone() + } +} + impl FromRef for Arc { fn from_ref(state: &AppState) -> Self { state.rate_limiter.clone() diff --git a/src/api/router.rs b/src/api/router.rs index ecd6669..4d991b6 100644 --- a/src/api/router.rs +++ b/src/api/router.rs @@ -6,7 +6,7 @@ use axum::{ use tower_http::cors::CorsLayer; use super::handlers; -use crate::api::AppState; +use super::AppState; pub async fn build_router(state: AppState) -> anyhow::Result { let cors = CorsLayer::permissive(); diff --git a/src/main.rs b/src/main.rs index 6c4f85a..a43059f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,13 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; use tracing_subscriber::EnvFilter; use utility_backend::api::middleware::DynamicRateLimiter; use utility_backend::api::AppState; use utility_backend::gateway::lock::AdvisoryLock; +use utility_backend::soroban::rpc::CircuitBreaker; use utility_backend::soroban::sequencer::NonceSequencer; use utility_backend::time_series::compression::{ init_global_compression_manager, spawn_compression_monitor, CompressionPolicy, @@ -52,11 +54,13 @@ async fn main() -> anyhow::Result<()> { let _transport = spawn_transport_monitors(TcpTransportConfig::default()); let advisory_lock = Arc::new(AdvisoryLock::postgres(db_pool.clone())); + let breaker = Arc::new(Mutex::new(CircuitBreaker::new(5))); let rate_limiter = DynamicRateLimiter::new(); let state = AppState { sequencer, - db_pool, + pool: db_pool, advisory_lock, + breaker, rate_limiter, }; diff --git a/src/settlement/finalizer.rs b/src/settlement/finalizer.rs new file mode 100644 index 0000000..459a9c7 --- /dev/null +++ b/src/settlement/finalizer.rs @@ -0,0 +1,113 @@ +use crate::settlement::mint_queue::MintQueue; +use crate::soroban::rpc::CircuitBreaker; +use hex; +use sha2::{Digest, Sha256}; +use sqlx::{PgPool, Postgres, Transaction}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{info, warn}; + +pub struct Finalizer { + pool: PgPool, + rpc_url: String, + mint_queue: MintQueue, + breaker: Arc>, +} + +impl Finalizer { + pub fn new(pool: PgPool, rpc_url: String, breaker: Arc>) -> Self { + let mint_queue = MintQueue::new(pool.clone()); + Self { + pool, + rpc_url, + mint_queue, + breaker, + } + } + + pub async fn finalize_mint( + &self, + batch_id: &str, + resource_type: &str, + ) -> Result<(), Box> { + // 1. Generate idempotency key: SHA256(batch_id || resource_type || 'mint') + let mut hasher = Sha256::new(); + hasher.update(batch_id); + hasher.update(resource_type); + hasher.update("mint"); + let idempotency_key = hex::encode(hasher.finalize()); + + // 2. Attempt to mark as processed using a transaction for atomicity and deduplication + let mut tx: Transaction<'_, Postgres> = self.pool.begin().await?; + + let result = sqlx::query( + r#" + INSERT INTO processed_mints (batch_id, resource_type, idempotency_key) + VALUES ($1, $2, $3) + ON CONFLICT (batch_id, resource_type) DO NOTHING + RETURNING id + "#, + ) + .bind(batch_id) + .bind(resource_type) + .bind(idempotency_key.clone()) + .fetch_optional(&mut *tx) + .await?; + + if result.is_none() { + info!(batch_id = %batch_id, resource_type = %resource_type, "mint already processed, skipping"); + tx.rollback().await?; + return Ok(()); + } + + // 3. Get pending mints for this batch and resource type + let pending = self.mint_queue.get_pending(batch_id).await?; + let filtered_pending: Vec<_> = pending + .into_iter() + .filter(|e| e.resource_type == resource_type) + .collect(); + + if filtered_pending.is_empty() { + warn!(batch_id = %batch_id, resource_type = %resource_type, "no pending mints found for finalization"); + tx.rollback().await?; + return Ok(()); + } + + // AGGREGATION: We must mint the TOTAL amount in ONE transaction to honor the single idempotency key. + let total_amount: f64 = filtered_pending.iter().map(|e| e.amount).sum(); + let destination = filtered_pending[0].destination_wallet.clone(); // Assume same destination for batch/resource + + // 4. Submit to Soroban RPC + let payload = serde_json::json!({ + "jsonrpc": "2.0", + "id": format!("mint-{}-{}", batch_id, resource_type), + "method": "sendTransaction", + "params": { + "batch_id": batch_id, + "resource_type": resource_type, + "amount": total_amount, + "destination": destination, + "idempotency_key": idempotency_key + } + }); + + let mut breaker = self.breaker.lock().await; + match breaker.call_rpc(&self.rpc_url, payload).await { + Ok(_) => { + info!(batch_id = %batch_id, "soroban aggregated mint transaction submitted"); + for event in filtered_pending { + self.mint_queue.remove_event(event.id).await?; + } + } + Err(e) => { + warn!(batch_id = %batch_id, error = %e, "soroban aggregated mint transaction failed"); + tx.rollback().await?; + return Err(e.into()); + } + } + + tx.commit().await?; + info!(batch_id = %batch_id, resource_type = %resource_type, "mint finalization complete"); + Ok(()) + } +} diff --git a/src/settlement/mint_queue.rs b/src/settlement/mint_queue.rs new file mode 100644 index 0000000..8dea496 --- /dev/null +++ b/src/settlement/mint_queue.rs @@ -0,0 +1,73 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, PgPool}; +use uuid::Uuid; + +#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] +pub struct MintEvent { + pub id: Uuid, + pub batch_id: String, + pub resource_type: String, + pub amount: f64, + pub destination_wallet: String, + pub created_at: Option>, +} + +pub struct MintQueue { + pool: PgPool, +} + +impl MintQueue { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } + + pub async fn enqueue( + &self, + batch_id: &str, + resource_type: &str, + amount: f64, + destination_wallet: &str, + ) -> Result { + let id = Uuid::new_v4(); + sqlx::query( + r#" + INSERT INTO pending_mints (id, batch_id, resource_type, amount, destination_wallet) + VALUES ($1, $2, $3, $4, $5) + "#, + ) + .bind(id) + .bind(batch_id) + .bind(resource_type) + .bind(amount) + .bind(destination_wallet) + .execute(&self.pool) + .await?; + + Ok(id) + } + + pub async fn get_pending(&self, batch_id: &str) -> Result, sqlx::Error> { + let events = sqlx::query_as::<_, MintEvent>( + r#" + SELECT id, batch_id, resource_type, amount, destination_wallet, created_at + FROM pending_mints + WHERE batch_id = $1 + "#, + ) + .bind(batch_id) + .fetch_all(&self.pool) + .await?; + + Ok(events) + } + + pub async fn remove_event(&self, id: Uuid) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM pending_mints WHERE id = $1") + .bind(id) + .execute(&self.pool) + .await?; + + Ok(()) + } +} diff --git a/src/settlement/mod.rs b/src/settlement/mod.rs index 8c7bbfc..e93b8b4 100644 --- a/src/settlement/mod.rs +++ b/src/settlement/mod.rs @@ -1,8 +1,10 @@ pub mod attestation_verifier; pub mod credit_flow; pub mod executor; +pub mod finalizer; pub mod merkle; pub mod messages; +pub mod mint_queue; pub mod queue; pub mod shard_router; pub mod submitter; diff --git a/src/settlement/schema.sql b/src/settlement/schema.sql new file mode 100644 index 0000000..2fd92eb --- /dev/null +++ b/src/settlement/schema.sql @@ -0,0 +1,21 @@ +-- Settlement schema for resource token minting + +CREATE TABLE IF NOT EXISTS pending_mints ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + batch_id TEXT NOT NULL, + resource_type TEXT NOT NULL, + amount DOUBLE PRECISION NOT NULL, + destination_wallet TEXT NOT NULL, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS processed_mints ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + batch_id TEXT NOT NULL, + resource_type TEXT NOT NULL, + idempotency_key TEXT NOT NULL UNIQUE, + processed_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + UNIQUE(batch_id, resource_type) +); + +CREATE INDEX IF NOT EXISTS idx_pending_mints_batch_id ON pending_mints(batch_id); diff --git a/src/soroban/rpc.rs b/src/soroban/rpc.rs index 5f64d40..0ee647d 100644 --- a/src/soroban/rpc.rs +++ b/src/soroban/rpc.rs @@ -45,7 +45,6 @@ pub struct CircuitBreakerSnapshot { pub window_size: usize, } -#[allow(dead_code)] pub struct CircuitBreaker { samples: VecDeque, state: CircuitState, diff --git a/src/tariffs/engine.rs b/src/tariffs/engine.rs index d0f086c..8d15f04 100644 --- a/src/tariffs/engine.rs +++ b/src/tariffs/engine.rs @@ -13,7 +13,7 @@ pub enum TariffTier { Dynamic, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Deserialize, Serialize, Clone, Debug)] pub struct TariffSchedule { pub tier: TariffTier, pub rate_per_unit: f64, @@ -220,7 +220,12 @@ impl TariffEngine { pub fn evaluate_context(&self, ctx: TariffContext) -> TariffExplanation { let dag = self.dag.read().expect("tariff DAG lock poisoned").clone(); let explanation = dag.evaluate(&ctx); - info!(meter_id = %ctx.meter_id, cost = explanation.total_cost, rules = explanation.applied_rules.len(), "tariff evaluated"); + info!( + meter_id = %ctx.meter_id, + cost = explanation.total_cost, + rules = explanation.applied_rules.len(), + "tariff evaluated" + ); explanation } @@ -249,6 +254,28 @@ impl TariffEngine { .map(|(ts, vol)| self.evaluate(*ts, *vol)) .sum() } + + pub async fn evaluate_and_finalize( + &self, + batch_id: &str, + resource_type: &str, + readings: &[(DateTime, f64)], + finalizer: &crate::settlement::finalizer::Finalizer, + mint_queue: &crate::settlement::mint_queue::MintQueue, + destination_wallet: &str, + ) -> Result> { + let total_cost = self.evaluate_batch(readings); + + // Enqueue the mint event + mint_queue + .enqueue(batch_id, resource_type, total_cost, destination_wallet) + .await?; + + // Trigger finalization + finalizer.finalize_mint(batch_id, resource_type).await?; + + Ok(total_cost) + } } pub fn global_tariff_engine() -> &'static TariffEngine { diff --git a/tests/settlement_tests.rs b/tests/settlement_tests.rs new file mode 100644 index 0000000..63c5fce --- /dev/null +++ b/tests/settlement_tests.rs @@ -0,0 +1,130 @@ +use sha2::Digest; +use std::sync::Arc; +use tokio::sync::{Barrier, Mutex}; +use utility_backend::settlement::finalizer::Finalizer; +use utility_backend::settlement::mint_queue::MintQueue; +use utility_backend::soroban::rpc::CircuitBreaker; + +#[tokio::test] +async fn test_concurrent_finalization_deduplication() { + let db_url = std::env::var("DATABASE_URL") + .unwrap_or_else(|_| "postgres://utility:utility_secret@localhost:5432/utility_test".into()); + + let pool = match sqlx::PgPool::connect(&db_url).await { + Ok(p) => p, + Err(_) => { + eprintln!("Skipping test: DATABASE_URL not available"); + return; + } + }; + + // Clean up + sqlx::query("DELETE FROM processed_mints") + .execute(&pool) + .await + .unwrap(); + sqlx::query("DELETE FROM pending_mints") + .execute(&pool) + .await + .unwrap(); + + let batch_id = "test-batch-123"; + let resource_type = "water"; + let destination = "GABC...123"; + + let mint_queue = MintQueue::new(pool.clone()); + mint_queue + .enqueue(batch_id, resource_type, 100.0, destination) + .await + .unwrap(); + + let breaker = Arc::new(Mutex::new(CircuitBreaker::new(5))); + let finalizer = Arc::new(Finalizer::new( + pool.clone(), + "http://invalid-rpc-url".into(), + breaker, + )); + + let num_threads = 5; + let barrier = Arc::new(Barrier::new(num_threads)); + let mut handles = vec![]; + + for _ in 0..num_threads { + let f = finalizer.clone(); + let b = barrier.clone(); + handles.push(tokio::spawn(async move { + b.wait().await; + f.finalize_mint("test-batch-123", "water").await + })); + } + + for h in handles { + let _ = h.await.unwrap(); + } + + let count: i64 = sqlx::query_scalar("SELECT count(*) FROM processed_mints") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count, 0); // Still 0 because RPC fails and rolls back +} + +#[tokio::test] +async fn test_aggregation_of_pending_mints() { + let db_url = std::env::var("DATABASE_URL") + .unwrap_or_else(|_| "postgres://utility:utility_secret@localhost:5432/utility_test".into()); + + let pool = match sqlx::PgPool::connect(&db_url).await { + Ok(p) => p, + Err(_) => { + eprintln!("Skipping test: DATABASE_URL not available"); + return; + } + }; + + sqlx::query("DELETE FROM processed_mints") + .execute(&pool) + .await + .unwrap(); + sqlx::query("DELETE FROM pending_mints") + .execute(&pool) + .await + .unwrap(); + + let batch_id = "batch-agg"; + let resource_type = "energy"; + let mint_queue = MintQueue::new(pool.clone()); + + mint_queue + .enqueue(batch_id, resource_type, 40.0, "dest1") + .await + .unwrap(); + mint_queue + .enqueue(batch_id, resource_type, 60.0, "dest1") + .await + .unwrap(); + + let breaker = Arc::new(Mutex::new(CircuitBreaker::new(5))); + let finalizer = Finalizer::new(pool.clone(), "http://invalid-rpc-url".into(), breaker); + + // This will fail at RPC but we want to verify it tried to aggregate + let _ = finalizer.finalize_mint(batch_id, resource_type).await; + + // Verify both are still there because of rollback + let pending = mint_queue.get_pending(batch_id).await.unwrap(); + assert_eq!(pending.len(), 2); +} + +#[tokio::test] +async fn test_idempotency_key_generation() { + let batch_id = "batch-1"; + let resource_type = "energy"; + + let mut hasher = sha2::Sha256::new(); + hasher.update(batch_id); + hasher.update(resource_type); + hasher.update("mint"); + let expected = hex::encode(hasher.finalize()); + + assert_eq!(expected.len(), 64); +}