-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsse_connection_manager.h
More file actions
209 lines (183 loc) · 7.04 KB
/
Copy pathsse_connection_manager.h
File metadata and controls
209 lines (183 loc) · 7.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
/**
* @file sse_connection_manager.h
* @brief Canonical Doxygen file header for ThemisDB-generated maturity metadata.
* @version 0.0.47
* @note Maturity: 🟢 PRODUCTION-READY
* @note Score: 86/100
* @note Gap Summary: total=3; TODO=1, Stub=1, Unimpl=0, Mock=1, Sim=0, Debt=0, C=n/a, H=n/a, M=n/a, L=n/a
* @note Status: Production Ready
* @note This block is auto-generated and will be overwritten.
*/
/*
* ThemisDB | File: sse_connection_manager.h | Version: 0.0.47
* Maturity: 🟢 PRODUCTION-READY | Score: 100/100
* Gap Summary: total=3; TODO=1, Stub=1, Unimpl=0, Mock=1, Sim=0, Debt=0, C=n/a, H=n/a, M=n/a, L=n/a
* Status: Production Ready
* (Auto-generated; changes will be overwritten)
*/
#pragma once
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include <boost/asio.hpp>
#include <boost/beast.hpp>

#include "cdc/changefeed.h"
namespace themis {
namespace server {
// Short aliases for the Boost.Asio / Boost.Beast namespaces used by the server code.
namespace net = boost::asio;
namespace beast = boost::beast;
namespace http = boost::beast::http;
/**
* @brief Manages active SSE connections for Changefeed streaming
*
* Responsibilities:
* - Track active SSE connections with unique IDs
* - Send heartbeat comments to prevent timeout
* - Push new events to subscribed connections
* - Clean up on client disconnect or server shutdown
* - Memory limits per connection (buffered events)
*/
class SseConnectionManager {
public:
    /**
     * @brief Runtime policy for SSE connection behavior.
     *
     * All durations are expressed in milliseconds.
     */
    struct ConnectionConfig {
        uint32_t heartbeat_interval_ms = 15000;  ///< Send a heartbeat every 15 s.
        uint32_t max_buffered_events = 1000;     ///< Max events per connection buffer.
        uint32_t event_poll_interval_ms = 500;   ///< Poll the changefeed every 500 ms.
        uint32_t retry_ms = 3000;                ///< SSE client reconnect delay hint.
        uint32_t max_events_per_second = 0;      ///< Server-side rate control; 0 = unlimited.
        bool drop_oldest_on_overflow = true;     ///< Backpressure policy: drop oldest if buffer is full.
    };

    /**
     * @brief Aggregated runtime counters for all managed SSE connections.
     */
    struct ConnectionStats {
        size_t active_connections = 0;
        uint64_t total_events_sent = 0;
        uint64_t total_heartbeats_sent = 0;
        uint64_t total_disconnects = 0;
        uint64_t total_dropped_events = 0;
    };

    /**
     * @brief Construct SSE manager without an explicit configuration.
     *
     * NOTE(review): presumably equivalent to passing a default-constructed
     * ConnectionConfig -- confirm against the implementation.
     *
     * @param changefeed Shared changefeed source used for polling events.
     * @param ioc io_context used for timer scheduling. It is stored by
     *            reference, so it must outlive this manager.
     */
    explicit SseConnectionManager(
        std::shared_ptr<Changefeed> changefeed,
        boost::asio::io_context& ioc
    );

    /**
     * @brief Construct SSE manager with explicit runtime configuration.
     * @param changefeed Shared changefeed source used for polling events.
     * @param ioc io_context used for timer scheduling. It is stored by
     *            reference, so it must outlive this manager.
     * @param config Connection and backpressure policy.
     */
    explicit SseConnectionManager(
        std::shared_ptr<Changefeed> changefeed,
        boost::asio::io_context& ioc,
        const ConnectionConfig& config
    );

    /**
     * @brief Destroy the manager.
     *
     * NOTE(review): presumably performs the same cleanup as shutdown() --
     * confirm against the implementation.
     */
    ~SseConnectionManager();

    // Non-copyable and non-movable. The manager holds a reference to the
    // io_context together with mutex and atomic members, so copying/moving was
    // already implicitly unavailable; the deletions make that intent explicit.
    SseConnectionManager(const SseConnectionManager&) = delete;
    SseConnectionManager& operator=(const SseConnectionManager&) = delete;
    SseConnectionManager(SseConnectionManager&&) = delete;
    SseConnectionManager& operator=(SseConnectionManager&&) = delete;

    /**
     * @brief Register a new SSE connection
     * @param from_seq Starting sequence number for this connection
     * @param key_prefix Optional key prefix filter
     * @param event_types Optional set of event types to filter (empty = all types)
     * @return Unique connection ID
     */
    uint64_t registerConnection(
        uint64_t from_seq,
        const std::string& key_prefix = "",
        const std::set<Changefeed::ChangeEventType>& event_types = {}
    );

    /**
     * @brief Unregister connection (called on client disconnect).
     * @param conn_id Connection ID
     */
    void unregisterConnection(uint64_t conn_id);

    /**
     * @brief Get pending raw events for a connection (for at-least-once delivery tracking).
     *
     * Returns up to `max_events` raw `ChangeEvent` objects that have been
     * buffered since the last call. This API is intended for handlers that
     * need to feed events through a `DeliveryTracker` before formatting them
     * as SSE lines; callers should not mix calls to `pollEvents()` and
     * `pollRawEvents()` on the same connection.
     *
     * Rate-limiting (if configured) is applied the same way as in `pollEvents()`.
     *
     * @param conn_id Connection ID.
     * @param max_events Maximum number of raw events to return.
     * @return Raw change events in ascending sequence order.
     */
    std::vector<Changefeed::ChangeEvent> pollRawEvents(uint64_t conn_id, size_t max_events = 100);

    /**
     * @brief Get pending events for a connection
     * @param conn_id Connection ID
     * @param max_events Max events to retrieve
     * @return Vector of event JSON strings (formatted as SSE data lines)
     */
    std::vector<std::string> pollEvents(uint64_t conn_id, size_t max_events = 100);

    /**
     * @brief Check if heartbeat is needed for connection
     * @param conn_id Connection ID
     * @return true if heartbeat should be sent
     */
    bool needsHeartbeat(uint64_t conn_id) const;

    /**
     * @brief Mark heartbeat sent for connection
     * @param conn_id Connection ID
     */
    void recordHeartbeat(uint64_t conn_id);

    /**
     * @brief Get current connection manager statistics.
     * @return Snapshot of cumulative manager counters.
     */
    ConnectionStats getStats() const;

    /**
     * @brief Shutdown all connections gracefully.
     *
     * Marks all connections inactive, clears registries, and cancels the
     * background poll timer.
     */
    void shutdown();

private:
    /**
     * @brief Per-connection state tracked by the manager.
     */
    struct Connection {
        /// Connection ID. Zero-initialized so a default-constructed Connection
        /// never carries an indeterminate value.
        uint64_t id{0};
        /// Sequence position of this connection.
        /// NOTE(review): presumably seeded from `from_seq` at registration -- confirm.
        std::atomic<uint64_t> current_sequence{0};
        /// Key prefix filter supplied at registration (empty string is the default).
        std::string key_prefix;
        /// Event type filter supplied at registration (empty = all types).
        std::set<Changefeed::ChangeEventType> event_types;
        std::chrono::steady_clock::time_point last_activity;
        std::chrono::steady_clock::time_point last_heartbeat;
        /// Formatted SSE lines ("id: N\ndata: {...}\n\n") -- drained by pollEvents().
        std::vector<std::string> buffered_events;
        /// Raw ChangeEvent objects -- drained by pollRawEvents() for at-least-once tracking.
        std::vector<Changefeed::ChangeEvent> raw_buffered_events;
        /// Cleared when the connection is marked inactive (see shutdown()).
        std::atomic<bool> active{true};
        /// Backpressure accounting: events dropped for this connection.
        uint64_t dropped_events{0};
        /// Simple rate window (optional): events sent in the current window.
        uint32_t sent_in_window{0};
        /// Start of the current rate window; set to "now" on construction.
        std::chrono::steady_clock::time_point window_start{std::chrono::steady_clock::now()};
    };

    /// Background polling step.
    /// NOTE(review): presumably driven by `poll_timer_` at
    /// `config_.event_poll_interval_ms` -- confirm against the implementation.
    void backgroundPollTask();

    std::shared_ptr<Changefeed> changefeed_;  ///< Shared changefeed source.
    boost::asio::io_context& ioc_;            ///< Not owned; must outlive the manager.
    ConnectionConfig config_;                 ///< Connection and backpressure policy.

    /// NOTE(review): presumably guards `connections_` -- confirm lock usage in the .cpp.
    mutable std::shared_mutex connections_mutex_;
    /// Connections stored under a 64-bit key (presumably the connection ID).
    std::unordered_map<uint64_t, std::shared_ptr<Connection>> connections_;
    /// Connection ID counter, initialized to 1.
    std::atomic<uint64_t> next_conn_id_{1};

    // Background polling
    std::unique_ptr<boost::asio::steady_timer> poll_timer_;
    /// NOTE(review): presumably guards `poll_timer_` -- confirm lock usage in the .cpp.
    mutable std::mutex poll_timer_mutex_;
    std::atomic<bool> running_{false};

    // Stats (cumulative counters; see ConnectionStats)
    std::atomic<uint64_t> total_events_sent_{0};
    std::atomic<uint64_t> total_heartbeats_sent_{0};
    std::atomic<uint64_t> total_disconnects_{0};
    std::atomic<uint64_t> total_dropped_events_{0};
};
} // namespace server
} // namespace themis