From f1a268a0a19f16426e1f25d6ee31144113bee7cd Mon Sep 17 00:00:00 2001 From: JD Williams Date: Mon, 18 May 2026 07:29:55 -0400 Subject: [PATCH] fix(reader): gracefully handle missing Parquet column index in row selection When row_selection_enabled is true and the Parquet file lacks column or offset index metadata (common with older/migrated files), the reader now skips page-level row pruning instead of returning an error. Row-group filtering via statistics and the ArrowPredicate row filter still function normally; only page-index-based RowSelection is skipped. Closes #2452 --- crates/iceberg/src/arrow/reader/pipeline.rs | 4 +- crates/iceberg/src/arrow/reader/projection.rs | 104 ++++++++++++++++++ crates/iceberg/src/arrow/reader/row_filter.rs | 26 +++-- 3 files changed, 120 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index edc0a3fa96..6f15e69fd7 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -337,13 +337,13 @@ impl FileScanTaskReader { } if self.row_selection_enabled { - row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate( + row_selection = ArrowReader::get_row_selection_for_filter_predicate( &predicate, record_batch_stream_builder.metadata(), &selected_row_group_indices, &field_id_map, &task.schema, - )?); + )?; } } diff --git a/crates/iceberg/src/arrow/reader/projection.rs b/crates/iceberg/src/arrow/reader/projection.rs index 2589c78366..9e8366c964 100644 --- a/crates/iceberg/src/arrow/reader/projection.rs +++ b/crates/iceberg/src/arrow/reader/projection.rs @@ -1725,4 +1725,108 @@ message schema { .collect(); assert_eq!(ids, vec![2, 3]); } + + /// Test that row_selection_enabled gracefully handles Parquet files that lack + /// column index metadata (common with older/migrated files). The reader should + /// skip page-level row pruning instead of erroring. + /// + /// Regression test for issue #2452. 
+ #[tokio::test] + async fn test_read_parquet_without_column_index_with_row_selection_enabled() { + use arrow_array::Int32Array; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::new_with_fs(); + + let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap(); + + // Write Parquet WITHOUT page index (column index / offset index) by disabling + // all statistics (`EnabledStatistics::None`). This simulates older Parquet files. 
+ let props = WriterProperties::builder() + .set_statistics_enabled(parquet::file::properties::EnabledStatistics::None) + .set_compression(Compression::SNAPPY) + .build(); + + let file_path = format!("{table_location}/no_column_index.parquet"); + let file = File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Use a predicate that would match some rows + let predicate = Reference::new("id").less_than(Datum::int(3)); + + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true) + .build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(), + start: 0, + length: 0, + record_count: None, + data_file_path: file_path, + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: Some(predicate.bind(schema, true).unwrap()), + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + })] + .into_iter(), + )) as FileScanTaskStream; + + // This should NOT error — page-level pruning is skipped when column index + // is absent, but the Arrow row filter still applies the predicate. + let result = reader + .read(tasks) + .unwrap() + .stream() + .try_collect::<Vec<RecordBatch>>() + .await + .unwrap(); + + // The Arrow row filter (id < 3) should still apply even without page-index + // pruning, returning only the matching rows. 
+ let ids: Vec<i32> = result + .iter() + .flat_map(|batch| { + batch + .column(0) + .as_any() + .downcast_ref::<Int32Array>() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + assert_eq!(ids, vec![1, 2]); + } } diff --git a/crates/iceberg/src/arrow/reader/row_filter.rs b/crates/iceberg/src/arrow/reader/row_filter.rs index bc5a05e473..e6e5ce97da 100644 --- a/crates/iceberg/src/arrow/reader/row_filter.rs +++ b/crates/iceberg/src/arrow/reader/row_filter.rs @@ -35,7 +35,6 @@ use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::spec::Schema; -use crate::{Error, ErrorKind}; impl ArrowReader { pub(super) fn get_row_filter( @@ -90,32 +89,33 @@ impl ArrowReader { Ok(results) } + /// Computes a [`RowSelection`] by evaluating the filter predicate against + /// the Parquet page index (column index + offset index). + /// + /// Returns `Ok(None)` when the Parquet file lacks column or offset index + /// metadata (common with older files written before page indexes became + /// standard). In that case page-level pruning is simply skipped; row-group + /// filtering and the Arrow row filter still apply the predicate. 
pub(super) fn get_row_selection_for_filter_predicate( predicate: &BoundPredicate, parquet_metadata: &Arc<ParquetMetaData>, selected_row_groups: &Option<Vec<usize>>, field_id_map: &HashMap<i32, usize>, snapshot_schema: &Schema, - ) -> Result<RowSelection> { + ) -> Result<Option<RowSelection>> { let Some(column_index) = parquet_metadata.column_index() else { - return Err(Error::new( - ErrorKind::Unexpected, - "Parquet file metadata does not contain a column index", - )); + return Ok(None); }; let Some(offset_index) = parquet_metadata.offset_index() else { - return Err(Error::new( - ErrorKind::Unexpected, - "Parquet file metadata does not contain an offset index", - )); + return Ok(None); }; // If all row groups were filtered out, return an empty RowSelection (select no rows) if let Some(selected_row_groups) = selected_row_groups && selected_row_groups.is_empty() { - return Ok(RowSelection::from(Vec::new())); + return Ok(Some(RowSelection::from(Vec::new()))); } let mut selected_row_groups_idx = 0; @@ -155,7 +155,9 @@ impl ArrowReader { } } - Ok(results.into_iter().flatten().collect::<Vec<_>>().into()) + Ok(Some( + results.into_iter().flatten().collect::<Vec<_>>().into(), + )) } /// Filters row groups by byte range to support Iceberg's file splitting.