Skip to content
4 changes: 3 additions & 1 deletion .github/workflows/backend-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ jobs:

- name: Run database migrations
run: |
PGPASSWORD=utility_secret psql -h localhost -U utility -d utility_test -f src/time_series/compress.sql 2>&1 || true
PGPASSWORD=utility_secret psql -h localhost -U utility -d utility_test -f src/time_series/compress.sql
PGPASSWORD=utility_secret psql -h localhost -U utility -d utility_test -f src/soroban/sync.sql
PGPASSWORD=utility_secret psql -h localhost -U utility -d utility_test -f src/settlement/schema.sql
env:
PGPASSWORD: utility_secret

Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
volumes:
- pgdata:/var/lib/postgresql/data
- ./src/time_series/compress.sql:/docker-entrypoint-initdb.d/01-compress.sql
- ./src/settlement/schema.sql:/docker-entrypoint-initdb.d/02-settlement.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U utility -d utility_backend"]
interval: 10s
Expand Down
49 changes: 42 additions & 7 deletions src/api/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ use axum::{
use ed25519_dalek::VerifyingKey;
use hex;
use serde::{Deserialize, Serialize};

use crate::api::AppState;

use sqlx::{Pool, Postgres};
use std::sync::Arc;

use crate::api::metrics;
use crate::api::middleware::DynamicRateLimiter;
use crate::gateway::crypto::global_registry;
use crate::gateway::lock::{ActiveLock, AdvisoryLock};
use crate::soroban::sequencer::NonceSequencer;
use crate::tariffs::engine::{global_tariff_engine, TariffContext, TariffExplanation};
use crate::time_series::analytics::{global_engine, DiagnosticReport};
use crate::time_series::compression::CompressionStatus;
Expand Down Expand Up @@ -47,10 +49,8 @@ pub struct GridNonceStatus {
pub high_water_mark: u64,
}

pub async fn nonce_status(
State(sequencer): State<Arc<NonceSequencer>>,
) -> Json<Vec<GridNonceStatus>> {
let marks = sequencer.get_all_grid_high_water_marks();
pub async fn nonce_status(State(state): State<AppState>) -> Json<Vec<GridNonceStatus>> {
let marks = state.sequencer.get_all_grid_high_water_marks();
let statuses: Vec<GridNonceStatus> = marks
.into_iter()
.map(|(grid_id, hwm)| GridNonceStatus {
Expand Down Expand Up @@ -136,8 +136,43 @@ pub async fn submit_reading(
}
}

pub async fn settle_account(Json(_body): Json<SettlementRequest>) -> Json<&'static str> {
Json("settlement initiated")
pub async fn settle_account(
State(state): State<AppState>,
Json(body): Json<SettlementRequest>,
) -> Result<Json<&'static str>, StatusCode> {
let rpc_url =
std::env::var("SOROBAN_RPC_URL").unwrap_or_else(|_| "http://localhost:8000".into());
let finalizer = crate::settlement::finalizer::Finalizer::new(
state.pool.clone(),
rpc_url,
state.breaker.clone(),
);
let mint_queue = crate::settlement::mint_queue::MintQueue::new(state.pool);

// In a real scenario, we'd get readings from the database.
// Here we simulate a batch for the requested meter and a generic resource type (e.g. water).
let batch_id = format!("batch-{}", uuid::Uuid::new_v4());
let resource_type = "water"; // Example
let readings = vec![(chrono::Utc::now(), body.resource_units)];

let engine = crate::tariffs::engine::TariffEngine::new(vec![]); // Default tariff

engine
.evaluate_and_finalize(
&batch_id,
resource_type,
&readings,
&finalizer,
&mint_queue,
&body.destination_wallet,
)
.await
.map_err(|e| {
tracing::error!(error = %e, "settlement failed");
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(Json("settlement completed"))
}

pub async fn get_diagnostics(
Expand Down
13 changes: 11 additions & 2 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::api::middleware::DynamicRateLimiter;
use crate::gateway::lock::AdvisoryLock;
use crate::soroban::rpc::CircuitBreaker;
use crate::soroban::sequencer::NonceSequencer;
use axum::extract::FromRef;
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use tokio::sync::Mutex;

pub mod alloc_tracker;
pub mod handlers;
Expand All @@ -14,8 +16,9 @@ pub mod router;
#[derive(Clone)]
pub struct AppState {
pub sequencer: Arc<NonceSequencer>,
pub db_pool: Pool<Postgres>,
pub pool: Pool<Postgres>,
pub advisory_lock: Arc<AdvisoryLock>,
pub breaker: Arc<Mutex<CircuitBreaker>>,
pub rate_limiter: Arc<DynamicRateLimiter>,
}

Expand All @@ -27,7 +30,7 @@ impl FromRef<AppState> for Arc<NonceSequencer> {

impl FromRef<AppState> for Pool<Postgres> {
fn from_ref(state: &AppState) -> Self {
state.db_pool.clone()
state.pool.clone()
}
}

Expand All @@ -37,6 +40,12 @@ impl FromRef<AppState> for Arc<AdvisoryLock> {
}
}

impl FromRef<AppState> for Arc<Mutex<CircuitBreaker>> {
fn from_ref(state: &AppState) -> Self {
state.breaker.clone()
}
}

impl FromRef<AppState> for Arc<DynamicRateLimiter> {
fn from_ref(state: &AppState) -> Self {
state.rate_limiter.clone()
Expand Down
2 changes: 1 addition & 1 deletion src/api/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{
use tower_http::cors::CorsLayer;

use super::handlers;
use crate::api::AppState;
use super::AppState;

pub async fn build_router(state: AppState) -> anyhow::Result<Router> {
let cors = CorsLayer::permissive();
Expand Down
6 changes: 5 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing_subscriber::EnvFilter;

use utility_backend::api::middleware::DynamicRateLimiter;
use utility_backend::api::AppState;
use utility_backend::gateway::lock::AdvisoryLock;
use utility_backend::soroban::rpc::CircuitBreaker;
use utility_backend::soroban::sequencer::NonceSequencer;
use utility_backend::time_series::compression::{
init_global_compression_manager, spawn_compression_monitor, CompressionPolicy,
Expand Down Expand Up @@ -52,11 +54,13 @@ async fn main() -> anyhow::Result<()> {
let _transport = spawn_transport_monitors(TcpTransportConfig::default());

let advisory_lock = Arc::new(AdvisoryLock::postgres(db_pool.clone()));
let breaker = Arc::new(Mutex::new(CircuitBreaker::new(5)));
let rate_limiter = DynamicRateLimiter::new();
let state = AppState {
sequencer,
db_pool,
pool: db_pool,
advisory_lock,
breaker,
rate_limiter,
};

Expand Down
113 changes: 113 additions & 0 deletions src/settlement/finalizer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use crate::settlement::mint_queue::MintQueue;
use crate::soroban::rpc::CircuitBreaker;
use hex;
use sha2::{Digest, Sha256};
use sqlx::{PgPool, Postgres, Transaction};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{info, warn};

pub struct Finalizer {
pool: PgPool,
rpc_url: String,
mint_queue: MintQueue,
breaker: Arc<Mutex<CircuitBreaker>>,
}

impl Finalizer {
pub fn new(pool: PgPool, rpc_url: String, breaker: Arc<Mutex<CircuitBreaker>>) -> Self {
let mint_queue = MintQueue::new(pool.clone());
Self {
pool,
rpc_url,
mint_queue,
breaker,
}
}

pub async fn finalize_mint(
&self,
batch_id: &str,
resource_type: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// 1. Generate idempotency key: SHA256(batch_id || resource_type || 'mint')
let mut hasher = Sha256::new();
hasher.update(batch_id);
hasher.update(resource_type);
hasher.update("mint");
let idempotency_key = hex::encode(hasher.finalize());

// 2. Attempt to mark as processed using a transaction for atomicity and deduplication
let mut tx: Transaction<'_, Postgres> = self.pool.begin().await?;

let result = sqlx::query(
r#"
INSERT INTO processed_mints (batch_id, resource_type, idempotency_key)
VALUES ($1, $2, $3)
ON CONFLICT (batch_id, resource_type) DO NOTHING
RETURNING id
"#,
)
.bind(batch_id)
.bind(resource_type)
.bind(idempotency_key.clone())
.fetch_optional(&mut *tx)
.await?;

if result.is_none() {
info!(batch_id = %batch_id, resource_type = %resource_type, "mint already processed, skipping");
tx.rollback().await?;
return Ok(());
}

// 3. Get pending mints for this batch and resource type
let pending = self.mint_queue.get_pending(batch_id).await?;
let filtered_pending: Vec<_> = pending
.into_iter()
.filter(|e| e.resource_type == resource_type)
.collect();

if filtered_pending.is_empty() {
warn!(batch_id = %batch_id, resource_type = %resource_type, "no pending mints found for finalization");
tx.rollback().await?;
return Ok(());
}

// AGGREGATION: We must mint the TOTAL amount in ONE transaction to honor the single idempotency key.
let total_amount: f64 = filtered_pending.iter().map(|e| e.amount).sum();
let destination = filtered_pending[0].destination_wallet.clone(); // Assume same destination for batch/resource

// 4. Submit to Soroban RPC
let payload = serde_json::json!({
"jsonrpc": "2.0",
"id": format!("mint-{}-{}", batch_id, resource_type),
"method": "sendTransaction",
"params": {
"batch_id": batch_id,
"resource_type": resource_type,
"amount": total_amount,
"destination": destination,
"idempotency_key": idempotency_key
}
});

let mut breaker = self.breaker.lock().await;
match breaker.call_rpc(&self.rpc_url, payload).await {
Ok(_) => {
info!(batch_id = %batch_id, "soroban aggregated mint transaction submitted");
for event in filtered_pending {
self.mint_queue.remove_event(event.id).await?;
}
}
Err(e) => {
warn!(batch_id = %batch_id, error = %e, "soroban aggregated mint transaction failed");
tx.rollback().await?;
return Err(e.into());
}
}

tx.commit().await?;
info!(batch_id = %batch_id, resource_type = %resource_type, "mint finalization complete");
Ok(())
}
}
73 changes: 73 additions & 0 deletions src/settlement/mint_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{FromRow, PgPool};
use uuid::Uuid;

#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct MintEvent {
pub id: Uuid,
pub batch_id: String,
pub resource_type: String,
pub amount: f64,
pub destination_wallet: String,
pub created_at: Option<DateTime<Utc>>,
}

pub struct MintQueue {
pool: PgPool,
}

impl MintQueue {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}

pub async fn enqueue(
&self,
batch_id: &str,
resource_type: &str,
amount: f64,
destination_wallet: &str,
) -> Result<Uuid, sqlx::Error> {
let id = Uuid::new_v4();
sqlx::query(
r#"
INSERT INTO pending_mints (id, batch_id, resource_type, amount, destination_wallet)
VALUES ($1, $2, $3, $4, $5)
"#,
)
.bind(id)
.bind(batch_id)
.bind(resource_type)
.bind(amount)
.bind(destination_wallet)
.execute(&self.pool)
.await?;

Ok(id)
}

pub async fn get_pending(&self, batch_id: &str) -> Result<Vec<MintEvent>, sqlx::Error> {
let events = sqlx::query_as::<_, MintEvent>(
r#"
SELECT id, batch_id, resource_type, amount, destination_wallet, created_at
FROM pending_mints
WHERE batch_id = $1
"#,
)
.bind(batch_id)
.fetch_all(&self.pool)
.await?;

Ok(events)
}

pub async fn remove_event(&self, id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query("DELETE FROM pending_mints WHERE id = $1")
.bind(id)
.execute(&self.pool)
.await?;

Ok(())
}
}
2 changes: 2 additions & 0 deletions src/settlement/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod attestation_verifier;
pub mod credit_flow;
pub mod executor;
pub mod finalizer;
pub mod merkle;
pub mod messages;
pub mod mint_queue;
pub mod queue;
pub mod shard_router;
pub mod submitter;
21 changes: 21 additions & 0 deletions src/settlement/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Settlement schema for resource token minting

CREATE TABLE IF NOT EXISTS pending_mints (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
batch_id TEXT NOT NULL,
resource_type TEXT NOT NULL,
amount DOUBLE PRECISION NOT NULL,
destination_wallet TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS processed_mints (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
batch_id TEXT NOT NULL,
resource_type TEXT NOT NULL,
idempotency_key TEXT NOT NULL UNIQUE,
processed_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
UNIQUE(batch_id, resource_type)
);

CREATE INDEX IF NOT EXISTS idx_pending_mints_batch_id ON pending_mints(batch_id);
Loading
Loading