Skip to content

Commit 5ab2b72

Browse files
Ubuntuclaude
andcommitted
feat: FR-011 streaming + FR-012 LLM bridge + FR-013 vector store + FR-014 alerting
FR-011: Streaming Integrations (Kafka, Redis Streams, NATS) - StreamConsumer trait: connect, subscribe, poll, commit, disconnect - StreamMessage: offset, key, payload, headers, source - InMemoryConsumer: full test implementation for bridge testing - KafkaConfig, NatsConfig, RedisStreamConfig: provider configurations - Backpressure: max_buffer_size, CommitStrategy (AfterProcess/AfterPoll/Manual) - 9 tests FR-012: LLM Provider Bridge (OpenAI, Anthropic, Local) - LlmProvider trait: complete, embed, is_available, metrics - CompletionRequest/Response: messages, tools, temperature, streaming - ChatMessage: user, assistant, tool_response roles - ToolDefinition + ToolCall: function calling support - EmbeddingRequest/Response: batch embedding generation - TokenUsage with cost estimation (per-provider pricing) - EchoProvider: test implementation - 7 tests FR-013: Vector Store / Embedding Index - VectorStore: brute-force exact nearest-neighbor search - Distance metrics: Cosine, L2, DotProduct - VectorEntry: id + vector + metadata - flat_vectors(): contiguous f32 layout for GPU upload - Insert, search (top-K), delete operations - GPU-friendly memory layout (contiguous, cache-aligned) - 10 tests FR-014: Advanced Alerting (extends existing 900-line module) - AlertTrigger: threshold-based metric → alert firing - ThresholdOperator: >, >=, <, <=, == - Hold duration: condition must persist before firing - Cooldown: suppress repeated alerts within window - AlertRoutingRule: route by severity + message content to specific sinks - 4 tests 1,546 workspace tests pass, 0 failures. 14/20 feature requests now implemented. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e9fb8db commit 5ab2b72

6 files changed

Lines changed: 1608 additions & 0 deletions

File tree

crates/ringkernel-core/src/alerting.rs

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -898,3 +898,253 @@ mod tests {
898898
sink.send(&warning).await.unwrap();
899899
}
900900
}
901+
902+
// ============================================================================
903+
// FR-014: Threshold-Based Alert Triggers
904+
// ============================================================================
905+
906+
/// A threshold-based alert trigger that fires when a metric crosses a threshold.
907+
pub struct AlertTrigger {
908+
/// Trigger name.
909+
pub name: String,
910+
/// Metric name to watch.
911+
pub metric: String,
912+
/// Threshold value.
913+
pub threshold: f64,
914+
/// Comparison operator.
915+
pub operator: ThresholdOperator,
916+
/// Severity when triggered.
917+
pub severity: AlertSeverity,
918+
/// Minimum duration the condition must hold before triggering.
919+
pub hold_duration: Duration,
920+
/// Cooldown after firing (suppress repeated alerts).
921+
pub cooldown: Duration,
922+
/// Last triggered timestamp.
923+
last_triggered: Option<Instant>,
924+
/// When the condition first became true.
925+
condition_start: Option<Instant>,
926+
}
927+
928+
/// Comparison operator for threshold triggers.
929+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
930+
pub enum ThresholdOperator {
931+
/// Fire when value > threshold.
932+
GreaterThan,
933+
/// Fire when value >= threshold.
934+
GreaterThanOrEqual,
935+
/// Fire when value < threshold.
936+
LessThan,
937+
/// Fire when value <= threshold.
938+
LessThanOrEqual,
939+
/// Fire when value == threshold.
940+
Equal,
941+
}
942+
943+
impl AlertTrigger {
944+
/// Create a new alert trigger.
945+
pub fn new(
946+
name: impl Into<String>,
947+
metric: impl Into<String>,
948+
operator: ThresholdOperator,
949+
threshold: f64,
950+
severity: AlertSeverity,
951+
) -> Self {
952+
Self {
953+
name: name.into(),
954+
metric: metric.into(),
955+
threshold,
956+
operator,
957+
severity,
958+
hold_duration: Duration::from_secs(0),
959+
cooldown: Duration::from_secs(60),
960+
last_triggered: None,
961+
condition_start: None,
962+
}
963+
}
964+
965+
/// Set the hold duration (condition must persist this long before triggering).
966+
pub fn with_hold_duration(mut self, duration: Duration) -> Self {
967+
self.hold_duration = duration;
968+
self
969+
}
970+
971+
/// Set the cooldown period after triggering.
972+
pub fn with_cooldown(mut self, duration: Duration) -> Self {
973+
self.cooldown = duration;
974+
self
975+
}
976+
977+
/// Evaluate the trigger against a metric value.
978+
///
979+
/// Returns Some(Alert) if the trigger should fire, None otherwise.
980+
pub fn evaluate(&mut self, value: f64) -> Option<Alert> {
981+
let condition_met = match self.operator {
982+
ThresholdOperator::GreaterThan => value > self.threshold,
983+
ThresholdOperator::GreaterThanOrEqual => value >= self.threshold,
984+
ThresholdOperator::LessThan => value < self.threshold,
985+
ThresholdOperator::LessThanOrEqual => value <= self.threshold,
986+
ThresholdOperator::Equal => (value - self.threshold).abs() < f64::EPSILON,
987+
};
988+
989+
if !condition_met {
990+
self.condition_start = None;
991+
return None;
992+
}
993+
994+
// Track when condition first became true
995+
let now = Instant::now();
996+
if self.condition_start.is_none() {
997+
self.condition_start = Some(now);
998+
}
999+
1000+
// Check hold duration
1001+
if let Some(start) = self.condition_start {
1002+
if now.duration_since(start) < self.hold_duration {
1003+
return None; // Condition hasn't held long enough
1004+
}
1005+
}
1006+
1007+
// Check cooldown
1008+
if let Some(last) = self.last_triggered {
1009+
if now.duration_since(last) < self.cooldown {
1010+
return None; // Still in cooldown
1011+
}
1012+
}
1013+
1014+
// Fire!
1015+
self.last_triggered = Some(now);
1016+
self.condition_start = None;
1017+
1018+
Some(Alert::new(
1019+
self.severity,
1020+
format!(
1021+
"[{}] {} {} {:.2} (threshold: {:.2})",
1022+
self.name, self.metric, operator_symbol(self.operator), value, self.threshold
1023+
),
1024+
))
1025+
}
1026+
}
1027+
1028+
fn operator_symbol(op: ThresholdOperator) -> &'static str {
1029+
match op {
1030+
ThresholdOperator::GreaterThan => ">",
1031+
ThresholdOperator::GreaterThanOrEqual => ">=",
1032+
ThresholdOperator::LessThan => "<",
1033+
ThresholdOperator::LessThanOrEqual => "<=",
1034+
ThresholdOperator::Equal => "==",
1035+
}
1036+
}
1037+
1038+
/// Alert routing rule: route alerts to specific sinks based on criteria.
1039+
#[derive(Debug, Clone)]
1040+
pub struct AlertRoutingRule {
1041+
/// Rule name.
1042+
pub name: String,
1043+
/// Match alerts with this severity or higher.
1044+
pub min_severity: AlertSeverity,
1045+
/// Match alerts containing this text (empty = match all).
1046+
pub message_contains: String,
1047+
/// Sink indices to route to.
1048+
pub sink_indices: Vec<usize>,
1049+
}
1050+
1051+
impl AlertRoutingRule {
1052+
/// Create a new routing rule.
1053+
pub fn new(name: impl Into<String>, min_severity: AlertSeverity) -> Self {
1054+
Self {
1055+
name: name.into(),
1056+
min_severity,
1057+
message_contains: String::new(),
1058+
sink_indices: Vec::new(),
1059+
}
1060+
}
1061+
1062+
/// Match alerts containing specific text.
1063+
pub fn with_message_filter(mut self, contains: impl Into<String>) -> Self {
1064+
self.message_contains = contains.into();
1065+
self
1066+
}
1067+
1068+
/// Route to specific sink indices.
1069+
pub fn route_to(mut self, indices: Vec<usize>) -> Self {
1070+
self.sink_indices = indices;
1071+
self
1072+
}
1073+
1074+
/// Check if an alert matches this rule.
1075+
pub fn matches(&self, alert: &Alert) -> bool {
1076+
if (alert.severity as u32) < (self.min_severity as u32) {
1077+
return false;
1078+
}
1079+
if !self.message_contains.is_empty() && !alert.title.contains(&self.message_contains) {
1080+
return false;
1081+
}
1082+
true
1083+
}
1084+
}
1085+
1086+
#[cfg(test)]
1087+
mod trigger_tests {
1088+
use super::*;
1089+
1090+
#[test]
1091+
fn test_threshold_trigger_fires() {
1092+
let mut trigger = AlertTrigger::new(
1093+
"high_latency", "p99_latency_ms", ThresholdOperator::GreaterThan, 100.0,
1094+
AlertSeverity::Warning,
1095+
).with_cooldown(Duration::from_millis(0)); // No cooldown for testing
1096+
1097+
// Below threshold: no alert
1098+
assert!(trigger.evaluate(50.0).is_none());
1099+
1100+
// Above threshold: fires
1101+
let alert = trigger.evaluate(150.0);
1102+
assert!(alert.is_some());
1103+
assert_eq!(alert.unwrap().severity, AlertSeverity::Warning);
1104+
}
1105+
1106+
#[test]
1107+
fn test_threshold_trigger_cooldown() {
1108+
let mut trigger = AlertTrigger::new(
1109+
"test", "metric", ThresholdOperator::GreaterThan, 10.0,
1110+
AlertSeverity::Critical,
1111+
).with_cooldown(Duration::from_secs(60));
1112+
1113+
// First fire: OK
1114+
assert!(trigger.evaluate(20.0).is_some());
1115+
1116+
// Second fire: suppressed by cooldown
1117+
assert!(trigger.evaluate(20.0).is_none());
1118+
}
1119+
1120+
#[test]
1121+
fn test_routing_rule_matches() {
1122+
let rule = AlertRoutingRule::new("oncall", AlertSeverity::Critical)
1123+
.with_message_filter("OOM")
1124+
.route_to(vec![0]);
1125+
1126+
let critical_oom = Alert::new(AlertSeverity::Critical, "GPU OOM detected");
1127+
assert!(rule.matches(&critical_oom));
1128+
1129+
let warning = Alert::new(AlertSeverity::Warning, "GPU OOM warning");
1130+
assert!(!rule.matches(&warning)); // Below severity
1131+
1132+
let critical_other = Alert::new(AlertSeverity::Critical, "High latency");
1133+
assert!(!rule.matches(&critical_other)); // Doesn't contain "OOM"
1134+
}
1135+
1136+
#[test]
1137+
fn test_all_operators() {
1138+
let test = |op, val, thresh| {
1139+
let mut t = AlertTrigger::new("t", "m", op, thresh, AlertSeverity::Info)
1140+
.with_cooldown(Duration::from_millis(0));
1141+
t.evaluate(val).is_some()
1142+
};
1143+
1144+
assert!(test(ThresholdOperator::GreaterThan, 10.0, 5.0));
1145+
assert!(!test(ThresholdOperator::GreaterThan, 5.0, 10.0));
1146+
assert!(test(ThresholdOperator::LessThan, 5.0, 10.0));
1147+
assert!(test(ThresholdOperator::GreaterThanOrEqual, 10.0, 10.0));
1148+
assert!(test(ThresholdOperator::LessThanOrEqual, 10.0, 10.0));
1149+
}
1150+
}

crates/ringkernel-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ pub mod state;
8484
pub mod telemetry;
8585
pub mod telemetry_pipeline;
8686
pub mod types;
87+
pub mod vector;
8788

8889
// Enterprise modules
8990
pub mod alerting;

0 commit comments

Comments
 (0)