Skip to content
26 changes: 13 additions & 13 deletions src/api/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,19 +304,6 @@ pub async fn register_meter(
}))
}

#[derive(Deserialize)]
pub struct RotateKeyRequest {
pub meter_id: String,
pub new_public_key_hex: String,
pub old_signature_hex: String,
}

#[derive(Serialize)]
pub struct RotateKeyResponse {
pub meter_id: String,
pub status: String,
}

pub async fn rotate_key(
Json(body): Json<RotateKeyRequest>,
) -> Result<Json<RotateKeyResponse>, StatusCode> {
Expand Down Expand Up @@ -345,6 +332,19 @@ pub async fn rotate_key(
}))
}

#[derive(Deserialize)]
pub struct RotateKeyRequest {
pub meter_id: String,
pub new_public_key_hex: String,
pub old_signature_hex: String,
}

#[derive(Serialize)]
pub struct RotateKeyResponse {
pub meter_id: String,
pub status: String,
}

#[derive(Serialize)]
pub struct RateLimiterStatusResponse {
pub top_sources: Vec<(String, u64)>,
Expand Down
23 changes: 11 additions & 12 deletions src/api/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use axum::{
body::Body,
extract::ConnectInfo,
extract::State,
extract::{ConnectInfo, State},
http::{Request, StatusCode},
middleware::Next,
response::Response,
Expand All @@ -12,7 +11,7 @@ use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::time::{Duration, Instant};
use std::time::{Duration, Instant};
use tracing::warn;

lazy_static::lazy_static! {
Expand All @@ -24,10 +23,10 @@ fn now_ns() -> u64 {
}

pub struct TokenBucket {
tokens: AtomicU64,
max_tokens: u64,
pub(crate) tokens: AtomicU64,
pub(crate) max_tokens: u64,
pub(crate) refill_rate: u64,
last_refill_ns: AtomicU64,
pub(crate) last_refill_ns: AtomicU64,
}

impl TokenBucket {
Expand Down Expand Up @@ -129,12 +128,12 @@ pub struct FraudContext {
}

pub struct DynamicRateLimiter {
global_bucket: TokenBucket,
per_source_buckets: DashMap<String, Arc<TokenBucket>>,
sliding_windows: DashMap<String, Arc<Mutex<SlidingWindow>>>,
fraud_contexts: DashMap<String, Arc<Mutex<FraudContext>>>,
rejection_counts: DashMap<String, u64>,
last_accessed: DashMap<String, Instant>,
pub(crate) global_bucket: TokenBucket,
pub(crate) per_source_buckets: DashMap<String, Arc<TokenBucket>>,
pub(crate) sliding_windows: DashMap<String, Arc<Mutex<SlidingWindow>>>,
pub(crate) fraud_contexts: DashMap<String, Arc<Mutex<FraudContext>>>,
pub(crate) rejection_counts: DashMap<String, u64>,
pub(crate) last_accessed: DashMap<String, Instant>,
}

impl DynamicRateLimiter {
Expand Down
8 changes: 4 additions & 4 deletions src/api/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ pub async fn build_router(state: AppState) -> anyhow::Result<Router> {
.route("/api/v1/meters/rotate-key", post(handlers::rotate_key))
.route("/api/v1/nonce/status", get(handlers::nonce_status))
.route("/api/v1/gateway/locks", get(handlers::list_gateway_locks))
.route(
"/api/v1/rate-limiter/status",
get(handlers::rate_limiter_status),
)
.route("/metrics", get(handlers::metrics_handler))
.route("/debug/clock_state", get(handlers::clock_state))
.route(
Expand All @@ -46,6 +42,10 @@ pub async fn build_router(state: AppState) -> anyhow::Result<Router> {
"/api/v1/database/compression/status",
get(handlers::compression_status),
)
.route(
"/api/v1/rate-limiter/status",
get(handlers::rate_limiter_status),
)
.layer(axum_mw::from_fn_with_state(
state.clone(),
crate::api::middleware::rate_limit_layer,
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async fn main() -> anyhow::Result<()> {

let advisory_lock = Arc::new(AdvisoryLock::postgres(db_pool.clone()));
let rate_limiter = DynamicRateLimiter::new();

let state = AppState {
sequencer,
db_pool,
Expand Down
Loading