diff --git a/contracts/Cargo.toml b/contracts/Cargo.toml index 7656399..f161b0d 100644 --- a/contracts/Cargo.toml +++ b/contracts/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = ["utility_contracts", "price_oracle", "resource-token", "common", "settlement"] +members = ["utility_contracts", "price_oracle", "resource-token", "common", "settlement", "meter-aggregator"] [workspace.dependencies] soroban-sdk = "23.2.4" diff --git a/contracts/meter-aggregator/Cargo.toml b/contracts/meter-aggregator/Cargo.toml new file mode 100644 index 0000000..bddff05 --- /dev/null +++ b/contracts/meter-aggregator/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "meter-aggregator" +version = "0.0.0" +edition = "2021" +publish = false + +[lib] +crate-type = ["lib", "cdylib"] +doctest = false + +[dependencies] +soroban-sdk = { workspace = true } + +[dev-dependencies] +soroban-sdk = { workspace = true, features = ["testutils"] } diff --git a/contracts/meter-aggregator/README.md b/contracts/meter-aggregator/README.md new file mode 100644 index 0000000..ea2abe2 --- /dev/null +++ b/contracts/meter-aggregator/README.md @@ -0,0 +1,68 @@ +# meter-aggregator + +Per-device meter reading aggregation with **bounded storage**. + +## Why + +Appending every raw meter reading to an unbounded per-device vector exhausts +Soroban contract storage. At one reading every ~5 seconds (~17,280/day) a naive +design overruns the contract storage budget within hours, after which all further +readings *and* settlements for that device fail — a cheap denial of service. + +This contract keeps live storage bounded regardless of device lifetime or +submission frequency. + +## How + +- **Raw readings** are stored under individual keys `RawReading(device, seq)` + with a monotonically increasing sequence number (O(1) append; seq order == time + order). +- On every submission the value is folded into the matching **hourly** and + **daily** rollup buckets using overflow-checked `i128` addition. +- Raw readings older than `MAX_RAW_RETENTION_SECS` (7 days) are pruned **inline**, + amortized to O(1) per submission via a watermark cursor `PruneCursor(device)`, + deleting at most `PRUNE_BATCH_SIZE` (10) entries per call so a backlog drains + over several submissions instead of blowing one call's instruction budget. +- Long-term volume lives in compact rollup buckets, read by + `get_aggregated_volume` which prefers **daily → hourly → raw** in that order. +- `rollup_day` consolidates a completed day by reclaiming its now-redundant + hourly buckets (the daily total is maintained incrementally), keeping + hourly-bucket growth bounded too. + +## Public API + +| fn | description | +|----|-------------| +| `initialize(admin)` | one-time admin setup | +| `submit_reading(device, source, value) -> seq` | store + rollup + inline prune | +| `prune(device) -> u32` | manual batch prune (callable by anyone) | +| `rollup_day(device, day_epoch) -> i128` | admin; reclaim a day's hourly buckets | +| `get_aggregated_volume(device, from_ts, to_ts) -> i128` | tiered windowed total | +| `get_hourly_bucket` / `get_daily_bucket` / `get_raw_reading` | views | +| `get_prune_cursor` / `get_reading_count` / `get_live_reading_count` | views | + +## Constants + +| name | value | meaning | +|------|-------|---------| +| `MAX_RAW_RETENTION_SECS` | `604_800` | 7-day raw retention window | +| `PRUNE_BATCH_SIZE` | `10` | max deletions per call | +| `ROLLUP_INTERVAL_SECS` | `3_600` | hourly bucket width | +| `SECONDS_PER_DAY` | `86_400` | daily bucket width | +| `FIXED_POINT_SCALE` | `10_000_000` | 7-decimal fixed point | + +## Limitation + +Sub-day query resolution is retained only while hourly buckets exist. After a day +is consolidated via `rollup_day` (and its raw readings pruned), that day is +queryable at day granularity only. Full-day and multi-day windows remain exact. + +## Test + +```sh +cargo test --package meter-aggregator +``` + +Covers: hourly/daily rollup correctness, daily fast-path reads, `rollup_day` +reclamation, overflow rejection, negative-value rejection, the pruning retention +boundary, batch-size limiting, and end-to-end storage-exhaustion prevention. diff --git a/contracts/meter-aggregator/src/constants.rs b/contracts/meter-aggregator/src/constants.rs new file mode 100644 index 0000000..88ab8cf --- /dev/null +++ b/contracts/meter-aggregator/src/constants.rs @@ -0,0 +1,38 @@ +//! Tunable bounds for raw-reading retention, pruning, and time-windowed rollups. +//! +//! These constants encode the invariants from the storage-exhaustion mitigation: +//! raw readings are short-lived audit records that get rolled up into compact +//! hourly/daily buckets and then pruned, keeping per-device storage bounded +//! regardless of how long a device runs or how fast it submits. + +/// How long raw readings are retained before they become eligible for pruning. +/// 7 days = `7 * 86_400` seconds. After this window the data lives only in the +/// rolled-up hourly/daily buckets. +pub const MAX_RAW_RETENTION_SECS: u64 = 604_800; + +/// Maximum number of stale raw readings deleted during a single submission. +/// +/// Pruning is amortized across submissions: each `submit_reading` call deletes +/// at most this many entries, so a backlog of stale readings is drained over +/// several calls instead of blowing the instruction budget of one call. Each +/// deletion costs on the order of a few thousand instructions, so a small batch +/// keeps the per-call cost predictable. +pub const PRUNE_BATCH_SIZE: u32 = 10; + +/// Number of seconds covered by one hourly rollup bucket (1 hour). +pub const ROLLUP_INTERVAL_SECS: u64 = 3_600; + +/// Number of seconds in one day, used to derive daily bucket epochs. +pub const SECONDS_PER_DAY: u64 = 86_400; + +/// Number of hourly buckets that make up one day. +pub const HOURS_PER_DAY: u64 = SECONDS_PER_DAY / ROLLUP_INTERVAL_SECS; + +/// Fixed-point scale for reported volumes: 7 decimal places (1.0 == 10_000_000). +/// Values are summed as raw scaled `i128` integers; this constant documents the +/// interpretation and is exposed for clients that need to format volumes. +pub const FIXED_POINT_SCALE: i128 = 10_000_000; + +/// TTL bump (in ledgers) applied to long-lived rollup/bookkeeping entries so the +/// aggregated history is not archived out from under an active device. +pub const BUCKET_TTL_LEDGERS: u32 = 30 * 17_280; // ~30 days at ~5s ledgers diff --git a/contracts/meter-aggregator/src/lib.rs b/contracts/meter-aggregator/src/lib.rs new file mode 100644 index 0000000..9513179 --- /dev/null +++ b/contracts/meter-aggregator/src/lib.rs @@ -0,0 +1,292 @@ +#![no_std] +//! # Meter Aggregator +//! +//! Collects raw meter readings per device and keeps per-device storage **bounded** +//! regardless of device lifetime or submission frequency. +//! +//! ## The problem this solves +//! +//! Appending every raw reading to an ever-growing per-device vector exhausts +//! Soroban contract storage. At one reading every ~5s (≈17,280/day) a naive +//! design blows the contract storage budget within hours, after which **all** +//! further readings and settlements for the device fail — a cheap denial of +//! service. +//! +//! ## The mechanism +//! +//! * Each raw reading is stored under its own key `RawReading(device, seq)` with +//! a monotonically increasing sequence number, so submissions are O(1) and seq +//! order matches time order. +//! * On every submission the value is folded into the matching **hourly** and +//! **daily** rollup buckets (`checked_add`, overflow-safe). +//! * Raw readings older than [`constants::MAX_RAW_RETENTION_SECS`] are pruned +//! **inline**, amortized to O(1) per submission via a watermark cursor +//! (`PruneCursor(device)`), deleting at most [`constants::PRUNE_BATCH_SIZE`] +//! entries per call so a backlog drains over several submissions. +//! * Long-term volume lives in the compact rollup buckets, queried by +//! [`MeterAggregator::get_aggregated_volume`] which reads daily → hourly → raw +//! in that order of preference. +//! +//! Net effect: live raw storage is capped at roughly one retention window of +//! readings; everything older is represented by a handful of bytes per +//! hour/day. + +use soroban_sdk::{contract, contractimpl, panic_with_error, Address, Env}; + +pub mod constants; +pub mod storage; +pub mod types; + +#[cfg(test)] +mod test; + +use constants::{ + HOURS_PER_DAY, MAX_RAW_RETENTION_SECS, PRUNE_BATCH_SIZE, ROLLUP_INTERVAL_SECS, SECONDS_PER_DAY, +}; +use types::{DailyBucket, Error, HourlyBucket, RawReading}; + +/// Overflow-checked `i128` addition that traps with [`Error::Overflow`]. +fn checked_add(env: &Env, a: i128, b: i128) -> i128 { + match a.checked_add(b) { + Some(v) => v, + None => panic_with_error!(env, Error::Overflow), + } +} + +fn require_admin(env: &Env) -> Address { + match storage::get_admin(env) { + Some(a) => a, + None => panic_with_error!(env, Error::NotInitialized), + } +} + +#[contract] +pub struct MeterAggregator; + +#[contractimpl] +impl MeterAggregator { + /// Initialize the contract with an admin. Callable once. + pub fn initialize(env: Env, admin: Address) { + if storage::get_admin(&env).is_some() { + panic_with_error!(&env, Error::AlreadyInitialized); + } + admin.require_auth(); + storage::set_admin(&env, &admin); + } + + /// Submit a raw meter reading for `device`, signed by `source`. + /// + /// Stores the raw reading, folds it into the hourly/daily rollups, then + /// prunes up to [`PRUNE_BATCH_SIZE`] stale readings. Returns the sequence + /// number assigned to the reading. + pub fn submit_reading(env: Env, device: Address, source: Address, value: i128) -> u64 { + // Must be initialized (guarantees an admin exists for privileged ops). + require_admin(&env); + + if value < 0 { + panic_with_error!(&env, Error::NegativeValue); + } + + source.require_auth(); + + let ts = env.ledger().timestamp(); + let seq = storage::get_next_seq(&env, &device); + + let reading = RawReading { + timestamp: ts, + value, + source: source.clone(), + }; + storage::set_raw_reading(&env, &device, seq, &reading); + storage::set_next_seq(&env, &device, seq + 1); + + rollup_raw_to_hourly(&env, &device, ts, value); + + prune_stale_readings(&env, &device); + + seq + } + + /// Maintenance entry point: prune up to [`PRUNE_BATCH_SIZE`] stale raw + /// readings for `device`. Callable by anyone (purely deterministic cleanup). + /// Returns the number of readings pruned by this call. + pub fn prune(env: Env, device: Address) -> u32 { + prune_stale_readings(&env, &device) + } + + /// Consolidate a completed day's hourly buckets for `device`, reclaiming + /// their storage. + /// + /// The daily bucket is maintained incrementally on each submission, so this + /// only deletes the now-redundant hourly buckets for `day_epoch` to keep + /// hourly-bucket growth bounded over the device's lifetime. Idempotent. + /// Admin only. Returns the day's total volume. + pub fn rollup_day(env: Env, device: Address, day_epoch: u64) -> i128 { + let admin = require_admin(&env); + admin.require_auth(); + + let start_hour = day_epoch * HOURS_PER_DAY; + let mut i = 0u64; + while i < HOURS_PER_DAY { + storage::remove_hourly_bucket(&env, &device, start_hour + i); + i += 1; + } + + storage::get_daily_bucket(&env, &device, day_epoch) + .map(|d| d.total) + .unwrap_or(0) + } + + /// Total volume for `device` over the inclusive hour window covering + /// `[from_ts, to_ts]`. + /// + /// Evaluated at hour-bucket granularity. Reads are tiered for efficiency and + /// correctness: a fully-covered day uses its `DailyBucket`; otherwise each + /// hour uses its `HourlyBucket`; if a bucket is missing (e.g. not yet rolled + /// up) the live raw readings for that hour are summed as a fallback. + pub fn get_aggregated_volume(env: Env, device: Address, from_ts: u64, to_ts: u64) -> i128 { + if from_ts > to_ts { + panic_with_error!(&env, Error::InvalidTimeRange); + } + + let from_hour = from_ts / ROLLUP_INTERVAL_SECS; + let to_hour = to_ts / ROLLUP_INTERVAL_SECS; + + let mut total: i128 = 0; + let mut h = from_hour; + while h <= to_hour { + let day = h / HOURS_PER_DAY; + let day_start_hour = day * HOURS_PER_DAY; + let day_end_hour = day_start_hour + HOURS_PER_DAY - 1; + + // Fast path: the whole day fits inside the window — use the daily bucket. + if h == day_start_hour && day_end_hour <= to_hour { + if let Some(d) = storage::get_daily_bucket(&env, &device, day) { + total = checked_add(&env, total, d.total); + h = day_end_hour + 1; + continue; + } + } + + // Hour tier, falling back to the live raw readings for the hour. + match storage::get_hourly_bucket(&env, &device, h) { + Some(b) => total = checked_add(&env, total, b.total), + None => total = checked_add(&env, total, sum_raw_for_hour(&env, &device, h)), + } + h += 1; + } + + total + } + + // --- View accessors -------------------------------------------------- + + pub fn get_admin(env: Env) -> Option
{ + storage::get_admin(&env) + } + + pub fn get_hourly_bucket(env: Env, device: Address, hour_epoch: u64) -> Option { + storage::get_hourly_bucket(&env, &device, hour_epoch) + } + + pub fn get_daily_bucket(env: Env, device: Address, day_epoch: u64) -> Option { + storage::get_daily_bucket(&env, &device, day_epoch) + } + + pub fn get_raw_reading(env: Env, device: Address, seq: u64) -> Option { + storage::get_raw_reading(&env, &device, seq) + } + + /// The pruning watermark: the next sequence number that will be examined. + pub fn get_prune_cursor(env: Env, device: Address) -> u64 { + storage::get_prune_cursor(&env, &device) + } + + /// Total number of raw readings ever submitted for `device` (next seq). + pub fn get_reading_count(env: Env, device: Address) -> u64 { + storage::get_next_seq(&env, &device) + } + + /// Number of raw readings still live in storage (submitted minus pruned). + pub fn get_live_reading_count(env: Env, device: Address) -> u64 { + storage::get_next_seq(&env, &device) - storage::get_prune_cursor(&env, &device) + } +} + +/// Fold a single reading into its hourly and daily rollup buckets. +fn rollup_raw_to_hourly(env: &Env, device: &Address, ts: u64, value: i128) { + let hour = ts / ROLLUP_INTERVAL_SECS; + let mut hb = storage::get_hourly_bucket(env, device, hour).unwrap_or(HourlyBucket { + hour_epoch: hour, + total: 0, + count: 0, + }); + hb.total = checked_add(env, hb.total, value); + hb.count += 1; + storage::set_hourly_bucket(env, device, hour, &hb); + + let day = ts / SECONDS_PER_DAY; + let mut db = storage::get_daily_bucket(env, device, day).unwrap_or(DailyBucket { + day_epoch: day, + total: 0, + count: 0, + }); + db.total = checked_add(env, db.total, value); + db.count += 1; + storage::set_daily_bucket(env, device, day, &db); +} + +/// Prune stale raw readings using the watermark cursor. +/// +/// Readings are stored in sequence (= time) order, so we advance the cursor from +/// the oldest unexamined sequence number, deleting readings whose age exceeds +/// the retention window, and stop at the first still-fresh reading. At most +/// [`PRUNE_BATCH_SIZE`] readings are deleted per call. +fn prune_stale_readings(env: &Env, device: &Address) -> u32 { + let now = env.ledger().timestamp(); + // A reading is stale when `now - timestamp > retention`, i.e. `timestamp < cutoff`. + // A reading exactly `retention` seconds old (timestamp == cutoff) is kept. + let cutoff = now.saturating_sub(MAX_RAW_RETENTION_SECS); + + let next = storage::get_next_seq(env, device); + let mut cursor = storage::get_prune_cursor(env, device); + let mut pruned: u32 = 0; + + while cursor < next && pruned < PRUNE_BATCH_SIZE { + match storage::get_raw_reading(env, device, cursor) { + Some(r) => { + if r.timestamp < cutoff { + storage::remove_raw_reading(env, device, cursor); + cursor += 1; + pruned += 1; + } else { + // First fresh reading in time order — nothing older remains. + break; + } + } + // Already-removed gap: skip without counting against the batch. + None => cursor += 1, + } + } + + storage::set_prune_cursor(env, device, cursor); + pruned +} + +/// Sum live raw readings whose timestamp falls in `hour_epoch`. Fallback used by +/// [`MeterAggregator::get_aggregated_volume`] when a bucket is absent; bounded by +/// the live (un-pruned) sequence range. +fn sum_raw_for_hour(env: &Env, device: &Address, hour_epoch: u64) -> i128 { + let next = storage::get_next_seq(env, device); + let mut seq = storage::get_prune_cursor(env, device); + let mut sum: i128 = 0; + while seq < next { + if let Some(r) = storage::get_raw_reading(env, device, seq) { + if r.timestamp / ROLLUP_INTERVAL_SECS == hour_epoch { + sum = checked_add(env, sum, r.value); + } + } + seq += 1; + } + sum +} diff --git a/contracts/meter-aggregator/src/storage.rs b/contracts/meter-aggregator/src/storage.rs new file mode 100644 index 0000000..9c1403f --- /dev/null +++ b/contracts/meter-aggregator/src/storage.rs @@ -0,0 +1,138 @@ +//! Storage key definitions and typed accessors. +//! +//! Keys are namespaced (prefixed with `"MTAG"`) and XDR-encoded into `Bytes`, +//! matching the convention used by the other contracts in this workspace +//! (see `resource-token` / `price_oracle`). Namespacing prevents collisions if +//! this contract is ever co-deployed or migrated alongside others. + +use soroban_sdk::xdr::ToXdr; +use soroban_sdk::{contracttype, Address, Bytes, Env}; + +use crate::constants::BUCKET_TTL_LEDGERS; +use crate::types::{DailyBucket, HourlyBucket, RawReading}; + +/// Namespace prefix: "MTAG". +pub const NAMESPACE_PREFIX: [u8; 4] = [0x4d, 0x54, 0x41, 0x47]; + +#[derive(Clone)] +#[contracttype] +pub enum DataKey { + /// Admin address with privileged operations. + Admin, + /// Next raw-reading sequence number to assign for a device. + ReadingSeq(Address), + /// A raw reading for a device at a given sequence number. + RawReading(Address, u64), + /// Pruning watermark: the next sequence number to examine for a device. + PruneCursor(Address), + /// Aggregated hourly bucket: (device, hour_epoch). + HourlyBucket(Address, u64), + /// Aggregated daily bucket: (device, day_epoch). + DailyBucket(Address, u64), +} + +impl DataKey { + /// Encode the key with the contract namespace prefix. + pub fn encode(&self, env: &Env) -> Bytes { + let mut key = Bytes::new(env); + key.append(&Bytes::from_array(env, &NAMESPACE_PREFIX)); + key.append(&self.clone().to_xdr(env)); + key + } +} + +// --- Admin --------------------------------------------------------------- + +pub fn get_admin(env: &Env) -> Option
{ + let key = DataKey::Admin.encode(env); + env.storage().instance().get(&key) +} + +pub fn set_admin(env: &Env, admin: &Address) { + let key = DataKey::Admin.encode(env); + env.storage().instance().set(&key, admin); +} + +// --- Sequence counter ---------------------------------------------------- + +/// The next sequence number that will be assigned to a device's raw reading. +pub fn get_next_seq(env: &Env, device: &Address) -> u64 { + let key = DataKey::ReadingSeq(device.clone()).encode(env); + env.storage().persistent().get(&key).unwrap_or(0) +} + +pub fn set_next_seq(env: &Env, device: &Address, seq: u64) { + let key = DataKey::ReadingSeq(device.clone()).encode(env); + env.storage().persistent().set(&key, &seq); + env.storage() + .persistent() + .extend_ttl(&key, BUCKET_TTL_LEDGERS, BUCKET_TTL_LEDGERS); +} + +// --- Raw readings -------------------------------------------------------- + +pub fn get_raw_reading(env: &Env, device: &Address, seq: u64) -> Option { + let key = DataKey::RawReading(device.clone(), seq).encode(env); + env.storage().persistent().get(&key) +} + +pub fn set_raw_reading(env: &Env, device: &Address, seq: u64, reading: &RawReading) { + let key = DataKey::RawReading(device.clone(), seq).encode(env); + env.storage().persistent().set(&key, reading); +} + +pub fn remove_raw_reading(env: &Env, device: &Address, seq: u64) { + let key = DataKey::RawReading(device.clone(), seq).encode(env); + env.storage().persistent().remove(&key); +} + +// --- Prune cursor -------------------------------------------------------- + +/// The next sequence number to examine when pruning a device's stale readings. +pub fn get_prune_cursor(env: &Env, device: &Address) -> u64 { + let key = DataKey::PruneCursor(device.clone()).encode(env); + env.storage().persistent().get(&key).unwrap_or(0) +} + +pub fn set_prune_cursor(env: &Env, device: &Address, cursor: u64) { + let key = DataKey::PruneCursor(device.clone()).encode(env); + env.storage().persistent().set(&key, &cursor); + env.storage() + .persistent() + .extend_ttl(&key, BUCKET_TTL_LEDGERS, BUCKET_TTL_LEDGERS); +} + +// --- Hourly buckets ------------------------------------------------------ + +pub fn get_hourly_bucket(env: &Env, device: &Address, hour_epoch: u64) -> Option { + let key = DataKey::HourlyBucket(device.clone(), hour_epoch).encode(env); + env.storage().persistent().get(&key) +} + +pub fn set_hourly_bucket(env: &Env, device: &Address, hour_epoch: u64, bucket: &HourlyBucket) { + let key = DataKey::HourlyBucket(device.clone(), hour_epoch).encode(env); + env.storage().persistent().set(&key, bucket); + env.storage() + .persistent() + .extend_ttl(&key, BUCKET_TTL_LEDGERS, BUCKET_TTL_LEDGERS); +} + +pub fn remove_hourly_bucket(env: &Env, device: &Address, hour_epoch: u64) { + let key = DataKey::HourlyBucket(device.clone(), hour_epoch).encode(env); + env.storage().persistent().remove(&key); +} + +// --- Daily buckets ------------------------------------------------------- + +pub fn get_daily_bucket(env: &Env, device: &Address, day_epoch: u64) -> Option { + let key = DataKey::DailyBucket(device.clone(), day_epoch).encode(env); + env.storage().persistent().get(&key) +} + +pub fn set_daily_bucket(env: &Env, device: &Address, day_epoch: u64, bucket: &DailyBucket) { + let key = DataKey::DailyBucket(device.clone(), day_epoch).encode(env); + env.storage().persistent().set(&key, bucket); + env.storage() + .persistent() + .extend_ttl(&key, BUCKET_TTL_LEDGERS, BUCKET_TTL_LEDGERS); +} diff --git a/contracts/meter-aggregator/src/test.rs b/contracts/meter-aggregator/src/test.rs new file mode 100644 index 0000000..18e1f95 --- /dev/null +++ b/contracts/meter-aggregator/src/test.rs @@ -0,0 +1,259 @@ +#![cfg(test)] + +use soroban_sdk::testutils::{Address as _, Ledger}; +use soroban_sdk::{Address, Env}; + +use crate::constants::{MAX_RAW_RETENTION_SECS, PRUNE_BATCH_SIZE, ROLLUP_INTERVAL_SECS}; +use crate::{MeterAggregator, MeterAggregatorClient}; + +fn setup(env: &Env) -> (MeterAggregatorClient<'_>, Address, Address) { + env.mock_all_auths(); + let contract_id = env.register(MeterAggregator, ()); + let client = MeterAggregatorClient::new(env, &contract_id); + let admin = Address::generate(env); + let device = Address::generate(env); + client.initialize(&admin); + (client, device, admin) +} + +#[test] +fn test_initialize_and_admin() { + let env = Env::default(); + let (client, _device, admin) = setup(&env); + assert_eq!(client.get_admin(), Some(admin)); +} + +#[test] +#[should_panic] +fn test_double_initialize_panics() { + let env = Env::default(); + let (client, _device, _admin) = setup(&env); + let other = Address::generate(&env); + client.initialize(&other); +} + +// --- Hour rollup correctness -------------------------------------------- + +#[test] +fn test_hour_rollup_correctness() { + let env = Env::default(); + let (client, device, _admin) = setup(&env); + let source = Address::generate(&env); + + let hour0 = 100u64; + let base = hour0 * ROLLUP_INTERVAL_SECS; + + // Three readings inside hour0. + for (i, v) in [10i128, 20, 30].iter().enumerate() { + env.ledger().set_timestamp(base + (i as u64) * 10); + client.submit_reading(&device, &source, v); + } + // One reading inside hour1. + env.ledger().set_timestamp((hour0 + 1) * ROLLUP_INTERVAL_SECS + 5); + client.submit_reading(&device, &source, &5); + + let b0 = client.get_hourly_bucket(&device, &hour0).unwrap(); + assert_eq!(b0.total, 60); + assert_eq!(b0.count, 3); + + let b1 = client.get_hourly_bucket(&device, &(hour0 + 1)).unwrap(); + assert_eq!(b1.total, 5); + assert_eq!(b1.count, 1); + + // Aggregate over both hours. + let from_ts = base; + let to_ts = (hour0 + 1) * ROLLUP_INTERVAL_SECS + ROLLUP_INTERVAL_SECS - 1; + assert_eq!(client.get_aggregated_volume(&device, &from_ts, &to_ts), 65); +} + +#[test] +fn test_daily_bucket_fast_path() { + let env = Env::default(); + let (client, device, _admin) = setup(&env); + let source = Address::generate(&env); + + // Day 10 in epoch days. + let day = 10u64; + let day_start = day * 86_400; + + // Two readings on day 10, in different hours. + env.ledger().set_timestamp(day_start + 100); + client.submit_reading(&device, &source, &7); + env.ledger().set_timestamp(day_start + 3_700); + client.submit_reading(&device, &source, &3); + + let d = client.get_daily_bucket(&device, &day).unwrap(); + assert_eq!(d.total, 10); + assert_eq!(d.count, 2); + + // Querying the full day exercises the daily fast path. + let vol = client.get_aggregated_volume(&device, &day_start, &(day_start + 86_399)); + assert_eq!(vol, 10); +} + +#[test] +fn test_rollup_day_reclaims_hourly_but_keeps_total() { + let env = Env::default(); + let (client, device, _admin) = setup(&env); + let source = Address::generate(&env); + + let day = 20u64; + let day_start = day * 86_400; + + env.ledger().set_timestamp(day_start + 10); + client.submit_reading(&device, &source, &40); + env.ledger().set_timestamp(day_start + 7_300); + client.submit_reading(&device, &source, &60); + + let hour_a = (day_start + 10) / ROLLUP_INTERVAL_SECS; + assert!(client.get_hourly_bucket(&device, &hour_a).is_some()); + + // Consolidate the day: hourly buckets are reclaimed, daily total persists. + let total = client.rollup_day(&device, &day); + assert_eq!(total, 100); + assert!(client.get_hourly_bucket(&device, &hour_a).is_none()); + assert!(client.get_daily_bucket(&device, &day).is_some()); + + // Full-day aggregation still correct via the daily bucket. + let vol = client.get_aggregated_volume(&device, &day_start, &(day_start + 86_399)); + assert_eq!(vol, 100); +} + +// --- Overflow aggregation ----------------------------------------------- + +#[test] +fn test_overflow_aggregation_is_rejected() { + let env = Env::default(); + let (client, device, _admin) = setup(&env); + let source = Address::generate(&env); + + let base = 500u64 * ROLLUP_INTERVAL_SECS; + + // First reading saturates the hourly bucket near i128::MAX. + env.ledger().set_timestamp(base); + client.submit_reading(&device, &source, &i128::MAX); + + // Second reading in the same hour overflows the checked_add and must trap. + env.ledger().set_timestamp(base + 10); + let res = client.try_submit_reading(&device, &source, &1); + assert!(res.is_err(), "overflowing aggregation should be rejected"); + + // State unchanged: the saturating reading is still the only one folded in. + let hour = base / ROLLUP_INTERVAL_SECS; + let b = client.get_hourly_bucket(&device, &hour).unwrap(); + assert_eq!(b.total, i128::MAX); + assert_eq!(b.count, 1); +} + +#[test] +fn test_negative_value_rejected() { + let env = Env::default(); + let (client, device, _admin) = setup(&env); + let source = Address::generate(&env); + env.ledger().set_timestamp(1_000); + let res = client.try_submit_reading(&device, &source, &-1); + assert!(res.is_err()); +} + +// --- Pruning boundary ---------------------------------------------------- + +#[test] +fn test_prune_retention_boundary() { + let env = Env::default(); + let (client, device, _admin) = setup(&env); + let source = Address::generate(&env); + + let t: u64 = 2_000_000; + env.ledger().set_timestamp(t); + client.submit_reading(&device, &source, &42); + assert_eq!(client.get_live_reading_count(&device), 1); + + // Exactly at the retention boundary: age == retention -> kept. + env.ledger().set_timestamp(t + MAX_RAW_RETENTION_SECS); + let pruned = client.prune(&device); + assert_eq!(pruned, 0); + assert_eq!(client.get_live_reading_count(&device), 1); + + // One second past the boundary: age == retention + 1 -> pruned. + env.ledger().set_timestamp(t + MAX_RAW_RETENTION_SECS + 1); + let pruned = client.prune(&device); + assert_eq!(pruned, 1); + assert_eq!(client.get_live_reading_count(&device), 0); + + // Aggregated history survives pruning of the raw reading. + let hour = t / ROLLUP_INTERVAL_SECS; + assert_eq!(client.get_hourly_bucket(&device, &hour).unwrap().total, 42); +} + +#[test] +fn test_prune_batch_size_limit() { + let env = Env::default(); + let (client, device, _admin) = setup(&env); + let source = Address::generate(&env); + + let t: u64 = 3_000_000; + // Submit (PRUNE_BATCH_SIZE * 2) readings, all in distinct seconds. + let n = (PRUNE_BATCH_SIZE * 2) as u64; + for i in 0..n { + env.ledger().set_timestamp(t + i); + client.submit_reading(&device, &source, &1); + } + assert_eq!(client.get_live_reading_count(&device), n); + + // Age everything past retention, then prune once: at most batch-size deleted. + env.ledger().set_timestamp(t + MAX_RAW_RETENTION_SECS + n + 1); + let pruned = client.prune(&device); + assert_eq!(pruned, PRUNE_BATCH_SIZE); + assert_eq!( + client.get_live_reading_count(&device), + n - PRUNE_BATCH_SIZE as u64 + ); + + // A second prune drains the remainder. + let pruned = client.prune(&device); + assert_eq!(pruned, PRUNE_BATCH_SIZE); + assert_eq!(client.get_live_reading_count(&device), 0); +} + +// --- Storage exhaustion prevention -------------------------------------- + +#[test] +fn test_storage_exhaustion_prevention() { + let env = Env::default(); + let (client, device, _admin) = setup(&env); + let source = Address::generate(&env); + + let t0: u64 = 10_000_000; + + // Backlog: 100 readings inside one retention window. + let backlog = 100u64; + for i in 0..backlog { + env.ledger().set_timestamp(t0 + i); + client.submit_reading(&device, &source, &1); + } + assert_eq!(client.get_live_reading_count(&device), backlog); + + // Jump well past retention so the whole backlog is stale, then keep + // submitting. Inline pruning (batch 10/call) drains the backlog: after + // `backlog / batch` submissions the live set collapses to just the fresh + // readings, proving storage stays bounded instead of growing without limit. + let jump = t0 + MAX_RAW_RETENTION_SECS + 1_000_000; + let calls = backlog / PRUNE_BATCH_SIZE as u64; // 10 submissions + for i in 0..calls { + env.ledger().set_timestamp(jump + i); + client.submit_reading(&device, &source, &1); + } + + // 100 stale pruned (10 calls * 10), 10 fresh added -> 10 live. + let live = client.get_live_reading_count(&device); + assert_eq!(live, calls); + assert!(live < backlog, "live storage must shrink, not grow unbounded"); +} + +#[test] +fn test_invalid_time_range_rejected() { + let env = Env::default(); + let (client, device, _admin) = setup(&env); + let res = client.try_get_aggregated_volume(&device, &100, &50); + assert!(res.is_err()); +} diff --git a/contracts/meter-aggregator/src/types.rs b/contracts/meter-aggregator/src/types.rs new file mode 100644 index 0000000..52a65da --- /dev/null +++ b/contracts/meter-aggregator/src/types.rs @@ -0,0 +1,64 @@ +//! Value types stored by the meter aggregator. + +use soroban_sdk::{contracterror, contracttype, Address}; + +/// A single raw meter reading as submitted by a device/source. +/// +/// Raw readings are short-lived: they are immediately folded into the matching +/// hourly bucket and pruned once older than [`crate::constants::MAX_RAW_RETENTION_SECS`]. +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct RawReading { + /// Ledger timestamp (seconds) at which the reading was recorded. + pub timestamp: u64, + /// Consumption value in fixed-point units (7 decimals). + pub value: i128, + /// Address that submitted the reading. + pub source: Address, +} + +/// Aggregated consumption for one hour window (`hour_epoch = timestamp / 3600`). +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct HourlyBucket { + /// Hour index since the unix epoch (`timestamp / ROLLUP_INTERVAL_SECS`). + pub hour_epoch: u64, + /// Sum of all reading values in this hour (fixed-point, 7 decimals). + pub total: i128, + /// Number of readings folded into this bucket. + pub count: u32, +} + +/// Aggregated consumption for one day window (`day_epoch = timestamp / 86400`). +/// +/// Produced by consolidating the 24 hourly buckets of a day via +/// [`crate::MeterAggregator::rollup_day`]. +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct DailyBucket { + /// Day index since the unix epoch (`timestamp / SECONDS_PER_DAY`). + pub day_epoch: u64, + /// Sum of all reading values in this day (fixed-point, 7 decimals). + pub total: i128, + /// Number of readings folded into this bucket. + pub count: u32, +} + +/// Errors surfaced by the contract. +#[contracterror] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[repr(u32)] +pub enum Error { + /// `initialize` has not been called yet. + NotInitialized = 1, + /// `initialize` was called more than once. + AlreadyInitialized = 2, + /// Caller is not the configured admin. + NotAuthorized = 3, + /// Aggregation would overflow `i128`. + Overflow = 4, + /// `from_ts` is greater than `to_ts` in a range query. + InvalidTimeRange = 5, + /// Reading value must be non-negative. + NegativeValue = 6, +}