Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 103 additions & 75 deletions services/api/src/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ impl AuditLogger {
Ok(id)
}

/// Query audit log entries with filters
/// Query audit log entries with filters.
///
/// Uses `sqlx::QueryBuilder` so every filter value is bound as a typed
/// parameter — the SQL string never contains user-supplied data directly.
pub async fn query(
&self,
actor: Option<&str>,
Expand All @@ -101,52 +104,7 @@ impl AuditLogger {
limit: i64,
offset: i64,
) -> anyhow::Result<Vec<AuditLogEntry>> {
let mut query = String::from(
r#"
SELECT id, timestamp, actor, actor_ip, action, resource_type, resource_id,
details, status, error_message, request_id, user_agent
FROM audit_log
WHERE 1=1
"#,
);

let mut bind_count = 0;
let mut bindings: Vec<Box<dyn sqlx::Encode<'_, sqlx::Postgres> + Send>> = Vec::new();

if let Some(a) = actor {
bind_count += 1;
query.push_str(&format!(" AND actor = ${}", bind_count));
}

if let Some(a) = action {
bind_count += 1;
query.push_str(&format!(" AND action = ${}", bind_count));
}

if let Some(rt) = resource_type {
bind_count += 1;
query.push_str(&format!(" AND resource_type = ${}", bind_count));
}

if let Some(f) = from {
bind_count += 1;
query.push_str(&format!(" AND timestamp >= ${}", bind_count));
}

if let Some(t) = to {
bind_count += 1;
query.push_str(&format!(" AND timestamp <= ${}", bind_count));
}

query.push_str(" ORDER BY timestamp DESC");

bind_count += 1;
query.push_str(&format!(" LIMIT ${}", bind_count));

bind_count += 1;
query.push_str(&format!(" OFFSET ${}", bind_count));

let mut q = sqlx::query_as::<_, (
type Row = (
i64,
DateTime<Utc>,
String,
Expand All @@ -159,26 +117,34 @@ impl AuditLogger {
Option<String>,
Option<Uuid>,
Option<String>,
)>(&query);
);

let mut qb = sqlx::QueryBuilder::<sqlx::Postgres>::new(
"SELECT id, timestamp, actor, actor_ip, action, resource_type, resource_id, \
details, status, error_message, request_id, user_agent \
FROM audit_log WHERE 1=1",
);

if let Some(a) = actor {
q = q.bind(a);
qb.push(" AND actor = ").push_bind(a);
}
if let Some(a) = action {
q = q.bind(a);
qb.push(" AND action = ").push_bind(a);
}
if let Some(rt) = resource_type {
q = q.bind(rt);
qb.push(" AND resource_type = ").push_bind(rt);
}
if let Some(f) = from {
q = q.bind(f);
qb.push(" AND timestamp >= ").push_bind(f);
}
if let Some(t) = to {
q = q.bind(t);
qb.push(" AND timestamp <= ").push_bind(t);
}
q = q.bind(limit).bind(offset);

let rows = q.fetch_all(&self.pool).await?;
qb.push(" ORDER BY timestamp DESC LIMIT ").push_bind(limit);
qb.push(" OFFSET ").push_bind(offset);

let rows: Vec<Row> = qb.build_query_as().fetch_all(&self.pool).await?;

Ok(rows
.into_iter()
Expand All @@ -196,26 +162,24 @@ impl AuditLogger {
error_message,
request_id,
user_agent,
)| {
AuditLogEntry {
id: Some(id),
timestamp,
actor,
actor_ip: actor_ip_str.and_then(|s| s.parse().ok()),
action,
resource_type,
resource_id,
details,
status: match status.as_str() {
"success" => AuditStatus::Success,
"failure" => AuditStatus::Failure,
"partial" => AuditStatus::Partial,
_ => AuditStatus::Success,
},
error_message,
request_id,
user_agent,
}
)| AuditLogEntry {
id: Some(id),
timestamp,
actor,
actor_ip: actor_ip_str.and_then(|s| s.parse().ok()),
action,
resource_type,
resource_id,
details,
status: match status.as_str() {
"success" => AuditStatus::Success,
"failure" => AuditStatus::Failure,
"partial" => AuditStatus::Partial,
_ => AuditStatus::Success,
},
error_message,
request_id,
user_agent,
},
)
.collect())
Expand Down Expand Up @@ -257,6 +221,70 @@ pub struct AuditStatistics {
pub failed: i64,
}

#[cfg(test)]
mod tests {
use super::*;

fn make_entry(actor: &str, action: &str, status: AuditStatus) -> AuditLogEntry {
AuditLogEntry {
id: None,
timestamp: Utc::now(),
actor: actor.to_string(),
actor_ip: None,
action: action.to_string(),
resource_type: "market".to_string(),
resource_id: None,
details: None,
status,
error_message: None,
request_id: None,
user_agent: None,
}
}

#[test]
fn audit_status_display_success() {
assert_eq!(AuditStatus::Success.to_string(), "success");
}

#[test]
fn audit_status_display_failure() {
assert_eq!(AuditStatus::Failure.to_string(), "failure");
}

#[test]
fn audit_status_display_partial() {
assert_eq!(AuditStatus::Partial.to_string(), "partial");
}

#[test]
fn create_audit_entry_sets_expected_fields() {
let entry = create_audit_entry(
"api_key_123".to_string(),
None,
"resolve_market".to_string(),
"market".to_string(),
Some("42".to_string()),
None,
None,
None,
);
assert_eq!(entry.actor, "api_key_123");
assert_eq!(entry.action, "resolve_market");
assert_eq!(entry.resource_type, "market");
assert_eq!(entry.resource_id, Some("42".to_string()));
assert!(matches!(entry.status, AuditStatus::Success));
assert!(entry.id.is_none());
}

#[test]
fn make_entry_helper_sets_status() {
let e = make_entry("admin", "delete", AuditStatus::Failure);
assert!(matches!(e.status, AuditStatus::Failure));
assert_eq!(e.actor, "admin");
}
}

/// Helper to create audit log entry from request context
pub fn create_audit_entry(
actor: String,
Expand Down
62 changes: 61 additions & 1 deletion services/api/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,25 @@ use crate::{
/// Errors that can be returned by [`Database`] methods.
#[derive(Debug)]
pub enum DbError {
/// A query exceeded the per-operation timeout.
Timeout,
/// The connection pool had no connections available within the acquire timeout.
PoolExhausted,
/// A database constraint was violated (unique, foreign-key, not-null, check).
/// The inner string is the database error message for logging.
ConstraintViolation(String),
/// Any other database error.
Other(anyhow::Error),
}

impl std::fmt::Display for DbError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DbError::Timeout => write!(f, "database query timed out"),
DbError::PoolExhausted => write!(f, "database connection pool exhausted"),
DbError::ConstraintViolation(msg) => {
write!(f, "database constraint violation: {msg}")
}
DbError::Other(e) => write!(f, "{e}"),
}
}
Expand All @@ -31,7 +42,19 @@ impl std::error::Error for DbError {}

impl From<sqlx::Error> for DbError {
fn from(e: sqlx::Error) -> Self {
DbError::Other(anyhow::Error::from(e))
match &e {
sqlx::Error::PoolTimedOut => DbError::PoolExhausted,
sqlx::Error::Database(db_err) => {
// PostgreSQL constraint violation SQLSTATE codes start with "23"
// (23000 integrity constraint, 23505 unique violation, etc.).
if db_err.code().map(|c| c.starts_with("23")).unwrap_or(false) {
DbError::ConstraintViolation(db_err.message().to_string())
} else {
DbError::Other(anyhow::Error::from(e))
}
}
_ => DbError::Other(anyhow::Error::from(e)),
}
}
}

Expand Down Expand Up @@ -725,3 +748,40 @@ impl Database {
Ok(count > 0)
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn db_error_timeout_display() {
let e = DbError::Timeout;
assert_eq!(e.to_string(), "database query timed out");
}

#[test]
fn db_error_pool_exhausted_display() {
let e = DbError::PoolExhausted;
assert_eq!(e.to_string(), "database connection pool exhausted");
}

#[test]
fn db_error_constraint_violation_display() {
let e = DbError::ConstraintViolation("duplicate key value".to_string());
assert!(e.to_string().contains("constraint violation"));
assert!(e.to_string().contains("duplicate key value"));
}

#[test]
fn from_sqlx_pool_timed_out_maps_to_pool_exhausted() {
let e = DbError::from(sqlx::Error::PoolTimedOut);
assert!(matches!(e, DbError::PoolExhausted));
}

#[test]
fn from_sqlx_other_maps_to_other() {
let e = DbError::from(sqlx::Error::RowNotFound);
assert!(matches!(e, DbError::Other(_)));
}
}
32 changes: 28 additions & 4 deletions services/api/src/email/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,15 @@ impl EmailQueue {
.collect()
}

/// Minimum delay (seconds) before a requeued dead-letter job is eligible
/// for processing. Prevents immediate re-failure loops on persistent errors.
const DEAD_LETTER_REQUEUE_DELAY_SECS: i64 = 60;

/// Move a job from the dead-letter set back to the main queue for reprocessing.
///
/// The job is scheduled `DEAD_LETTER_REQUEUE_DELAY_SECS` seconds in the future
/// so a persistent failure does not cause a tight retry loop. The attempts counter
/// is also reset to 0 so the job gets its full retry budget again.
pub async fn requeue_dead_letter(&self, job_id: Uuid) -> Result<bool> {
let mut conn = self.cache.get_connection().await?;

Expand All @@ -280,18 +288,28 @@ impl EmailQueue {
return Ok(false);
}

// Reset DB status so the worker will pick it up again.
// Reset attempts to 0 so the job gets its full retry budget.
self.db
.email_update_job_attempts(job_id, 0, None)
.await?;

// Reset status to pending.
self.db
.email_update_job_status(job_id, crate::email::types::EmailJobStatus::Pending.as_str(), None)
.await?;

let score = chrono::Utc::now().timestamp() as f64;
// Schedule processing after the cooling-off delay to prevent tight loops.
let eligible_at = chrono::Utc::now().timestamp() + Self::DEAD_LETTER_REQUEUE_DELAY_SECS;
let _: () = conn
.zadd(EMAIL_QUEUE_KEY, job_id.to_string(), score)
.zadd(EMAIL_QUEUE_KEY, job_id.to_string(), eligible_at as f64)
.await
.context("Failed to re-enqueue dead-letter job")?;

tracing::info!("Requeued dead-letter email job: {}", job_id);
tracing::info!(
job_id = %job_id,
delay_secs = Self::DEAD_LETTER_REQUEUE_DELAY_SECS,
"Requeued dead-letter email job with cooling-off delay"
);
Ok(true)
}

Expand Down Expand Up @@ -518,6 +536,12 @@ pub struct QueueStats {
mod tests {
use super::*;

#[test]
fn dead_letter_requeue_delay_is_positive() {
assert!(EmailQueue::DEAD_LETTER_REQUEUE_DELAY_SECS > 0,
"cooling-off delay must be positive to prevent immediate re-failure loops");
}

/// Test that recover_orphaned_jobs correctly identifies stale jobs.
///
/// Acceptance criteria for #472:
Expand Down
13 changes: 11 additions & 2 deletions services/api/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,17 @@ impl IntoResponse for ApiError {

fn into_api_error(err: anyhow::Error) -> ApiError {
if let Some(db_err) = err.downcast_ref::<DbError>() {
if matches!(db_err, DbError::Timeout) {
return ApiError::service_unavailable("database query timed out");
match db_err {
DbError::Timeout => {
return ApiError::service_unavailable("database query timed out");
}
DbError::PoolExhausted => {
return ApiError::service_unavailable("database connection pool exhausted");
}
DbError::ConstraintViolation(msg) => {
return ApiError::conflict(msg.clone());
}
DbError::Other(_) => {}
}
}
ApiError::internal(err)
Expand Down
Loading
Loading