Skip to content

Latest commit

 

History

History
145 lines (111 loc) · 5.81 KB

File metadata and controls

145 lines (111 loc) · 5.81 KB

Real-time Notifications (SSE)

The application automatically sends Server-Sent Events (SSE) notifications whenever projections are updated, providing real-time updates to connected clients without polling.

How it Works

  1. Mutation occurs - Control flow starts when a client creates, updates, or deletes an entity.
  2. Event stored - The corresponding domain event (e.g., BookAdded) is appended to the Marten event store.
  3. Projection updates - The Async Daemon processes the event and updates the relevant read model (projection).
  4. Notification sent - MartenCommitListener detects projection changes and triggers notification fan-out.
  5. Causation ID Propagation - The SSE message includes the EventId of the original domain event, which the client uses as the CausationId for subsequent requests, ensuring a complete async trace.
  6. Clients receive - Connected clients receive updates in real-time (in-process or Redis-backed fan-out).

Data Flow

sequenceDiagram
    participant Client
    participant API as API Endpoint
    participant Marten as Event Store
    participant Daemon as Async Daemon
    participant Listener as MartenCommitListener
    participant SSE as SSE Stream
    
    Client->>API: 1. POST /api/admin/books (Create with Client-ID)
    API->>Marten: 2. Append BookAdded event
    Marten-->>API: 3. Event stored
    API-->>Client: 4. HTTP 201 Created
    
    Daemon->>Marten: 5. Poll for new events
    Daemon->>Daemon: 6. Update BookSearchProjection
    Daemon->>Listener: 7. AfterCommitAsync (projection change)
    Listener->>Listener: 8. Invalidate cache tags
    Listener->>SSE: 9. Broadcast BookCreated notification
    SSE-->>Client: 10. Push notification via SSE
Loading

Supported Events

The system currently broadcasts notifications for the following entity lifecycles:

  • Books: BookCreated, BookUpdated, BookDeleted
  • Authors: AuthorCreated, AuthorUpdated, AuthorDeleted
  • Categories: CategoryCreated, CategoryUpdated, CategoryDeleted
  • Publishers: PublisherCreated, PublisherUpdated, PublisherDeleted

Note

Soft Deletion: Soft-delete operations appear as Delete notifications. Restore operations appear as Update notifications (because the IsDeleted flag changes from true to false).

Backend Implementation

The notification logic is centralized in a Marten Document Session Listener. This ensures notifications are reliable and tied directly to the success of the data projection, rather than the API request.

// MartenCommitListener.cs
public class MartenCommitListener : IDocumentSessionListener
{
    public async Task AfterCommitAsync(IDocumentSession _, IChangeSet commit, CancellationToken token)
    {
        // Process all projection changes
        await ProcessDocumentChangesAsync(commit.Inserted, ChangeType.Insert, token);
        await ProcessDocumentChangesAsync(commit.Updated, ChangeType.Update, token);  
        await ProcessDocumentChangesAsync(commit.Deleted, ChangeType.Delete, token);
    }
    
    async Task HandleCategoryChangeAsync(CategoryProjection category, ChangeType changeType)
    {
        // Determine notification type based on change and soft-delete status
        var effectiveChangeType = DetermineEffectiveChangeType(changeType, category.IsDeleted);
        
        // 1. Invalidate cache (HybridCache)
        await InvalidateCacheTagsAsync(category.Id, CacheTags.CategoryItemPrefix, CacheTags.CategoryList);
        
        // 2. Send SSE notification
        IDomainEventNotification notification = effectiveChangeType switch
        {
            ChangeType.Insert => new CategoryCreatedNotification(category.Id),
            ChangeType.Update => new CategoryUpdatedNotification(category.Id),
            ChangeType.Delete => new CategoryDeletedNotification(category.Id)
        };
        await _notificationService.NotifyAsync(notification);
    }
}

Benefits

Benefit Description
Automatic No manual notification code needed in command handlers.
Reliable Notifications only fire if the projection update succeeds.
Efficient A single listener handles all entities generically.
Integrated Cache invalidation and SSE broadcast happen in the same atomic unit of work.

Client Integration

Clients can connect to the /api/notifications/stream endpoint to receive updates.

JavaScript / Browser

// Connect to SSE endpoint
const eventSource = new EventSource('/api/notifications/stream');

eventSource.addEventListener('BookUpdated', (event) => {
    const notification = JSON.parse(event.data);
    console.log(`Book ${notification.entityId} updated`);
    
    // Logic to update UI, e.g., re-fetch book list or update specific item
    refreshBookList();
});

Blazor (C#)

In the shared client layer, BookStoreEventsService leverages .NET SseParser for robust stream processing and is consumed by the Blazor frontend.

// BookStoreEventsService.cs
using System.Net.ServerSentEvents;

// ...
using var response = await _httpClient.GetAsync("/api/notifications/stream", 
    HttpCompletionOption.ResponseHeadersRead, token);
using var stream = await response.Content.ReadAsStreamAsync(token);

await foreach (var item in SseParser.Create(stream).EnumerateAsync(token))
{
    var notification = DeserializeNotification(item.EventType, item.Data);
    // ...
}

Causation Tracking

The client automatically extracts the EventId from the notification and updates its context:

if (notification.EventId != Guid.Empty)
{
    _clientContextService.UpdateCausationId(notification.EventId.ToString());
}

This ensures that any subsequent command sent by the client is logically linked to the event that triggered the UI update, closing the loop in the asynchronous architecture.