[ENH]: Enable rebuilds for sharded collections #6916
Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving:

- Testing, Bugs, Errors, Logs, Documentation
- System Compatibility
- Quality
Enable shard-targeted rebuilds for sharded collections across compaction orchestration

This PR adds shard-aware rebuild functionality end-to-end in the Rust compaction pipeline. Rebuild requests can now optionally include a shard index via `shard_index`, and the rebuild execution path now operates on the specified shard only.

This summary was automatically generated by @propel-code-bot
Force-pushed from d0bdf23 to e98b237
Force-pushed from e98b237 to 6506960
```rust
    segment_scopes: Vec<String>,
    /// Specify which shard to rebuild (defaults to 0)
    #[arg(long)]
    shard: Option<u32>,
```
Would be nice to have the flexibility of specifying different shards for different collections in the batch, so `Vec<Option<u32>>`?
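A minimal sketch of the per-collection shape the reviewer suggests, assuming the batch carries parallel lists of collection ids and optional shard indexes; the function and its types are illustrative, not the PR's actual code.

```rust
// Illustrative only: pair each collection in a batch rebuild with its own
// optional shard index (the reviewer's Vec<Option<u32>> suggestion).
// An unspecified shard defaults to 0, matching the PR's default.
fn pair_collections_with_shards(
    collection_ids: &[&str],
    shards: &[Option<u32>],
) -> Vec<(String, u32)> {
    collection_ids
        .iter()
        .zip(shards.iter().copied())
        .map(|(id, shard)| (id.to_string(), shard.unwrap_or(0)))
        .collect()
}
```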
```proto
// Segment scopes to rebuild. If empty, rebuilds all segments (metadata + vector).
repeated SegmentScope segment_scopes = 2;
// Optional shard index to rebuild. If not specified, defaults to shard 0.
optional uint32 shard_index = 3;
```
Since this is a batch API, why not make this a repeated field?
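A hedged sketch of the repeated-field shape this comment suggests; the field name is illustrative and not from the PR.

```proto
// Illustrative only: one shard index per collection in the batch,
// parallel to the collection ids list.
repeated uint32 shard_indexes = 3;
```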
Force-pushed from 6506960 to 398cb8e
```rust
    )
}
let log_task = if self.context.is_rebuild() {
    let shard_count = output
```
nit: there should be a num_shards method in Segment type that you can use here
```rust
    )
    .await;

if record_segment_reader.is_none() {
```
How was this even correct? E.g. if a collection is compacted for the first time, then this will not fetch the logs and will return early?
`from_segment` above always returned a reader, so we never hit this.
What you linked is for `RecordSegmentWriterShard` and not `RecordSegmentReader`.
But I found the relevant method. It returns a `Vec<Option<RecordSegmentReaderShard>>`, so it will return `[None]` for a shard with an uninitialized reader.
But you should still keep it, e.g. in case `ok_or_terminate` returns `None` because the reader creation errored out.
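An illustrative sketch of the situation discussed above: a sharded segment yields `Vec<Option<Reader>>`, with `None` for a shard whose reader was never initialized (e.g. first compaction), so the per-shard `None` check still matters. `Reader` and `resume_offset` are stand-ins, not the PR's types.

```rust
// Stand-in for a per-shard record segment reader.
struct Reader {
    max_offset: u64,
}

// Returns the offset to resume log fetching from for one shard.
fn resume_offset(shard_reader: &Option<Reader>) -> u64 {
    match shard_reader {
        // Uninitialized shard (e.g. first compaction): start from 0.
        None => 0,
        // Reader exists: resume after the segment's max offset id.
        Some(r) => r.max_offset + 1,
    }
}
```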
```rust
// Prefetch segments
let prefetch_segments = match self.context.is_rebuild() {
    true => vec![output.record_segment],
```
should prefetch the shard that is getting rebuilt here instead of the active shard
```rust
}

#[async_trait]
impl Handler<TaskResult<SourceRecordSegmentOutput, SourceRecordSegmentError>>
```
add a todo to clean up V1
```rust
// Create a vector with empty shards for all positions except shard_index
let mut shards = vec![];
for i in 0..self.shard_count {
```
This and the block of code below are exactly the same, so they can be extracted into a helper.
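A sketch of the helper this comment asks for: build a shard vector where every position is an empty shard except `shard_index`, which gets the rebuilt shard. The `Shard` enum is an illustrative stand-in for the PR's shard type.

```rust
// Illustrative stand-in for the per-shard payload.
#[derive(Clone, Debug, PartialEq)]
enum Shard {
    Empty,
    Rebuilt,
}

// Helper: empty shards everywhere except the shard being rebuilt.
fn shards_with_rebuilt(shard_count: usize, shard_index: usize) -> Vec<Shard> {
    (0..shard_count)
        .map(|i| if i == shard_index { Shard::Rebuilt } else { Shard::Empty })
        .collect()
}
```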
```rust
collection_info.collection.total_records_post_compaction = output.total_records as u64;
```
this would be wrong since total_records is only of that shard
hmm, this might be a bit hard to compute now
Should we just not compute it?
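A sketch of one way to keep `total_records_post_compaction` correct when only one shard was rebuilt: sum the per-shard record counts instead of using a single shard's count. Entirely illustrative; as discussed above, the PR may instead choose not to compute this stat at all.

```rust
// Aggregate the collection-wide record count from per-shard counts,
// rather than taking one shard's total_records as the collection total.
fn total_records(per_shard_counts: &[u64]) -> u64 {
    per_shard_counts.iter().sum()
}
```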
```rust
collection_info.collection.total_records_post_compaction = output.total_records as u64;

// If no records, terminate early
if output.partitions.is_empty() {
```
this code needs to change now
Force-pushed from 66ada8c to da0abb0
```rust
    ids: id.iter().map(ToString::to_string).collect(),
}),
segment_scopes: proto_scopes,
shard_index: *shard,
```
why deref here? seems unnecessary
```rust
    ids: id.iter().map(ToString::to_string).collect(),
}),
segment_scopes: proto_scopes,
shard_index: *shard,
```
Also, would it be simpler for the server to just have a `u32` instead of an `Option`, and in the client set it to 0 if `None`?
Effectively doing this at the server level. I think it makes sense to distinguish at the client-server layer whether or not a shard was specified. Say in the future you want a multi-shard rebuild API.
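A minimal sketch of the layering the author describes: the wire type keeps the shard index optional so "unspecified" stays distinguishable, and the server resolves the default to shard 0. The struct is illustrative; only the `shard_index` field name mirrors the proto.

```rust
// Illustrative request shape: shard_index stays optional on the wire.
struct RebuildRequest {
    shard_index: Option<u32>,
}

// Server-side resolution: an absent shard index means shard 0.
fn resolve_shard(req: &RebuildRequest) -> u32 {
    req.shard_index.unwrap_or(0)
}
```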
```rust
// TODO(tanujnay112): This is awful, we need to find a better way to pass
// the active collection info around.
self.collection_info = OnceCell::new();
let rebuild_info = if is_getting_compacted_logs {
```
This is a bit weird. Clarifying my understanding: is it to prevent an abstraction violation, so that callers of the attached function do not have to construct a `RebuildInfo` struct? Because you are using `RebuildInfo` being `Some` to implicitly mean "fetch logs from record segment" in `LogFetchOrchestrator`.
Yeah, for abstraction boundaries. In the attached function's backfill path I need to signal that backfilling is happening this way, without breaking the abstraction boundaries you mentioned.
```rust
    blockfile_provider: &BlockfileProvider,
) {
    // Get offset IDs from vector segment using distributed reader (handles all shards)
```
```rust
}

// Also create RecordSegmentReaderShard for source operators
let record_segment_shard = match self
```
The construction of this segment shard and its reader should be done in the conditional below, in the `if self.context.is_rebuild()` path. No need to pay this cost in the general non-rebuild case.
```rust
    vec![output.record_segment]
} else {
    let mut segments = vec![output.metadata_segment, output.record_segment];
    if vector_segment.r#type == chroma_types::SegmentType::Spann {
```
also prefetch QuantizedSpann here
```rust
    let vec = vec![self.new_shard()];
    return Ok(vec);
}
// If there are no file paths, return a single empty shard
```
Segment type also has num_shards() method. should use that here. It already has the validation logic for shard count
```rust
    })
    .collect();
MaterializeLogOutput {
    result: PartitionedMaterializeLogsResult { shards },
```
as a cleanup we should rename this to Sharded* since Partition is an existing concept in the codebase which is different from Sharding
Will do in a followup change
sg, added to the burndown doc
```rust
pub fn get_shards(&self) -> Result<Vec<SegmentShard>, SegmentShardError> {
    // If there are no file paths, return empty vector
    let num_shards = self.num_shards()?;
    if self.file_path.is_empty() {
```
this case should already be handled in the SegmentShard::try_from. can you pls double check?
```rust
    vec![output.record_segment]
} else {
    let mut segments = vec![output.metadata_segment, output.record_segment];
    if vector_segment.r#type != chroma_types::SegmentType::HnswDistributed {
```
this won't work because it will try to prefetch the local hnsw or sqlite segment types
These changes need so much care, great job at that.
This PR cherry-picks the commit fdcd216 onto release/2026-04-03. If there are unresolved conflicts, please resolve them manually. Co-authored-by: tanujnay112 <tanujnay112@live.com>

Description of changes
This change adds rebuild capabilities for sharded collections. A rebuild command can now specify a shard index it wants to rebuild; an unspecified one defaults to 0 (the first shard). This shard's file paths are set to empty during rebuild compaction, analogous to all of the segment's file paths being set to empty before this change. A new struct packaging the rebuild command parameters, called `RebuildInfo`, is now passed to `CompactionContext`. It contains the desired shard index to be rebuilt.

Test plan
Tests have been added in compact.rs. They test rebuilding various shards in the full and partial rebuild scenarios.
`pytest` for python, `yarn test` for js, `cargo test` for rust
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the _docs section?_