diff --git a/contract/contracts/hello-world/src/autoshare_logic.rs b/contract/contracts/hello-world/src/autoshare_logic.rs index a099068..20e4dd7 100644 --- a/contract/contracts/hello-world/src/autoshare_logic.rs +++ b/contract/contracts/hello-world/src/autoshare_logic.rs @@ -3,6 +3,8 @@ use crate::base::events::{ AdminTransferred, AuditAction, AuditRecordAppended, AuthorizationFailure, AutoshareCreated, AutoshareUpdated, BatchNotificationsCreated, CategoryRegistered, ContractPaused, ContractUnpaused, GroupActivated, GroupDeactivated, NotificationCategory, NotificationExpired, + NotificationPriority, NotificationRevoked, NotificationScheduled, ScheduledNotificationCancelled, + Withdrawal, BatchProcessingCompleted, NotificationExtended, NotificationLimitsConfigured, NotificationPriority, NotificationRevoked, NotificationScheduled, ScheduledNotificationCancelled, Withdrawal, }; @@ -1456,6 +1458,17 @@ pub fn is_notification_revoked(env: Env, notification_id: BytesN<32>) -> Result< Ok(is_revoked(¬ification)) } +/// Emits a `BatchProcessingCompleted` event for off-chain consumers. +pub fn emit_batch_completed(env: Env, batch_id: BytesN<32>, processed_count: u32) -> Result<(), Error> { + BatchProcessingCompleted { + batch_id, + category: NotificationCategory::Notification, + priority: NotificationPriority::Medium, + processed_count, + } + .publish(&env); + Ok(()) +} /// Extends the expiration period of a scheduled notification by `extension_seconds`. /// /// Only authorized callers (the notification creator or the contract admin) can diff --git a/contract/contracts/hello-world/src/base/events.rs b/contract/contracts/hello-world/src/base/events.rs index 2c46678..3bd65f9 100644 --- a/contract/contracts/hello-world/src/base/events.rs +++ b/contract/contracts/hello-world/src/base/events.rs @@ -313,6 +313,12 @@ pub struct NotificationRevoked { pub revoked_at: u64, } +/// Emitted when an off-chain batch of notifications finishes processing. +#[contractevent(data_format = "single-value")] +#[derive(Clone)] +pub struct BatchProcessingCompleted { + #[topic] + pub batch_id: BytesN<32>, /// Emitted when a scheduled notification's expiry period is extended by an authorized sender. #[contractevent(data_format = "single-value")] #[derive(Clone)] @@ -338,6 +344,7 @@ pub struct NotificationLimitsConfigured { pub category: NotificationCategory, #[topic] pub priority: NotificationPriority, + pub processed_count: u32, pub max_payload_size: u32, pub max_expiration_seconds: u64, pub min_expiration_seconds: u64, diff --git a/contract/contracts/hello-world/src/lib.rs b/contract/contracts/hello-world/src/lib.rs index c36d802..fa619c4 100644 --- a/contract/contracts/hello-world/src/lib.rs +++ b/contract/contracts/hello-world/src/lib.rs @@ -387,6 +387,9 @@ impl AutoShareContract { autoshare_logic::expire_notification(env, notification_id).unwrap(); } + /// Emits a `BatchProcessingCompleted` event for off-chain listeners. + pub fn emit_batch_completed(env: Env, batch_id: BytesN<32>, processed_count: u32) { + autoshare_logic::emit_batch_completed(env, batch_id, processed_count).unwrap(); // ============================================================================ // Batch Notification Creation // ============================================================================ diff --git a/contract/contracts/hello-world/src/tests/batch_event_test.rs b/contract/contracts/hello-world/src/tests/batch_event_test.rs new file mode 100644 index 0000000..04456fa --- /dev/null +++ b/contract/contracts/hello-world/src/tests/batch_event_test.rs @@ -0,0 +1,55 @@ +use crate::test_utils::setup_test_env; +use crate::AutoShareContractClient; +use crate::base::events::NotificationCategory; +use crate::base::events::NotificationPriority; +use soroban_sdk::testutils::Events; +use soroban_sdk::{BytesN, Symbol, Val}; + +#[test] +fn test_emit_batch_processing_completed_event() { + let test_env = setup_test_env(); + let client = AutoShareContractClient::new(&test_env.env, &test_env.autoshare_contract); + + let mut id_bytes = [0u8; 32]; + id_bytes[0] = 7; + let batch_id = BytesN::from_array(&test_env.env, &id_bytes); + let processed = 42u32; + + client.emit_batch_completed(&batch_id, &processed); + + // Ensure the event was emitted with expected topics and data + let emitted = test_env + .env + .events() + .all() + .iter() + .find(|(_addr, topics, _data)| { + if topics.is_empty() { + return false; + } + let first = topics.get(0).unwrap(); + if let Ok(name) = Symbol::try_from_val(&test_env.env, &first) { + return name == Symbol::new(&test_env.env, "batch_processing_completed"); + } + false + }) + .expect("expected batch_processing_completed event"); + + // topics shape: [name, batch_id, category, priority] + let topics = &emitted.1; + assert_eq!(topics.len(), 4); + + let topic_batch = BytesN::<32>::try_from_val(&test_env.env, &topics.get(1).unwrap()).unwrap(); + assert_eq!(topic_batch, batch_id); + + let category = NotificationCategory::try_from_val(&test_env.env, &topics.get(2).unwrap()).unwrap(); + assert_eq!(category, NotificationCategory::Notification); + + let priority = NotificationPriority::try_from_val(&test_env.env, &topics.get(3).unwrap()).unwrap(); + assert_eq!(priority, NotificationPriority::Medium); + + // data should contain the processed_count (u32) + let data = emitted.2; + let val = u32::try_from_val(&test_env.env, &data).unwrap(); + assert_eq!(val, processed); +} diff --git a/listener/src/services/discord-notification.ts b/listener/src/services/discord-notification.ts index 37cbb6b..9fbae5f 100644 --- a/listener/src/services/discord-notification.ts +++ b/listener/src/services/discord-notification.ts @@ -4,6 +4,7 @@ import { ContractConfig, DiscordConfig } from '../types'; import { getEventName } from '../utils/event-utils'; import { NotificationDeduplicator, generateFingerprint } from './notification-deduplicator'; import { getNotificationAnalyticsAggregator, NotificationAnalyticsAggregator } from './notification-analytics-aggregator'; +import { sendWebhook } from './webhook-sender'; import { NotificationType } from '../types/scheduled-notification'; export interface DiscordMessage { @@ -186,31 +187,20 @@ export class DiscordNotificationService { } private async sendWebhook(message: DiscordMessage): Promise { - const timeoutMs = this.config.timeoutMs ?? 5000; - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), timeoutMs); - try { - const response = await fetch(this.config.webhookUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(message), - signal: controller.signal as any, + const response = await sendWebhook(this.config.webhookUrl, message, { + timeoutMs: this.config.timeoutMs, }); return response; } catch (error: any) { - if (error.name === 'AbortError') { + if (error && error.name === 'AbortError') { this.timeoutCount++; logger.error('Discord webhook request timed out', { webhookId: this.config.webhookId, - timeoutMs, + timeoutMs: this.config.timeoutMs ?? 5000, }); } throw error; - } finally { - clearTimeout(timeoutId); } } diff --git a/listener/src/services/notification-analytics-aggregator.test.ts b/listener/src/services/notification-analytics-aggregator.test.ts index 5962050..affe99c 100644 --- a/listener/src/services/notification-analytics-aggregator.test.ts +++ b/listener/src/services/notification-analytics-aggregator.test.ts @@ -81,6 +81,23 @@ describe('NotificationAnalyticsAggregator', () => { expect(small.lifetimeCount).toBe(5); expect(small.size).toBe(2); }); + + it('maintains correct counters after eviction (no drift)', () => { + const small = new NotificationAnalyticsAggregator({ maxRecords: 3, now: () => fixedNow }); + // push 5 records with mixed outcomes + small.record(buildRecord('success')); + small.record(buildRecord('failure')); + small.record(buildRecord('success')); + small.record(buildRecord('failure')); + small.record(buildRecord('success')); + + // only last 3 should be visible: success, failure, success + expect(small.size).toBe(3); + const snap = small.snapshot(); + expect(snap.overall.total).toBe(3); + expect(snap.overall.success).toBe(2); + expect(snap.overall.failure).toBe(1); + }); }); describe('outcome classification', () => { diff --git a/listener/src/services/notification-analytics-aggregator.ts b/listener/src/services/notification-analytics-aggregator.ts index 433327b..d01cc7c 100644 --- a/listener/src/services/notification-analytics-aggregator.ts +++ b/listener/src/services/notification-analytics-aggregator.ts @@ -162,6 +162,28 @@ export class NotificationAnalyticsAggregator { if (this.records.length > this.maxRecords) { const evicted = this.records.length - this.maxRecords; + // adjust counters based on evicted records to avoid drift + for (let i = 0; i < evicted; i++) { + const r = this.records[i]; + switch (r.outcome) { + case 'success': + this.successCount = Math.max(0, this.successCount - 1); + break; + case 'failure': + this.failureCount = Math.max(0, this.failureCount - 1); + break; + case 'retry': + this.retryCount = Math.max(0, this.retryCount - 1); + break; + case 'skipped': + this.skippedCount = Math.max(0, this.skippedCount - 1); + break; + } + if (r.durationMs > 0) { + this.totalDurationMs = Math.max(0, this.totalDurationMs - r.durationMs); + this.durationSamples = Math.max(0, this.durationSamples - 1); + } + } this.records.splice(0, evicted); } } diff --git a/listener/src/services/webhook-sender.ts b/listener/src/services/webhook-sender.ts new file mode 100644 index 0000000..a3ca830 --- /dev/null +++ b/listener/src/services/webhook-sender.ts @@ -0,0 +1,29 @@ +export interface WebhookSendOptions { + timeoutMs?: number; + headers?: Record; +} + +export async function sendWebhook( + url: string, + payload: any, + opts: WebhookSendOptions = {}, +): Promise { + const timeoutMs = opts.timeoutMs ?? 5000; + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), timeoutMs); + + try { + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(opts.headers ?? {}), + }, + body: JSON.stringify(payload), + signal: controller.signal as any, + }); + return response; + } finally { + clearTimeout(timeoutId); + } +}