From c91667466b4b72a522b3dfc57c578b761f6218d1 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Mon, 18 May 2026 19:12:41 +0800 Subject: [PATCH 1/2] feat: support history metadata table. --- crates/iceberg/src/inspect/history.rs | 170 ++++++++++++++++++ crates/iceberg/src/inspect/metadata_table.rs | 24 ++- crates/iceberg/src/inspect/mod.rs | 2 + .../datafusion/src/table/metadata_table.rs | 2 + .../tests/integration_datafusion_test.rs | 32 ++++ 5 files changed, 229 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/inspect/history.rs diff --git a/crates/iceberg/src/inspect/history.rs b/crates/iceberg/src/inspect/history.rs new file mode 100644 index 0000000000..0e22726dd6 --- /dev/null +++ b/crates/iceberg/src/inspect/history.rs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_array::builder::{BooleanBuilder, PrimitiveBuilder}; +use arrow_array::types::{Int64Type, TimestampMicrosecondType}; +use futures::{StreamExt, stream}; + +use crate::Result; +use crate::arrow::schema_to_arrow_schema; +use crate::scan::ArrowRecordBatchStream; +use crate::spec::{NestedField, PrimitiveType, Type}; +use crate::table::Table; +use crate::util::snapshot::ancestors_of; + +/// History table. +pub struct HistoryTable<'a> { + table: &'a Table, +} + +impl<'a> HistoryTable<'a> { + /// Create a new History table instance. + pub fn new(table: &'a Table) -> Self { + Self { table } + } + + /// Returns the iceberg schema of the history table. + pub fn schema(&self) -> crate::spec::Schema { + let fields = vec![ + NestedField::required( + 1, + "made_current_at", + Type::Primitive(PrimitiveType::Timestamptz), + ), + NestedField::required(2, "snapshot_id", Type::Primitive(PrimitiveType::Long)), + NestedField::optional(3, "parent_id", Type::Primitive(PrimitiveType::Long)), + NestedField::required( + 4, + "is_current_ancestor", + Type::Primitive(PrimitiveType::Boolean), + ), + ]; + crate::spec::Schema::builder() + .with_fields(fields.into_iter().map(|f| f.into())) + .build() + .unwrap() + } + + /// Scans the history table. + pub async fn scan(&self) -> Result { + let schema = schema_to_arrow_schema(&self.schema())?; + + let mut made_current_at = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut snapshot_id = PrimitiveBuilder::::new(); + let mut parent_id = PrimitiveBuilder::::new(); + let mut is_current_ancestor = BooleanBuilder::new(); + + let table_metadata = self.table.metadata_ref(); + let current_ancestor_ids = self + .table + .metadata() + .current_snapshot() + .map(|snapshot| { + ancestors_of(&table_metadata, snapshot.snapshot_id()) + .map(|snapshot| snapshot.snapshot_id()) + .collect::>() + }) + .unwrap_or_default(); + + for history_entry in self.table.metadata().history() { + made_current_at.append_value(history_entry.timestamp_ms() * 1000); + snapshot_id.append_value(history_entry.snapshot_id); + parent_id.append_option( + self.table + .metadata() + .snapshot_by_id(history_entry.snapshot_id) + .and_then(|snapshot| snapshot.parent_snapshot_id()), + ); + is_current_ancestor + .append_value(current_ancestor_ids.contains(&history_entry.snapshot_id)); + } + + let batch = RecordBatch::try_new(Arc::new(schema), vec![ + Arc::new(made_current_at.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(parent_id.finish()), + Arc::new(is_current_ancestor.finish()), + ])?; + + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } +} + +#[cfg(test)] +mod tests { + use expect_test::expect; + use futures::TryStreamExt; + + use crate::scan::tests::TableTestFixture; + use crate::test_utils::check_record_batches; + + #[tokio::test] + async fn test_history_table() { + let table = TableTestFixture::new_with_deep_history().table; + + let batch_stream = table.inspect().history().scan().await.unwrap(); + + check_record_batches( + batch_stream.try_collect::>().await.unwrap(), + expect![[r#" + Field { "made_current_at": Timestamp(µs, "+00:00"), metadata: {"PARQUET:field_id": "1"} }, + Field { "snapshot_id": Int64, metadata: {"PARQUET:field_id": "2"} }, + Field { "parent_id": nullable Int64, metadata: {"PARQUET:field_id": "3"} }, + Field { "is_current_ancestor": Boolean, metadata: {"PARQUET:field_id": "4"} }"#]], + expect![[r#" + made_current_at: PrimitiveArray + [ + 2018-01-04T21:22:35.770+00:00, + 2019-04-12T20:29:15.770+00:00, + 2019-11-30T08:02:35.770+00:00, + 2020-07-18T19:35:55.770+00:00, + 2020-10-14T01:22:53.590+00:00, + ], + snapshot_id: PrimitiveArray + [ + 3051729675574597004, + 3055729675574597004, + 3056729675574597004, + 3057729675574597004, + 3059729675574597004, + ], + parent_id: PrimitiveArray + [ + null, + 3051729675574597004, + 3055729675574597004, + 3056729675574597004, + 3057729675574597004, + ], + is_current_ancestor: BooleanArray + [ + true, + true, + true, + true, + true, + ]"#]], + &[], + Some("made_current_at"), + ); + } +} diff --git a/crates/iceberg/src/inspect/metadata_table.rs b/crates/iceberg/src/inspect/metadata_table.rs index d5e9d60869..675907810d 100644 --- a/crates/iceberg/src/inspect/metadata_table.rs +++ b/crates/iceberg/src/inspect/metadata_table.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use super::{ManifestsTable, SnapshotsTable}; +use super::{HistoryTable, ManifestsTable, SnapshotsTable}; use crate::table::Table; /// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table. @@ -34,6 +34,8 @@ pub enum MetadataTableType { Snapshots, /// [`ManifestsTable`] Manifests, + /// [`HistoryTable`] + History, } impl MetadataTableType { @@ -42,6 +44,7 @@ impl MetadataTableType { match self { MetadataTableType::Snapshots => "snapshots", MetadataTableType::Manifests => "manifests", + MetadataTableType::History => "history", } } @@ -59,6 +62,7 @@ impl TryFrom<&str> for MetadataTableType { match value { "snapshots" => Ok(Self::Snapshots), "manifests" => Ok(Self::Manifests), + "history" => Ok(Self::History), _ => Err(format!("invalid metadata table type: {value}")), } } @@ -79,4 +83,22 @@ impl<'a> MetadataTable<'a> { pub fn manifests(&self) -> ManifestsTable<'_> { ManifestsTable::new(self.0) } + + /// Get the history table. + pub fn history(&self) -> HistoryTable<'_> { + HistoryTable::new(self.0) + } +} + +#[cfg(test)] +mod tests { + use super::MetadataTableType; + + #[test] + fn test_metadata_table_type_supports_history() { + assert!(matches!( + MetadataTableType::try_from("history"), + Ok(MetadataTableType::History) + )); + } } diff --git a/crates/iceberg/src/inspect/mod.rs b/crates/iceberg/src/inspect/mod.rs index b64420ea11..540715c097 100644 --- a/crates/iceberg/src/inspect/mod.rs +++ b/crates/iceberg/src/inspect/mod.rs @@ -17,10 +17,12 @@ //! Metadata table APIs. +mod history; mod manifests; mod metadata_table; mod snapshots; +pub use history::HistoryTable; pub use manifests::ManifestsTable; pub use metadata_table::*; pub use snapshots::SnapshotsTable; diff --git a/crates/integrations/datafusion/src/table/metadata_table.rs b/crates/integrations/datafusion/src/table/metadata_table.rs index 38148b4084..1ba3cc50e0 100644 --- a/crates/integrations/datafusion/src/table/metadata_table.rs +++ b/crates/integrations/datafusion/src/table/metadata_table.rs @@ -54,6 +54,7 @@ impl TableProvider for IcebergMetadataTableProvider { let schema = match self.r#type { MetadataTableType::Snapshots => metadata_table.snapshots().schema(), MetadataTableType::Manifests => metadata_table.manifests().schema(), + MetadataTableType::History => metadata_table.history().schema(), }; schema_to_arrow_schema(&schema).unwrap().into() } @@ -79,6 +80,7 @@ impl IcebergMetadataTableProvider { let stream = match self.r#type { MetadataTableType::Snapshots => metadata_table.snapshots().scan().await, MetadataTableType::Manifests => metadata_table.manifests().scan().await, + MetadataTableType::History => metadata_table.history().scan().await, } .map_err(to_datafusion_error)?; let stream = stream.map_err(to_datafusion_error); diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index cebac75dd9..5a7567448c 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -173,6 +173,7 @@ async fn test_provider_list_table_names() -> Result<()> { "my_table", "my_table$snapshots", "my_table$manifests", + "my_table$history", ] "#]] .assert_debug_eq(&result); @@ -441,6 +442,37 @@ async fn test_metadata_table() -> Result<()> { None, ); + let history = ctx + .sql("select * from catalog.ns.t1$history") + .await + .unwrap() + .collect() + .await + .unwrap(); + check_record_batches( + history, + expect![[r#" + Field { "made_current_at": Timestamp(µs, "+00:00"), metadata: {"PARQUET:field_id": "1"} }, + Field { "snapshot_id": Int64, metadata: {"PARQUET:field_id": "2"} }, + Field { "parent_id": nullable Int64, metadata: {"PARQUET:field_id": "3"} }, + Field { "is_current_ancestor": Boolean, metadata: {"PARQUET:field_id": "4"} }"#]], + expect![[r#" + made_current_at: PrimitiveArray + [ + ], + snapshot_id: PrimitiveArray + [ + ], + parent_id: PrimitiveArray + [ + ], + is_current_ancestor: BooleanArray + [ + ]"#]], + &[], + None, + ); + Ok(()) } From f1b8c8665b61fac9f2a3670b7e0d30283079412e Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Mon, 18 May 2026 22:41:37 +0800 Subject: [PATCH 2/2] feat: support history metadata table. --- crates/sqllogictest/testdata/slts/df_test/show_tables.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt index a0f0e55b5b..07b2739172 100644 --- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt +++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt @@ -26,9 +26,11 @@ datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW default default test_binary_table BASE TABLE +default default test_binary_table$history BASE TABLE default default test_binary_table$manifests BASE TABLE default default test_binary_table$snapshots BASE TABLE default default test_partitioned_table BASE TABLE +default default test_partitioned_table$history BASE TABLE default default test_partitioned_table$manifests BASE TABLE default default test_partitioned_table$snapshots BASE TABLE default information_schema columns VIEW