From fae3fbc920574442fca499e3bf0877d67f5be333 Mon Sep 17 00:00:00 2001 From: amanosiadnan-cmyk Date: Wed, 24 Jun 2026 12:30:23 +0000 Subject: [PATCH] Add webhook sender, fix analytics eviction drift, add batch-completed event --- .../hello-world/src/autoshare_logic.rs | 14 ++++- .../contracts/hello-world/src/base/events.rs | 13 +++++ contract/contracts/hello-world/src/lib.rs | 5 ++ .../hello-world/src/tests/batch_event_test.rs | 55 +++++++++++++++++++ listener/src/services/discord-notification.ts | 20 ++----- .../notification-analytics-aggregator.test.ts | 17 ++++++ .../notification-analytics-aggregator.ts | 22 ++++++++ listener/src/services/webhook-sender.ts | 29 ++++++++++ 8 files changed, 159 insertions(+), 16 deletions(-) create mode 100644 contract/contracts/hello-world/src/tests/batch_event_test.rs create mode 100644 listener/src/services/webhook-sender.ts diff --git a/contract/contracts/hello-world/src/autoshare_logic.rs b/contract/contracts/hello-world/src/autoshare_logic.rs index ad87d61..632755f 100644 --- a/contract/contracts/hello-world/src/autoshare_logic.rs +++ b/contract/contracts/hello-world/src/autoshare_logic.rs @@ -3,7 +3,7 @@ use crate::base::events::{ AdminTransferred, AuthorizationFailure, AutoshareCreated, AutoshareUpdated, ContractPaused, ContractUnpaused, GroupActivated, GroupDeactivated, NotificationCategory, NotificationExpired, NotificationPriority, NotificationRevoked, NotificationScheduled, ScheduledNotificationCancelled, - Withdrawal, + Withdrawal, BatchProcessingCompleted, }; use crate::base::types::{AutoShareDetails, GroupMember, PaymentHistory, ScheduledNotification}; use soroban_sdk::{contracttype, token, Address, BytesN, Env, String, Vec}; @@ -1083,3 +1083,15 @@ pub fn is_notification_revoked(env: Env, notification_id: BytesN<32>) -> Result< let notification = get_notification(env, notification_id)?; Ok(is_revoked(¬ification)) } + +/// Emits a `BatchProcessingCompleted` event for off-chain consumers. +pub fn emit_batch_completed(env: Env, batch_id: BytesN<32>, processed_count: u32) -> Result<(), Error> { + BatchProcessingCompleted { + batch_id, + category: NotificationCategory::Notification, + priority: NotificationPriority::Medium, + processed_count, + } + .publish(&env); + Ok(()) +} diff --git a/contract/contracts/hello-world/src/base/events.rs b/contract/contracts/hello-world/src/base/events.rs index fbb5dcd..e7d5cbb 100644 --- a/contract/contracts/hello-world/src/base/events.rs +++ b/contract/contracts/hello-world/src/base/events.rs @@ -237,3 +237,16 @@ pub struct NotificationRevoked { pub priority: NotificationPriority, pub revoked_at: u64, } + +/// Emitted when an off-chain batch of notifications finishes processing. +#[contractevent(data_format = "single-value")] +#[derive(Clone)] +pub struct BatchProcessingCompleted { + #[topic] + pub batch_id: BytesN<32>, + #[topic] + pub category: NotificationCategory, + #[topic] + pub priority: NotificationPriority, + pub processed_count: u32, +} diff --git a/contract/contracts/hello-world/src/lib.rs b/contract/contracts/hello-world/src/lib.rs index 0fef756..30034d8 100644 --- a/contract/contracts/hello-world/src/lib.rs +++ b/contract/contracts/hello-world/src/lib.rs @@ -290,6 +290,11 @@ impl AutoShareContract { autoshare_logic::expire_notification(env, notification_id).unwrap(); } + /// Emits a `BatchProcessingCompleted` event for off-chain listeners. + pub fn emit_batch_completed(env: Env, batch_id: BytesN<32>, processed_count: u32) { + autoshare_logic::emit_batch_completed(env, batch_id, processed_count).unwrap(); + } + /// Revokes a scheduled notification, preventing any further interaction with it. /// /// Only the notification creator or the contract admin can revoke a notification. diff --git a/contract/contracts/hello-world/src/tests/batch_event_test.rs b/contract/contracts/hello-world/src/tests/batch_event_test.rs new file mode 100644 index 0000000..04456fa --- /dev/null +++ b/contract/contracts/hello-world/src/tests/batch_event_test.rs @@ -0,0 +1,55 @@ +use crate::test_utils::setup_test_env; +use crate::AutoShareContractClient; +use crate::base::events::NotificationCategory; +use crate::base::events::NotificationPriority; +use soroban_sdk::testutils::Events; +use soroban_sdk::{BytesN, Symbol, Val}; + +#[test] +fn test_emit_batch_processing_completed_event() { + let test_env = setup_test_env(); + let client = AutoShareContractClient::new(&test_env.env, &test_env.autoshare_contract); + + let mut id_bytes = [0u8; 32]; + id_bytes[0] = 7; + let batch_id = BytesN::from_array(&test_env.env, &id_bytes); + let processed = 42u32; + + client.emit_batch_completed(&batch_id, &processed); + + // Ensure the event was emitted with expected topics and data + let emitted = test_env + .env + .events() + .all() + .iter() + .find(|(_addr, topics, _data)| { + if topics.is_empty() { + return false; + } + let first = topics.get(0).unwrap(); + if let Ok(name) = Symbol::try_from_val(&test_env.env, &first) { + return name == Symbol::new(&test_env.env, "batch_processing_completed"); + } + false + }) + .expect("expected batch_processing_completed event"); + + // topics shape: [name, batch_id, category, priority] + let topics = &emitted.1; + assert_eq!(topics.len(), 4); + + let topic_batch = BytesN::<32>::try_from_val(&test_env.env, &topics.get(1).unwrap()).unwrap(); + assert_eq!(topic_batch, batch_id); + + let category = NotificationCategory::try_from_val(&test_env.env, &topics.get(2).unwrap()).unwrap(); + assert_eq!(category, NotificationCategory::Notification); + + let priority = NotificationPriority::try_from_val(&test_env.env, &topics.get(3).unwrap()).unwrap(); + assert_eq!(priority, NotificationPriority::Medium); + + // data should contain the processed_count (u32) + let data = emitted.2; + let val = u32::try_from_val(&test_env.env, &data).unwrap(); + assert_eq!(val, processed); +} diff --git a/listener/src/services/discord-notification.ts b/listener/src/services/discord-notification.ts index 37cbb6b..9fbae5f 100644 --- a/listener/src/services/discord-notification.ts +++ b/listener/src/services/discord-notification.ts @@ -4,6 +4,7 @@ import { ContractConfig, DiscordConfig } from '../types'; import { getEventName } from '../utils/event-utils'; import { NotificationDeduplicator, generateFingerprint } from './notification-deduplicator'; import { getNotificationAnalyticsAggregator, NotificationAnalyticsAggregator } from './notification-analytics-aggregator'; +import { sendWebhook } from './webhook-sender'; import { NotificationType } from '../types/scheduled-notification'; export interface DiscordMessage { @@ -186,31 +187,20 @@ export class DiscordNotificationService { } private async sendWebhook(message: DiscordMessage): Promise { - const timeoutMs = this.config.timeoutMs ?? 5000; - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), timeoutMs); - try { - const response = await fetch(this.config.webhookUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(message), - signal: controller.signal as any, + const response = await sendWebhook(this.config.webhookUrl, message, { + timeoutMs: this.config.timeoutMs, }); return response; } catch (error: any) { - if (error.name === 'AbortError') { + if (error && error.name === 'AbortError') { this.timeoutCount++; logger.error('Discord webhook request timed out', { webhookId: this.config.webhookId, - timeoutMs, + timeoutMs: this.config.timeoutMs ?? 5000, }); } throw error; - } finally { - clearTimeout(timeoutId); } } diff --git a/listener/src/services/notification-analytics-aggregator.test.ts b/listener/src/services/notification-analytics-aggregator.test.ts index 5962050..affe99c 100644 --- a/listener/src/services/notification-analytics-aggregator.test.ts +++ b/listener/src/services/notification-analytics-aggregator.test.ts @@ -81,6 +81,23 @@ describe('NotificationAnalyticsAggregator', () => { expect(small.lifetimeCount).toBe(5); expect(small.size).toBe(2); }); + + it('maintains correct counters after eviction (no drift)', () => { + const small = new NotificationAnalyticsAggregator({ maxRecords: 3, now: () => fixedNow }); + // push 5 records with mixed outcomes + small.record(buildRecord('success')); + small.record(buildRecord('failure')); + small.record(buildRecord('success')); + small.record(buildRecord('failure')); + small.record(buildRecord('success')); + + // only last 3 should be visible: success, failure, success + expect(small.size).toBe(3); + const snap = small.snapshot(); + expect(snap.overall.total).toBe(3); + expect(snap.overall.success).toBe(2); + expect(snap.overall.failure).toBe(1); + }); }); describe('outcome classification', () => { diff --git a/listener/src/services/notification-analytics-aggregator.ts b/listener/src/services/notification-analytics-aggregator.ts index f2d936f..1742487 100644 --- a/listener/src/services/notification-analytics-aggregator.ts +++ b/listener/src/services/notification-analytics-aggregator.ts @@ -161,6 +161,28 @@ export class NotificationAnalyticsAggregator { if (this.records.length > this.maxRecords) { const evicted = this.records.length - this.maxRecords; + // adjust counters based on evicted records to avoid drift + for (let i = 0; i < evicted; i++) { + const r = this.records[i]; + switch (r.outcome) { + case 'success': + this.successCount = Math.max(0, this.successCount - 1); + break; + case 'failure': + this.failureCount = Math.max(0, this.failureCount - 1); + break; + case 'retry': + this.retryCount = Math.max(0, this.retryCount - 1); + break; + case 'skipped': + this.skippedCount = Math.max(0, this.skippedCount - 1); + break; + } + if (r.durationMs > 0) { + this.totalDurationMs = Math.max(0, this.totalDurationMs - r.durationMs); + this.durationSamples = Math.max(0, this.durationSamples - 1); + } + } this.records.splice(0, evicted); } } diff --git a/listener/src/services/webhook-sender.ts b/listener/src/services/webhook-sender.ts new file mode 100644 index 0000000..a3ca830 --- /dev/null +++ b/listener/src/services/webhook-sender.ts @@ -0,0 +1,29 @@ +export interface WebhookSendOptions { + timeoutMs?: number; + headers?: Record; +} + +export async function sendWebhook( + url: string, + payload: any, + opts: WebhookSendOptions = {}, +): Promise { + const timeoutMs = opts.timeoutMs ?? 5000; + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), timeoutMs); + + try { + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(opts.headers ?? {}), + }, + body: JSON.stringify(payload), + signal: controller.signal as any, + }); + return response; + } finally { + clearTimeout(timeoutId); + } +}