Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/iceberg/src/arrow/reader/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,13 @@ impl FileScanTaskReader {
}

if self.row_selection_enabled {
row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
row_selection = ArrowReader::get_row_selection_for_filter_predicate(
&predicate,
record_batch_stream_builder.metadata(),
&selected_row_group_indices,
&field_id_map,
&task.schema,
)?);
)?;
}
}

Expand Down
104 changes: 104 additions & 0 deletions crates/iceberg/src/arrow/reader/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1725,4 +1725,108 @@ message schema {
.collect();
assert_eq!(ids, vec![2, 3]);
}

/// Test that row_selection_enabled gracefully handles Parquet files that lack
/// column index metadata (common with older/migrated files). The reader should
/// fall back to returning all rows instead of erroring.
///
/// Regression test for <https://github.com/apache/iceberg-rust/issues/2452>.
#[tokio::test]
async fn test_read_parquet_without_column_index_with_row_selection_enabled() {
use arrow_array::Int32Array;

let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));

let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::new_with_fs();

let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;

let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();

// Write Parquet WITHOUT page index (column index / offset index) by disabling
// statistics at the page level. This simulates older Parquet files.
let props = WriterProperties::builder()
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::None)
.set_compression(Compression::SNAPPY)
.build();

let file_path = format!("{table_location}/no_column_index.parquet");
let file = File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();

// Use a predicate that would match some rows
let predicate = Reference::new("id").less_than(Datum::int(3));

let reader = ArrowReaderBuilder::new(file_io, Runtime::current())
.with_row_group_filtering_enabled(true)
.with_row_selection_enabled(true)
.build();

let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
start: 0,
length: 0,
record_count: None,
data_file_path: file_path,
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})]
.into_iter(),
)) as FileScanTaskStream;

// This should NOT error — page-level pruning is skipped when column index
// is absent, but the Arrow row filter still applies the predicate.
let result = reader
.read(tasks)
.unwrap()
.stream()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

// The Arrow row filter (id < 3) should still apply even without page-index
// pruning, returning only the matching rows.
let ids: Vec<i32> = result
.iter()
.flat_map(|batch| {
batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.values()
.iter()
.copied()
})
.collect();
assert_eq!(ids, vec![1, 2]);
}
}
26 changes: 14 additions & 12 deletions crates/iceberg/src/arrow/reader/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::expr::visitors::bound_predicate_visitor::visit;
use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::spec::Schema;
use crate::{Error, ErrorKind};

impl ArrowReader {
pub(super) fn get_row_filter(
Expand Down Expand Up @@ -90,32 +89,33 @@ impl ArrowReader {
Ok(results)
}

/// Computes a [`RowSelection`] by evaluating the filter predicate against
/// the Parquet page index (column index + offset index).
///
/// Returns `Ok(None)` when the Parquet file lacks column or offset index
/// metadata (common with older files written before page indexes became
/// standard). In that case page-level pruning is simply skipped; row-group
/// filtering and the Arrow row filter still apply the predicate.
pub(super) fn get_row_selection_for_filter_predicate(
predicate: &BoundPredicate,
parquet_metadata: &Arc<ParquetMetaData>,
selected_row_groups: &Option<Vec<usize>>,
field_id_map: &HashMap<i32, usize>,
snapshot_schema: &Schema,
) -> Result<RowSelection> {
) -> Result<Option<RowSelection>> {
let Some(column_index) = parquet_metadata.column_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain a column index",
));
return Ok(None);
};

let Some(offset_index) = parquet_metadata.offset_index() else {
return Err(Error::new(
ErrorKind::Unexpected,
"Parquet file metadata does not contain an offset index",
));
return Ok(None);
};

// If all row groups were filtered out, return an empty RowSelection (select no rows)
if let Some(selected_row_groups) = selected_row_groups
&& selected_row_groups.is_empty()
{
return Ok(RowSelection::from(Vec::new()));
return Ok(Some(RowSelection::from(Vec::new())));
}

let mut selected_row_groups_idx = 0;
Expand Down Expand Up @@ -155,7 +155,9 @@ impl ArrowReader {
}
}

Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
Ok(Some(
results.into_iter().flatten().collect::<Vec<_>>().into(),
))
}

/// Filters row groups by byte range to support Iceberg's file splitting.
Expand Down
Loading