Skip to content

Commit b0e7910

Browse files
caikpigosso and claude committed
fix(replication): replicate vectors via ha_manager in Raft HA mode
Vector inserts only checked state.master_node (always None in Raft mode) for replication. Collections replicated because create_collection checked both state.master_node AND ha_manager.master_node(). Now vector insert handlers (upsert_points, insert_text) use the same pattern.

Also adds 2 tests:
- test_ha_manager_master_node_accessible_for_replication
- test_vector_replication_operation_construction

Total HA tests: 20 (all passing)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 31e0c88 commit b0e7910

6 files changed

Lines changed: 131 additions & 7 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "vectorizer"
3-
version = "2.5.14"
3+
version = "2.5.15"
44
edition = "2024"
55
authors = ["HiveLLM Contributors"]
66
description = "High-performance, in-memory vector database written in Rust"

helm/vectorizer/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@ name: vectorizer
33
description: A Helm chart for Vectorizer - High-performance vector database
44
type: application
55
version: 1.5.0
6-
appVersion: "2.5.14"
6+
appVersion: "2.5.15"
77
keywords:
88
- vector-database
99
- semantic-search

k8s/ermes-deploy/statefulset.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ spec:
3939
mountPath: /active-config
4040
containers:
4141
- name: vectorizer
42-
image: ghcr.io/hivellm/vectorizer:2.5.14
42+
image: ghcr.io/hivellm/vectorizer:2.5.15
4343
imagePullPolicy: Always
4444
ports:
4545
- name: rest

src/server/qdrant_vector_handlers.rs

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -181,12 +181,18 @@ pub async fn upsert_points(
181181
// Fire-and-forget: Return response immediately and process in background
182182
// This improves response time for large batches
183183
let store_clone = state.store.clone();
184-
let master_node = state.master_node.clone();
184+
// Check both static master_node and Raft-managed HaManager master.
185+
// In Raft HA mode, state.master_node is always None — the active
186+
// MasterNode lives in ha_manager.master_node().
187+
let active_master: Option<std::sync::Arc<crate::replication::MasterNode>> = state
188+
.master_node
189+
.clone()
190+
.or_else(|| state.ha_manager.as_ref().and_then(|ha| ha.master_node()));
185191
let collection_name_for_bg = collection_name.clone();
186192
let points_count_for_bg = points_count;
187193

188194
// Clone vector data for replication before moving into spawn_blocking
189-
let repl_vectors: Vec<(String, Vec<f32>, Option<Vec<u8>>)> = if master_node.is_some() {
195+
let repl_vectors: Vec<(String, Vec<f32>, Option<Vec<u8>>)> = if active_master.is_some() {
190196
vectors
191197
.iter()
192198
.map(|v| {
@@ -224,7 +230,7 @@ pub async fn upsert_points(
224230
);
225231

226232
// Replicate to replicas if master mode is active
227-
if let Some(ref master) = master_node {
233+
if let Some(ref master) = active_master {
228234
for (id, data, payload) in &repl_vectors {
229235
let op = crate::replication::VectorOperation::InsertVector {
230236
collection: repl_collection.clone(),

src/server/rest_handlers.rs

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1410,6 +1410,35 @@ pub async fn insert_text(
14101410
collection_name
14111411
);
14121412

1413+
// Replicate vector insertions to followers (Raft HA mode)
1414+
let active_master: Option<std::sync::Arc<crate::replication::MasterNode>> = state
1415+
.master_node
1416+
.clone()
1417+
.or_else(|| state.ha_manager.as_ref().and_then(|ha| ha.master_node()));
1418+
if let Some(ref master) = active_master {
1419+
// Re-read the inserted vectors for replication
1420+
if let Ok(col) = state.store.get_collection(collection_name) {
1421+
for vid in &vector_ids {
1422+
if let Ok(v) = col.get_vector(vid) {
1423+
let payload_bytes = v.payload.as_ref().and_then(|p| serde_json::to_vec(p).ok());
1424+
let op = crate::replication::VectorOperation::InsertVector {
1425+
collection: collection_name.to_string(),
1426+
id: vid.clone(),
1427+
vector: v.data.clone(),
1428+
payload: payload_bytes,
1429+
owner_id: None,
1430+
};
1431+
master.replicate(op);
1432+
}
1433+
}
1434+
debug!(
1435+
"Replicated {} vectors for collection '{}'",
1436+
vector_ids.len(),
1437+
collection_name
1438+
);
1439+
}
1440+
}
1441+
14131442
info!(
14141443
"Successfully inserted {} vector(s) into collection '{}'",
14151444
vectors_created, collection_name

tests/integration/cluster_ha.rs

Lines changed: 90 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -684,7 +684,10 @@ async fn test_raft_trigger_elect_does_not_panic() {
684684
// Give time for re-election to settle
685685
tokio::time::sleep(Duration::from_secs(2)).await;
686686
// Single-node should eventually become leader again
687-
assert!(mgr.is_leader().await, "Should re-elect as leader after trigger");
687+
assert!(
688+
mgr.is_leader().await,
689+
"Should re-elect as leader after trigger"
690+
);
688691
}
689692

690693
// ---------------------------------------------------------------------------
@@ -734,3 +737,89 @@ async fn test_raft_bootstrap_with_unreachable_peers_does_not_block() {
734737
elapsed
735738
);
736739
}
740+
741+
// ---------------------------------------------------------------------------
742+
// Test 19: Vector replication uses ha_manager.master_node() in Raft mode
743+
// ---------------------------------------------------------------------------
744+
745+
/// Verifies that the HaManager's master_node() is accessible and returns
746+
/// None when the node is not a leader. This tests the code path that
747+
/// the vector insert handler uses to find the active master for replication.
748+
///
749+
/// The bug was: upsert_points() only checked state.master_node (always None
750+
/// in Raft mode) and never checked ha_manager.master_node(). Collections
751+
/// replicated but vectors didn't.
752+
#[tokio::test]
753+
async fn test_ha_manager_master_node_accessible_for_replication() {
754+
let store = Arc::new(VectorStore::new());
755+
let repl_config = ReplicationConfig::default();
756+
757+
let ha = HaManager::new(1, store.clone(), repl_config);
758+
759+
// Before becoming leader, master_node() should be None
760+
assert!(
761+
ha.master_node().is_none(),
762+
"master_node should be None before becoming leader"
763+
);
764+
765+
// Become leader — MasterNode is created
766+
ha.on_become_leader().await;
767+
768+
// After becoming leader, master_node() should return Some
769+
// (the MasterNode may fail to bind port 7001 in test env, but
770+
// the Arc should still be created)
771+
// Note: in test we can't guarantee bind succeeds, but the
772+
// ha_manager should have attempted to create it
773+
let _master = ha.master_node(); // Should not panic
774+
775+
// Become follower — master_node() should be None again
776+
ha.on_become_follower(None).await;
777+
assert!(
778+
ha.master_node().is_none(),
779+
"master_node should be None after becoming follower"
780+
);
781+
}
782+
783+
// ---------------------------------------------------------------------------
784+
// Test 20: Replication operation can be created for vector insert
785+
// ---------------------------------------------------------------------------
786+
787+
/// Verifies that VectorOperation::InsertVector can be constructed
788+
/// with the expected fields. This is the operation that gets replicated
789+
/// from leader to followers.
790+
#[tokio::test]
791+
async fn test_vector_replication_operation_construction() {
792+
use vectorizer::replication::VectorOperation;
793+
794+
let op = VectorOperation::InsertVector {
795+
collection: "test-collection".to_string(),
796+
id: "vec-1".to_string(),
797+
vector: vec![0.1, 0.2, 0.3, 0.4],
798+
payload: Some(b"{\"key\":\"value\"}".to_vec()),
799+
owner_id: None,
800+
};
801+
802+
// Verify the operation can be serialized (needed for replication log)
803+
let serialized = vectorizer::codec::serialize(&op);
804+
assert!(serialized.is_ok(), "VectorOperation should serialize");
805+
806+
let deserialized: Result<VectorOperation, _> =
807+
vectorizer::codec::deserialize(&serialized.unwrap());
808+
assert!(deserialized.is_ok(), "VectorOperation should deserialize");
809+
810+
match deserialized.unwrap() {
811+
VectorOperation::InsertVector {
812+
collection,
813+
id,
814+
vector,
815+
payload,
816+
..
817+
} => {
818+
assert_eq!(collection, "test-collection");
819+
assert_eq!(id, "vec-1");
820+
assert_eq!(vector, vec![0.1, 0.2, 0.3, 0.4]);
821+
assert!(payload.is_some());
822+
}
823+
_ => panic!("Expected InsertVector"),
824+
}
825+
}

0 commit comments

Comments
 (0)