Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions contract/contracts/hello-world/src/autoshare_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crate::base::events::{
AdminTransferred, AuditAction, AuditRecordAppended, AuthorizationFailure, AutoshareCreated,
AutoshareUpdated, BatchNotificationsCreated, CategoryRegistered, ContractPaused,
ContractUnpaused, GroupActivated, GroupDeactivated, NotificationCategory, NotificationExpired,
NotificationPriority, NotificationRevoked, NotificationScheduled, ScheduledNotificationCancelled,
Withdrawal, BatchProcessingCompleted,
NotificationExtended, NotificationLimitsConfigured, NotificationPriority, NotificationRevoked,
NotificationScheduled, ScheduledNotificationCancelled, Withdrawal,
};
Expand Down Expand Up @@ -1456,6 +1458,17 @@ pub fn is_notification_revoked(env: Env, notification_id: BytesN<32>) -> Result<
Ok(is_revoked(&notification))
}

/// Emits a `BatchProcessingCompleted` event for off-chain consumers.
pub fn emit_batch_completed(env: Env, batch_id: BytesN<32>, processed_count: u32) -> Result<(), Error> {
BatchProcessingCompleted {
batch_id,
category: NotificationCategory::Notification,
priority: NotificationPriority::Medium,
processed_count,
}
.publish(&env);
Ok(())
}
/// Extends the expiration period of a scheduled notification by `extension_seconds`.
///
/// Only authorized callers (the notification creator or the contract admin) can
Expand Down
7 changes: 7 additions & 0 deletions contract/contracts/hello-world/src/base/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ pub struct NotificationRevoked {
pub revoked_at: u64,
}

/// Emitted when an off-chain batch of notifications finishes processing.
#[contractevent(data_format = "single-value")]
#[derive(Clone)]
pub struct BatchProcessingCompleted {
#[topic]
pub batch_id: BytesN<32>,
/// Emitted when a scheduled notification's expiry period is extended by an authorized sender.
#[contractevent(data_format = "single-value")]
#[derive(Clone)]
Expand All @@ -338,6 +344,7 @@ pub struct NotificationLimitsConfigured {
pub category: NotificationCategory,
#[topic]
pub priority: NotificationPriority,
pub processed_count: u32,
pub max_payload_size: u32,
pub max_expiration_seconds: u64,
pub min_expiration_seconds: u64,
Expand Down
3 changes: 3 additions & 0 deletions contract/contracts/hello-world/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ impl AutoShareContract {
autoshare_logic::expire_notification(env, notification_id).unwrap();
}

/// Emits a `BatchProcessingCompleted` event for off-chain listeners.
pub fn emit_batch_completed(env: Env, batch_id: BytesN<32>, processed_count: u32) {
autoshare_logic::emit_batch_completed(env, batch_id, processed_count).unwrap();
// ============================================================================
// Batch Notification Creation
// ============================================================================
Expand Down
55 changes: 55 additions & 0 deletions contract/contracts/hello-world/src/tests/batch_event_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::test_utils::setup_test_env;
use crate::AutoShareContractClient;
use crate::base::events::NotificationCategory;
use crate::base::events::NotificationPriority;
use soroban_sdk::testutils::Events;
use soroban_sdk::{BytesN, Symbol, Val};

#[test]
fn test_emit_batch_processing_completed_event() {
let test_env = setup_test_env();
let client = AutoShareContractClient::new(&test_env.env, &test_env.autoshare_contract);

let mut id_bytes = [0u8; 32];
id_bytes[0] = 7;
let batch_id = BytesN::from_array(&test_env.env, &id_bytes);
let processed = 42u32;

client.emit_batch_completed(&batch_id, &processed);

// Ensure the event was emitted with expected topics and data
let emitted = test_env
.env
.events()
.all()
.iter()
.find(|(_addr, topics, _data)| {
if topics.is_empty() {
return false;
}
let first = topics.get(0).unwrap();
if let Ok(name) = Symbol::try_from_val(&test_env.env, &first) {
return name == Symbol::new(&test_env.env, "batch_processing_completed");
}
false
})
.expect("expected batch_processing_completed event");

// topics shape: [name, batch_id, category, priority]
let topics = &emitted.1;
assert_eq!(topics.len(), 4);

let topic_batch = BytesN::<32>::try_from_val(&test_env.env, &topics.get(1).unwrap()).unwrap();
assert_eq!(topic_batch, batch_id);

let category = NotificationCategory::try_from_val(&test_env.env, &topics.get(2).unwrap()).unwrap();
assert_eq!(category, NotificationCategory::Notification);

let priority = NotificationPriority::try_from_val(&test_env.env, &topics.get(3).unwrap()).unwrap();
assert_eq!(priority, NotificationPriority::Medium);

// data should contain the processed_count (u32)
let data = emitted.2;
let val = u32::try_from_val(&test_env.env, &data).unwrap();
assert_eq!(val, processed);
}
20 changes: 5 additions & 15 deletions listener/src/services/discord-notification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ContractConfig, DiscordConfig } from '../types';
import { getEventName } from '../utils/event-utils';
import { NotificationDeduplicator, generateFingerprint } from './notification-deduplicator';
import { getNotificationAnalyticsAggregator, NotificationAnalyticsAggregator } from './notification-analytics-aggregator';
import { sendWebhook } from './webhook-sender';
import { NotificationType } from '../types/scheduled-notification';

export interface DiscordMessage {
Expand Down Expand Up @@ -186,31 +187,20 @@ export class DiscordNotificationService {
}

private async sendWebhook(message: DiscordMessage): Promise<Response> {
const timeoutMs = this.config.timeoutMs ?? 5000;
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

try {
const response = await fetch(this.config.webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(message),
signal: controller.signal as any,
const response = await sendWebhook(this.config.webhookUrl, message, {
timeoutMs: this.config.timeoutMs,
});
return response;
} catch (error: any) {
if (error.name === 'AbortError') {
if (error && error.name === 'AbortError') {
this.timeoutCount++;
logger.error('Discord webhook request timed out', {
webhookId: this.config.webhookId,
timeoutMs,
timeoutMs: this.config.timeoutMs ?? 5000,
});
}
throw error;
} finally {
clearTimeout(timeoutId);
}
}

Expand Down
17 changes: 17 additions & 0 deletions listener/src/services/notification-analytics-aggregator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,23 @@ describe('NotificationAnalyticsAggregator', () => {
expect(small.lifetimeCount).toBe(5);
expect(small.size).toBe(2);
});

it('maintains correct counters after eviction (no drift)', () => {
const small = new NotificationAnalyticsAggregator({ maxRecords: 3, now: () => fixedNow });
// push 5 records with mixed outcomes
small.record(buildRecord('success'));
small.record(buildRecord('failure'));
small.record(buildRecord('success'));
small.record(buildRecord('failure'));
small.record(buildRecord('success'));

// only last 3 should be visible: success, failure, success
expect(small.size).toBe(3);
const snap = small.snapshot();
expect(snap.overall.total).toBe(3);
expect(snap.overall.success).toBe(2);
expect(snap.overall.failure).toBe(1);
});
});

describe('outcome classification', () => {
Expand Down
22 changes: 22 additions & 0 deletions listener/src/services/notification-analytics-aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,28 @@ export class NotificationAnalyticsAggregator {

if (this.records.length > this.maxRecords) {
const evicted = this.records.length - this.maxRecords;
// adjust counters based on evicted records to avoid drift
for (let i = 0; i < evicted; i++) {
const r = this.records[i];
switch (r.outcome) {
case 'success':
this.successCount = Math.max(0, this.successCount - 1);
break;
case 'failure':
this.failureCount = Math.max(0, this.failureCount - 1);
break;
case 'retry':
this.retryCount = Math.max(0, this.retryCount - 1);
break;
case 'skipped':
this.skippedCount = Math.max(0, this.skippedCount - 1);
break;
}
if (r.durationMs > 0) {
this.totalDurationMs = Math.max(0, this.totalDurationMs - r.durationMs);
this.durationSamples = Math.max(0, this.durationSamples - 1);
}
}
this.records.splice(0, evicted);
}
}
Expand Down
29 changes: 29 additions & 0 deletions listener/src/services/webhook-sender.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
export interface WebhookSendOptions {
timeoutMs?: number;
headers?: Record<string, string>;
}

export async function sendWebhook(
url: string,
payload: any,
opts: WebhookSendOptions = {},
): Promise<Response> {
const timeoutMs = opts.timeoutMs ?? 5000;
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(opts.headers ?? {}),
},
body: JSON.stringify(payload),
signal: controller.signal as any,
});
return response;
} finally {
clearTimeout(timeoutId);
}
}
Loading