From 70fe27daf51d3ebea0e93f2130ab12e11ea0c91a Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Tue, 9 Jun 2026 21:02:45 +0800 Subject: [PATCH 01/21] feat: lance supports hdfs scheme --- .github/workflows/rust.yml | 6 + java/pom.xml | 4 + rust/lance-io/Cargo.toml | 1 + rust/lance-io/src/object_store.rs | 2 + rust/lance-io/src/object_store/providers.rs | 5 + .../src/object_store/providers/hdfs.rs | 219 ++++++++++++++++++ rust/lance-io/tests/hdfs_integration.rs | 167 +++++++++++++ 7 files changed, 404 insertions(+) create mode 100644 rust/lance-io/src/object_store/providers/hdfs.rs create mode 100644 rust/lance-io/tests/hdfs_integration.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index e3c17671ce3..d0ed88cd5d0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -104,6 +104,9 @@ jobs: run: | sudo apt update sudo apt install -y protobuf-compiler libssl-dev + JAVA_HOME="$(dirname "$(dirname "$(readlink -f "$(command -v java)")")")" + echo "JAVA_HOME=${JAVA_HOME}" >> "$GITHUB_ENV" + echo "LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH:-}" >> "$GITHUB_ENV" - name: Start DynamodDB and S3 run: docker compose -f docker-compose.yml up -d --wait - name: Install cargo-llvm-cov @@ -135,6 +138,9 @@ jobs: run: | sudo apt -y -qq update sudo apt install -y protobuf-compiler libssl-dev pkg-config + JAVA_HOME="$(dirname "$(dirname "$(readlink -f "$(command -v java)")")")" + echo "JAVA_HOME=${JAVA_HOME}" >> "$GITHUB_ENV" + echo "LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH:-}" >> "$GITHUB_ENV" - name: Build tests run: | ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` diff --git a/java/pom.xml b/java/pom.xml index 6306ecc63f9..546e7169d25 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -56,6 +56,7 @@ * limitations under the License. */ + @@ -399,6 +400,9 @@ ${project.build.directory}/classes/nativelib true + + ${cargo.features} + diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index 6cee04d5fd9..ad75f206e7b 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -78,6 +78,7 @@ tencent = ["dep:opendal", "opendal/services-cos", "dep:object_store_opendal"] huggingface = ["dep:opendal", "opendal/services-huggingface", "dep:object_store_opendal"] tos = ["dep:opendal", "opendal/services-tos", "dep:object_store_opendal"] tos-test = ["tos"] +hdfs = ["dep:opendal", "opendal/services-hdfs", "dep:object_store_opendal"] test-util = [] [lints] diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 0c44095f117..ce97e06ea1f 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -61,6 +61,7 @@ pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8; pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64; const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size + #[cfg(any( feature = "aws", feature = "gcp", @@ -69,6 +70,7 @@ const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size feature = "tencent", feature = "huggingface", feature = "tos", + feature = "hdfs", ))] const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 45ac30a757a..e6ee194d50d 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -26,6 +26,8 @@ pub mod azure; pub mod gcp; #[cfg(feature = "goosefs")] pub mod goosefs; +#[cfg(feature = "hdfs")] +pub mod hdfs; #[cfg(feature = "huggingface")] pub mod huggingface; pub mod local; @@ -100,6 +102,7 @@ pub struct ObjectStoreRegistryStats { /// - `az`: An Azure Blob Storage object store. /// - `gs`: A Google Cloud Storage object store. /// - `tos`: A Volcengine TOS object store. +/// - `hdfs`: A Hadoop FileSystem object store. /// /// Use [`Self::empty()`] to create an empty registry, with no providers registered. /// @@ -339,6 +342,8 @@ impl Default for ObjectStoreRegistry { providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider)); #[cfg(feature = "tos")] providers.insert("tos".into(), Arc::new(tos::TosStoreProvider)); + #[cfg(feature = "hdfs")] + providers.insert("hdfs".into(), Arc::new(hdfs::HdfsStoreProvider)); Self { providers: RwLock::new(providers), active_stores: RwLock::new(HashMap::new()), diff --git a/rust/lance-io/src/object_store/providers/hdfs.rs b/rust/lance-io/src/object_store/providers/hdfs.rs new file mode 100644 index 00000000000..e638e9144ef --- /dev/null +++ b/rust/lance-io/src/object_store/providers/hdfs.rs @@ -0,0 +1,219 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::collections::HashMap; +use std::sync::Arc; + +use object_store_opendal::OpendalStore; +use opendal::{Operator, services::Hdfs}; +use url::Url; + +use crate::object_store::{ + DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore, + ObjectStoreParams, ObjectStoreProvider, StorageOptions, +}; +use lance_core::error::{Error, Result}; + +/// HDFS object store provider backed by OpenDAL. +#[derive(Default, Debug)] +pub struct HdfsStoreProvider; + +impl HdfsStoreProvider { + fn build_config( + base_path: &Url, + storage_options: &StorageOptions, + env_vars: I, + ) -> Result> + where + I: IntoIterator, + K: AsRef, + V: Into, + { + base_path + .host_str() + .ok_or_else(|| Error::invalid_input("HDFS URI must contain namenode host"))?; + + let env_vars = env_vars + .into_iter() + .filter_map(|(key, value)| { + let value = value.into(); + if value.is_empty() { + None + } else { + Some((key.as_ref().to_string(), value)) + } + }) + .collect::>(); + + let name_node = storage_options + .0 + .get("hdfs_name_node") + .cloned() + .or_else(|| env_vars.get("HDFS_NAME_NODE").cloned()) + .unwrap_or_else(|| format!("hdfs://{}", base_path.authority())); + + let mut config = HashMap::from([ + ("name_node".to_string(), name_node), + ("root".to_string(), "/".to_string()), + ]); + + let user = storage_options + .0 + .get("hdfs_user") + .cloned() + .or_else(|| env_vars.get("HADOOP_USER_NAME").cloned()) + .or_else(|| env_vars.get("HDFS_USER").cloned()); + if let Some(user) = user { + config.insert("user".to_string(), user); + } + + for (storage_key, config_key) in [ + ( + "hdfs_kerberos_ticket_cache_path", + "kerberos_ticket_cache_path", + ), + ("hdfs_atomic_write_dir", "atomic_write_dir"), + ] { + if let Some(value) = storage_options.0.get(storage_key).filter(|v| !v.is_empty()) { + config.insert(config_key.to_string(), value.clone()); + } + } + + Ok(config) + } +} + +#[async_trait::async_trait] +impl ObjectStoreProvider for HdfsStoreProvider { + async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result { + let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE); + let storage_options = StorageOptions(params.storage_options().cloned().unwrap_or_default()); + let config = Self::build_config(&base_path, &storage_options, std::env::vars())?; + + let operator = Operator::from_iter::(config) + .map_err(|e| Error::invalid_input(format!("Failed to create HDFS operator: {e:?}")))? + .finish(); + let opendal_store = Arc::new(OpendalStore::new(operator)); + + Ok(ObjectStore { + scheme: "hdfs".to_string(), + inner: opendal_store, + block_size, + max_iop_size: *DEFAULT_MAX_IOP_SIZE, + use_constant_size_upload_parts: params.use_constant_size_upload_parts, + list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or(true), + io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, + download_retry_count: storage_options.download_retry_count(), + io_tracker: Default::default(), + store_prefix: self + .calculate_object_store_prefix(&base_path, params.storage_options())?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::object_store::ObjectStoreProvider; + use object_store::path::Path; + + #[test] + fn test_hdfs_store_paths() { + let provider = HdfsStoreProvider; + let cases = [ + ("hdfs://namenode:9000/path/to/file", "path/to/file"), + ("hdfs://namenode/path/to/file", "path/to/file"), + ("hdfs://namenode:9000/", ""), + ( + "hdfs://namenode:9000/user/data/dataset/file.parquet", + "user/data/dataset/file.parquet", + ), + ("hdfs://ht-hdfsqa/user/data/file.txt", "user/data/file.txt"), + ]; + + for (url, expected_path) in cases { + let path = provider.extract_path(&Url::parse(url).unwrap()).unwrap(); + assert_eq!(path, Path::from(expected_path)); + } + } + + #[test] + fn test_hdfs_config_from_url() { + let url = Url::parse("hdfs://namenode:9000/test").unwrap(); + let config = HdfsStoreProvider::build_config( + &url, + &StorageOptions::default(), + Vec::<(&str, &str)>::new(), + ) + .unwrap(); + + assert_eq!(config.get("name_node").unwrap(), "hdfs://namenode:9000"); + assert_eq!(config.get("root").unwrap(), "/"); + } + + #[test] + fn test_hdfs_storage_options_override_environment_and_url() { + let url = Url::parse("hdfs://url-namenode:9000/test").unwrap(); + let storage_options = StorageOptions(HashMap::from([ + ( + "hdfs_name_node".to_string(), + "hdfs://option-namenode:8020".to_string(), + ), + ("hdfs_user".to_string(), "option-user".to_string()), + ( + "hdfs_kerberos_ticket_cache_path".to_string(), + "/tmp/krb5cc".to_string(), + ), + ( + "hdfs_atomic_write_dir".to_string(), + "/tmp/atomic".to_string(), + ), + ])); + let env_vars = [ + ("HDFS_NAME_NODE", "hdfs://env-namenode:9000"), + ("HADOOP_USER_NAME", "env-user"), + ]; + + let config = HdfsStoreProvider::build_config(&url, &storage_options, env_vars).unwrap(); + + assert_eq!( + config.get("name_node").unwrap(), + "hdfs://option-namenode:8020" + ); + assert_eq!(config.get("user").unwrap(), "option-user"); + assert_eq!( + config.get("kerberos_ticket_cache_path").unwrap(), + "/tmp/krb5cc" + ); + assert_eq!(config.get("atomic_write_dir").unwrap(), "/tmp/atomic"); + } + + #[test] + fn test_hdfs_config_from_environment() { + let url = Url::parse("hdfs://url-namenode:9000/test").unwrap(); + let env_vars = [ + ("HDFS_NAME_NODE", "hdfs://env-namenode:9000"), + ("HADOOP_USER_NAME", "env-user"), + ]; + + let config = + HdfsStoreProvider::build_config(&url, &StorageOptions::default(), env_vars).unwrap(); + + assert_eq!(config.get("name_node").unwrap(), "hdfs://env-namenode:9000"); + assert_eq!(config.get("user").unwrap(), "env-user"); + } + + #[test] + fn test_hdfs_config_rejects_url_without_host() { + let url = Url::parse("hdfs:///test").unwrap(); + let error = HdfsStoreProvider::build_config( + &url, + &StorageOptions::default(), + Vec::<(&str, &str)>::new(), + ) + .unwrap_err(); + + assert!(matches!(error, Error::InvalidInput { .. })); + assert!(error.to_string().contains("namenode host")); + } +} diff --git a/rust/lance-io/tests/hdfs_integration.rs b/rust/lance-io/tests/hdfs_integration.rs new file mode 100644 index 00000000000..55d753144f1 --- /dev/null +++ b/rust/lance-io/tests/hdfs_integration.rs @@ -0,0 +1,167 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Integration tests for HDFS object store provider +//! +//! These tests require a running HDFS cluster. They can be run with: +//! cargo test --features hdfs hdfs_integration_test + +#[cfg(feature = "hdfs")] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use lance_io::object_store::{ + ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor, + }; + use object_store::ObjectStoreExt; + use object_store::path::Path; + use url::Url; + + fn hdfs_available() -> bool { + std::env::var("HDFS_NAME_NODE").is_ok() || std::env::var("HDFS_TEST_ENABLED").is_ok() + } + + #[tokio::test] + async fn test_hdfs_store_creation() { + if !hdfs_available() { + return; + } + + let registry = Arc::new(ObjectStoreRegistry::default()); + let url = "hdfs://localhost:9000/test/path"; + let params = ObjectStoreParams::default(); + + let (store, path) = ObjectStore::from_uri_and_params(registry, url, ¶ms) + .await + .unwrap(); + + assert_eq!(store.scheme(), "hdfs"); + assert_eq!(path, Path::from("test/path")); + } + + #[tokio::test] + async fn test_hdfs_store_with_custom_config() { + if !hdfs_available() { + return; + } + + let registry = Arc::new(ObjectStoreRegistry::default()); + let url = "hdfs://namenode:9000/user/test"; + + let mut storage_options = HashMap::new(); + storage_options.insert("hdfs_user".to_string(), "testuser".to_string()); + storage_options.insert( + "hdfs_name_node".to_string(), + "hdfs://namenode:9000".to_string(), + ); + + let params = ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + storage_options, + ))), + ..Default::default() + }; + + let (store, path) = ObjectStore::from_uri_and_params(registry, url, ¶ms) + .await + .unwrap(); + + assert_eq!(store.scheme(), "hdfs"); + assert_eq!(path, Path::from("user/test")); + } + + #[tokio::test] + async fn test_hdfs_url_parsing() { + let registry = Arc::new(ObjectStoreRegistry::default()); + + let test_cases = vec![ + ("hdfs://localhost:9000/", ""), + ("hdfs://localhost:9000/user", "user"), + ( + "hdfs://localhost:9000/user/data/file.txt", + "user/data/file.txt", + ), + ("hdfs://namenode/path/to/file", "path/to/file"), + // HA configuration tests + ("hdfs://ht-hdfsqa/", ""), + ("hdfs://ht-hdfsqa/user/data", "user/data"), + ("hdfs://mycluster/tmp/test.txt", "tmp/test.txt"), + ]; + + for (url_str, expected_path) in test_cases { + let url = Url::parse(url_str).unwrap(); + let provider = registry.get_provider("hdfs").unwrap(); + let path = provider.extract_path(&url).unwrap(); + assert_eq!(path, Path::from(expected_path)); + } + } + + #[tokio::test] + async fn test_hdfs_basic_operations() { + if !hdfs_available() { + return; + } + + let registry = Arc::new(ObjectStoreRegistry::default()); + let url = "hdfs://localhost:9000/test"; + let params = ObjectStoreParams::default(); + + let (store, _) = ObjectStore::from_uri_and_params(registry, url, ¶ms) + .await + .unwrap(); + let test_path = Path::from("test_file.txt"); + let test_data = bytes::Bytes::from("Hello, HDFS!"); + + store + .inner + .put(&test_path, test_data.clone().into()) + .await + .unwrap(); + let read_data = store + .inner + .get(&test_path) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(read_data, test_data); + store.inner.delete(&test_path).await.unwrap(); + } + + #[test] + fn test_hdfs_config_validation() { + // Test that missing namenode configuration is properly handled + let storage_options = HashMap::new(); + let params = ObjectStoreParams { + storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( + storage_options, + ))), + ..Default::default() + }; + + // This test verifies the configuration validation logic without actually connecting + // The actual connection would fail, but we're testing the config validation + assert!(params.storage_options().is_some()); + } + + #[tokio::test] + async fn test_hdfs_ha_configuration() { + if !hdfs_available() { + return; + } + + // Test HA configuration like hdfs://ht-hdfsqa + let registry = Arc::new(ObjectStoreRegistry::default()); + let url = "hdfs://ht-hdfsqa/user/test"; + let params = ObjectStoreParams::default(); + + let (store, path) = ObjectStore::from_uri_and_params(registry, url, ¶ms) + .await + .unwrap(); + + assert_eq!(store.scheme(), "hdfs"); + assert_eq!(path, Path::from("user/test")); + } +} From a2cbc96f1975a7cfd51be2130f9443414c783ac4 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Wed, 10 Jun 2026 13:55:40 +0800 Subject: [PATCH 02/21] fix: exclude hdfs from CI all-features builds and update Cargo.lock The hdfs feature requires Hadoop native libraries (libhdfs.so) which are not available on CI runners. Exclude it from ALL_FEATURES computation, following the same pattern as protoc and slow_tests. Also update Cargo.lock to include hdfs-related dependencies (hdrs, hdfs-sys, java-locator, opendal-service-hdfs) so --locked builds don't fail when hdfs is accidentally enabled. --- .github/workflows/rust.yml | 12 ++-- Cargo.lock | 114 +++++++++++++++++++++++++++++++++---- 2 files changed, 108 insertions(+), 18 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index d0ed88cd5d0..bde3d0566df 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -68,7 +68,7 @@ jobs: sudo apt install -y protobuf-compiler libssl-dev - name: Get features run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` echo "ALL_FEATURES=${ALL_FEATURES}" >> $GITHUB_ENV - name: Clippy run: cargo clippy --profile ci --locked --features ${{ env.ALL_FEATURES }} --all-targets -- -D warnings @@ -113,7 +113,7 @@ jobs: uses: taiki-e/install-action@66068bfca13dcb2ea07c3f613ca2836a37c755d5 # cargo-llvm-cov - name: Run tests run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo +nightly llvm-cov --profile ci --locked --workspace --codecov --output-path coverage.codecov --features ${ALL_FEATURES} - name: Upload coverage to Codecov uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4 @@ -143,13 +143,13 @@ jobs: echo "LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH:-}" >> "$GITHUB_ENV" - name: Build tests run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo test --profile ci --locked --features ${ALL_FEATURES} --no-run - name: Start DynamodDB and S3 run: docker compose -f docker-compose.yml up -d --wait - name: Run tests run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo test --profile ci --locked --features ${ALL_FEATURES} query-integration-tests: runs-on: ubuntu-24.04-4x @@ -201,7 +201,7 @@ jobs: sudo apt install -y protobuf-compiler libssl-dev - name: Build all run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo build --profile ci --benches --features ${ALL_FEATURES} --tests mac-build: runs-on: warp-macos-14-arm64-6x @@ -286,5 +286,5 @@ jobs: env: RUSTUP_TOOLCHAIN: ${{ matrix.msrv }} run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo check --profile ci --workspace --tests --benches --features ${ALL_FEATURES} diff --git a/Cargo.lock b/Cargo.lock index 89d20bdf647..f8cd049ad6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -463,6 +463,12 @@ dependencies = [ "syn 2.0.118", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -1180,6 +1186,20 @@ dependencies = [ ] [[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + +[[package]] + name = "bs58" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1455,7 +1475,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2764,7 +2784,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2916,7 +2936,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3147,6 +3167,16 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.32" @@ -3536,6 +3566,30 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" +[[package]] +name = "hdfs-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e2d5cefba2d51a26b44d2a493f963a32725a0f6593c91be4a610ad449c49cb" +dependencies = [ + "cc", + "java-locator", +] + +[[package]] +name = "hdrs" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c42a693bfe5dc8fcad1f24044c5ec355c5f157b8ce63c7d62f51cecbc7878d" +dependencies = [ + "blocking", + "errno", + "futures", + "hdfs-sys", + "libc", + "log", +] + [[package]] name = "heapify" version = "0.2.0" @@ -4145,7 +4199,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4196,6 +4250,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "java-locator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c46c1fe465c59b1474e665e85e1256c3893dd00927b8d55f63b09044c1e64f" +dependencies = [ + "glob", +] + [[package]] name = "jieba-macros" version = "0.10.1" @@ -5681,7 +5744,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5960,6 +6023,7 @@ dependencies = [ "opendal-service-cos", "opendal-service-gcs", "opendal-service-goosefs", + "opendal-service-hdfs", "opendal-service-hf", "opendal-service-oss", "opendal-service-s3", @@ -6140,6 +6204,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-hdfs" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb51ce674ce98b8b7d9ac7d2dfc9d7fc8f1bbd1da1a5fb928ecc5f13ba7dc88a" +dependencies = [ + "bytes", + "futures", + "hdrs", + "log", + "opendal-core", + "serde", + "tokio", +] + [[package]] name = "opendal-service-hf" version = "0.57.0" @@ -6569,6 +6648,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c835479a4443ded371d6c535cbfd8d31ad92c5d23ae9770a61bc155e4992a3c1" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ -6804,7 +6894,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03da047801ff44bb6a4d407d4860c05fd70bb81714e6b2f3812603d5b145b042" dependencies = [ "heck", - "itertools 0.14.0", + "itertools 0.11.0", "log", "multimap", "petgraph", @@ -6823,7 +6913,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.118", @@ -6964,7 +7054,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.60.2", + "windows-sys 0.59.0", ] [[package]] @@ -7685,7 +7775,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7744,7 +7834,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -8630,7 +8720,7 @@ dependencies = [ "getrandom 0.4.3", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -9654,7 +9744,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] From 2cbc75304e8e33d3f61f248e17cdd8bc43ae2a28 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Wed, 10 Jun 2026 15:32:19 +0800 Subject: [PATCH 03/21] fix: improve optional HDFS handling --- .github/workflows/rust.yml | 6 ---- .../src/object_store/providers/hdfs.rs | 28 ++++++++++++++++++- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index bde3d0566df..8e1fad87688 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -104,9 +104,6 @@ jobs: run: | sudo apt update sudo apt install -y protobuf-compiler libssl-dev - JAVA_HOME="$(dirname "$(dirname "$(readlink -f "$(command -v java)")")")" - echo "JAVA_HOME=${JAVA_HOME}" >> "$GITHUB_ENV" - echo "LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH:-}" >> "$GITHUB_ENV" - name: Start DynamodDB and S3 run: docker compose -f docker-compose.yml up -d --wait - name: Install cargo-llvm-cov @@ -138,9 +135,6 @@ jobs: run: | sudo apt -y -qq update sudo apt install -y protobuf-compiler libssl-dev pkg-config - JAVA_HOME="$(dirname "$(dirname "$(readlink -f "$(command -v java)")")")" - echo "JAVA_HOME=${JAVA_HOME}" >> "$GITHUB_ENV" - echo "LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH:-}" >> "$GITHUB_ENV" - name: Build tests run: | ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` diff --git a/rust/lance-io/src/object_store/providers/hdfs.rs b/rust/lance-io/src/object_store/providers/hdfs.rs index e638e9144ef..21299c93f29 100644 --- a/rust/lance-io/src/object_store/providers/hdfs.rs +++ b/rust/lance-io/src/object_store/providers/hdfs.rs @@ -19,6 +19,12 @@ use lance_core::error::{Error, Result}; pub struct HdfsStoreProvider; impl HdfsStoreProvider { + fn operator_error(error: impl std::fmt::Display, name_node: &str, has_user: bool) -> Error { + Error::io(format!( + "Failed to create HDFS operator: {error}. name_node={name_node}, has_user={has_user}" + )) + } + fn build_config( base_path: &Url, storage_options: &StorageOptions, @@ -90,8 +96,13 @@ impl ObjectStoreProvider for HdfsStoreProvider { let storage_options = StorageOptions(params.storage_options().cloned().unwrap_or_default()); let config = Self::build_config(&base_path, &storage_options, std::env::vars())?; + let name_node = config + .get("name_node") + .cloned() + .unwrap_or_else(|| "".to_string()); + let has_user = config.contains_key("user"); let operator = Operator::from_iter::(config) - .map_err(|e| Error::invalid_input(format!("Failed to create HDFS operator: {e:?}")))? + .map_err(|error| Self::operator_error(error, &name_node, has_user))? .finish(); let opendal_store = Arc::new(OpendalStore::new(operator)); @@ -216,4 +227,19 @@ mod tests { assert!(matches!(error, Error::InvalidInput { .. })); assert!(error.to_string().contains("namenode host")); } + + #[test] + fn test_hdfs_operator_error_includes_connection_context() { + let error = HdfsStoreProvider::operator_error( + std::io::Error::other("native client unavailable"), + "hdfs://namenode:9000", + true, + ); + let message = error.to_string(); + + assert!(matches!(error, Error::IO { .. })); + assert!(message.contains("native client unavailable")); + assert!(message.contains("name_node=hdfs://namenode:9000")); + assert!(message.contains("has_user=true")); + } } From fc3d8786cd8f4d66a8ad9d65a817e748a8f758cd Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Wed, 10 Jun 2026 16:29:08 +0800 Subject: [PATCH 04/21] fix java build failed --- Cargo.lock | 1 - java/lance-jni/Cargo.lock | 89 +++++++++++++++++++++++++++++++++++++++ java/lance-jni/Cargo.toml | 1 + java/pom.xml | 4 -- 4 files changed, 90 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f8cd049ad6d..3c03c30d68c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1199,7 +1199,6 @@ dependencies = [ ] [[package]] - name = "bs58" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index ee52544ba57..d1a99d55f87 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -414,6 +414,12 @@ dependencies = [ "syn", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -989,6 +995,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bs58" version = "0.5.1" @@ -2548,6 +2567,16 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.32" @@ -2925,6 +2954,30 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" +[[package]] +name = "hdfs-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e2d5cefba2d51a26b44d2a493f963a32725a0f6593c91be4a610ad449c49cb" +dependencies = [ + "cc", + "java-locator", +] + +[[package]] +name = "hdrs" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c42a693bfe5dc8fcad1f24044c5ec355c5f157b8ce63c7d62f51cecbc7878d" +dependencies = [ + "blocking", + "errno", + "futures", + "hdfs-sys", + "libc", + "log", +] + [[package]] name = "heapify" version = "0.2.0" @@ -3488,6 +3541,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "java-locator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c46c1fe465c59b1474e665e85e1256c3893dd00927b8d55f63b09044c1e64f" +dependencies = [ + "glob", +] + [[package]] name = "jiff" version = "0.2.28" @@ -4825,6 +4887,7 @@ dependencies = [ "opendal-service-cos", "opendal-service-gcs", "opendal-service-goosefs", + "opendal-service-hdfs", "opendal-service-hf", "opendal-service-oss", "opendal-service-s3", @@ -5005,6 +5068,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-hdfs" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb51ce674ce98b8b7d9ac7d2dfc9d7fc8f1bbd1da1a5fb928ecc5f13ba7dc88a" +dependencies = [ + "bytes", + "futures", + "hdrs", + "log", + "opendal-core", + "serde", + "tokio", +] + [[package]] name = "opendal-service-hf" version = "0.57.0" @@ -5310,6 +5388,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c835479a4443ded371d6c535cbfd8d31ad92c5d23ae9770a61bc155e4992a3c1" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" diff --git a/java/lance-jni/Cargo.toml b/java/lance-jni/Cargo.toml index 6210c5daf1d..baaaea2e316 100644 --- a/java/lance-jni/Cargo.toml +++ b/java/lance-jni/Cargo.toml @@ -14,6 +14,7 @@ crate-type = ["cdylib"] [features] default = [] +hdfs = ["lance-io/hdfs"] [dependencies] lance = { path = "../../rust/lance", features = ["substrait"] } diff --git a/java/pom.xml b/java/pom.xml index 546e7169d25..6306ecc63f9 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -56,7 +56,6 @@ * limitations under the License. */ - @@ -400,9 +399,6 @@ ${project.build.directory}/classes/nativelib true - - ${cargo.features} - From dd5d0a82624b2710ed18daacc7ebc58969110fd1 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Wed, 10 Jun 2026 16:38:11 +0800 Subject: [PATCH 05/21] docs: explain Java HDFS builds --- java/README.md | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/java/README.md b/java/README.md index b49a4892527..c6115463bf1 100644 --- a/java/README.md +++ b/java/README.md @@ -220,30 +220,48 @@ And the whole `java` dir is a standard maven project can be imported into any ID Standard Build (Java + JNI) ```shell -mvn clean package +./mvnw clean package ``` This command executes the base Maven build process to compile all Java code in the `java` directory and generate the JNI native library. +HDFS-enabled Build: + +HDFS support is not enabled by default because it introduces additional native dependencies and requires a Java and Hadoop environment. To build the Java SDK and JNI native library with HDFS support, pass the `hdfs` Cargo feature through the Rust Maven plugin: + +```shell +./mvnw clean package -Dfeatures=hdfs +``` + +For a release build with HDFS support: + +```shell +./mvnw clean package -Drust.release.build=true -Dfeatures=hdfs +``` + +The resulting native library requires the Hadoop native client libraries at runtime. Configure `JAVA_HOME`, Hadoop configuration, and the platform library path, such as `LD_LIBRARY_PATH`, before loading the Java SDK. + Java-Only Build: ```shell -mvn clean package -Dskip.build.jni=true +./mvnw clean package -Dskip.build.jni=true ``` This will skip the JNI code compilation step and only process the Java module. Useful when focusing on Java feature development without needing native libraries, reducing build time. Product Release Build: ```shell -mvn clean package -Drust.release.build=true +./mvnw clean package -Drust.release.build=true ``` This will enable product environment optimization configurations (e.g., code shrinking, debug symbol removal, performance tuning) to generate deployment packages suitable for production environments. The optimized package is smaller in size and runs more efficiently. If you only want to build rust code(`lance-jni`), you can run the following command: ```shell -cd lance-jni && cargo build +cd lance-jni && cargo build --features hdfs ``` +Omit `--features hdfs` to build the JNI native library without HDFS support. + The java module uses `spotless` maven plugin to format the code and check the license header. And it is applied in the `validate` phase automatically. @@ -266,7 +284,7 @@ And you also need to install the rust plugin for your IDE. Then, you can build the whole java module: ```shell -mvn clean package +./mvnw clean package ``` Running these commands, it builds the rust jni binding codes automatically. From 3c34728cfe8d39d4d455eaa1fca282a019d38f67 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Wed, 10 Jun 2026 16:41:53 +0800 Subject: [PATCH 06/21] docs: use mvn in Java build examples --- java/README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/java/README.md b/java/README.md index c6115463bf1..9601cf90cc4 100644 --- a/java/README.md +++ b/java/README.md @@ -220,7 +220,7 @@ And the whole `java` dir is a standard maven project can be imported into any ID Standard Build (Java + JNI) ```shell -./mvnw clean package +mvn clean package ``` This command executes the base Maven build process to compile all Java code in the `java` directory and generate the JNI native library. @@ -229,13 +229,13 @@ HDFS-enabled Build: HDFS support is not enabled by default because it introduces additional native dependencies and requires a Java and Hadoop environment. To build the Java SDK and JNI native library with HDFS support, pass the `hdfs` Cargo feature through the Rust Maven plugin: ```shell -./mvnw clean package -Dfeatures=hdfs +mvn clean package -Dfeatures=hdfs ``` For a release build with HDFS support: ```shell -./mvnw clean package -Drust.release.build=true -Dfeatures=hdfs +mvn clean package -Drust.release.build=true -Dfeatures=hdfs ``` The resulting native library requires the Hadoop native client libraries at runtime. Configure `JAVA_HOME`, Hadoop configuration, and the platform library path, such as `LD_LIBRARY_PATH`, before loading the Java SDK. @@ -243,14 +243,14 @@ The resulting native library requires the Hadoop native client libraries at runt Java-Only Build: ```shell -./mvnw clean package -Dskip.build.jni=true +mvn clean package -Dskip.build.jni=true ``` This will skip the JNI code compilation step and only process the Java module. Useful when focusing on Java feature development without needing native libraries, reducing build time. Product Release Build: ```shell -./mvnw clean package -Drust.release.build=true +mvn clean package -Drust.release.build=true ``` This will enable product environment optimization configurations (e.g., code shrinking, debug symbol removal, performance tuning) to generate deployment packages suitable for production environments. The optimized package is smaller in size and runs more efficiently. @@ -284,7 +284,7 @@ And you also need to install the rust plugin for your IDE. Then, you can build the whole java module: ```shell -./mvnw clean package +mvn clean package ``` Running these commands, it builds the rust jni binding codes automatically. From f1f3ba1b578c1a7e557a2a8f0cd635d0598bbd1a Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Wed, 10 Jun 2026 17:22:17 +0800 Subject: [PATCH 07/21] docs: document HDFS object store setup --- docs/src/guide/object_store.md | 62 +++++++++++++++++++ .../src/object_store/providers/hdfs.rs | 23 +++++++ 2 files changed, 85 insertions(+) diff --git a/docs/src/guide/object_store.md b/docs/src/guide/object_store.md index f901d2c2411..fff6ed682a5 100644 --- a/docs/src/guide/object_store.md +++ b/docs/src/guide/object_store.md @@ -191,6 +191,68 @@ These keys can be used as both environment variables or keys in the `storage_opt | `azure_use_azure_cli` / `use_azure_cli` | Use azure cli for acquiring access token. | | `azure_disable_tagging` / `disable_tagging` | Disables tagging objects. This can be desirable if not supported by the backing store. | +## HDFS Configuration + +HDFS support is optional and must be enabled when building Lance. For Rust builds, +enable the `hdfs` feature on `lance-io`. For Java builds, see the +[Java HDFS build instructions](https://github.com/lance-format/lance/tree/main/java#hdfs-enabled-build). +Prebuilt Lance packages may not include HDFS support. + +Use an `hdfs://` URI containing a NameNode address or an HDFS high-availability +nameservice: + +```python +import lance + +ds = lance.dataset("hdfs://namenode:9000/user/lance/my-dataset") +``` + +For high-availability clusters configured in Hadoop XML files, the URI authority +can be the nameservice: + +```python +ds = lance.dataset("hdfs://mycluster/user/lance/my-dataset") +``` + +Explicit `storage_options` take priority over environment variables. If neither +specifies a NameNode, Lance uses the URI authority. + +```python +ds = lance.dataset( + "hdfs://namenode:9000/user/lance/my-dataset", + storage_options={ + "hdfs_name_node": "hdfs://namenode:8020", + "hdfs_user": "lance", + "hdfs_kerberos_ticket_cache_path": "/tmp/krb5cc_lance", + "hdfs_atomic_write_dir": "/tmp/lance-hdfs-atomic", + }, +) +``` + +| `storage_options` key | Environment variable | Description | +|-----------------------|----------------------|-------------| +| `hdfs_name_node` | `HDFS_NAME_NODE` | NameNode URI or HA nameservice. Defaults to the `hdfs://` URI authority. | +| `hdfs_user` | `HADOOP_USER_NAME`, then `HDFS_USER` | HDFS user name. The storage option takes priority, followed by the environment variables in the listed order. | +| `hdfs_kerberos_ticket_cache_path` | None | Path to the Kerberos ticket cache used to authenticate with HDFS. | +| `hdfs_atomic_write_dir` | None | HDFS directory used by OpenDAL for atomic writes. | + +Lance's HDFS provider uses OpenDAL's HDFS service, which depends on `hdrs` and +`hdfs-sys`. Building and running an HDFS-enabled artifact requires a local Java +and Hadoop native environment: + +```bash +export JAVA_HOME=/path/to/java +export HADOOP_HOME=/path/to/hadoop +export CLASSPATH="$(${HADOOP_HOME}/bin/hadoop classpath --glob)" +export LD_LIBRARY_PATH="${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native:${LD_LIBRARY_PATH}" +``` + +`hdfs-sys` dynamically links `libjvm`. If it uses a dynamically linked +`libhdfs`, `${HADOOP_HOME}/lib/native` must also be discoverable through the +platform library path. Ensure the Hadoop configuration directory, commonly +`${HADOOP_HOME}/etc/hadoop`, is included in the Hadoop classpath for HA, +Kerberos, and other cluster-specific settings. + ## AliCloud Object Storage Service Configuration OSS credentials can be set in the environment variables `OSS_ACCESS_KEY_ID`, diff --git a/rust/lance-io/src/object_store/providers/hdfs.rs b/rust/lance-io/src/object_store/providers/hdfs.rs index 21299c93f29..eabd2c36882 100644 --- a/rust/lance-io/src/object_store/providers/hdfs.rs +++ b/rust/lance-io/src/object_store/providers/hdfs.rs @@ -1,6 +1,29 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +//! HDFS object store provider backed by OpenDAL. +//! +//! HDFS support is optional and requires the `hdfs` Cargo feature. Dataset URIs +//! use the form `hdfs:///`, for example +//! `hdfs://namenode:9000/user/lance/dataset` or +//! `hdfs://mycluster/user/lance/dataset` for an HA nameservice. +//! +//! Configuration priority is `storage_options`, environment variables, then +//! the URI authority: +//! +//! | `storage_options` key | Environment variable | OpenDAL option | +//! | --- | --- | --- | +//! | `hdfs_name_node` | `HDFS_NAME_NODE` | `name_node` | +//! | `hdfs_user` | `HADOOP_USER_NAME`, then `HDFS_USER` | `user` | +//! | `hdfs_kerberos_ticket_cache_path` | None | `kerberos_ticket_cache_path` | +//! | `hdfs_atomic_write_dir` | None | `atomic_write_dir` | +//! +//! OpenDAL's HDFS service depends on `hdrs` and `hdfs-sys`. Building and +//! running it requires a local Java and Hadoop native environment. In +//! particular, configure `JAVA_HOME`, `HADOOP_HOME`, `CLASSPATH`, and the +//! platform library path so that `libjvm` and, when dynamically linked, +//! `libhdfs` can be discovered. + use std::collections::HashMap; use std::sync::Arc; From 330e010da1df0572e18aa37b3e0cbdc9e398c6f0 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Wed, 10 Jun 2026 17:30:00 +0800 Subject: [PATCH 08/21] docs: move HDFS configuration after TOS --- docs/src/guide/object_store.md | 116 ++++++++++++++++----------------- 1 file changed, 58 insertions(+), 58 deletions(-) diff --git a/docs/src/guide/object_store.md b/docs/src/guide/object_store.md index fff6ed682a5..9690a1ad9e1 100644 --- a/docs/src/guide/object_store.md +++ b/docs/src/guide/object_store.md @@ -191,6 +191,64 @@ These keys can be used as both environment variables or keys in the `storage_opt | `azure_use_azure_cli` / `use_azure_cli` | Use azure cli for acquiring access token. | | `azure_disable_tagging` / `disable_tagging` | Disables tagging objects. This can be desirable if not supported by the backing store. | +## AliCloud Object Storage Service Configuration + +OSS credentials can be set in the environment variables `OSS_ACCESS_KEY_ID`, +`OSS_ACCESS_KEY_SECRET`, `OSS_REGION`, and `OSS_SECURITY_TOKEN`. Alternatively, they can be +passed as parameters to the `storage_options` parameter: + +```python +import lance +ds = lance.dataset( + "oss://bucket/path", + storage_options={ + "oss_region": "oss-region", + "oss_endpoint": "oss-endpoint", + "oss_access_key_id": "my-access-key", + "oss_secret_access_key": "my-secret-key", + "oss_security_token": "my-session-token", + } +) +``` + +| Key | Description | +|-----|-------------| +| `oss_endpoint` | OSS endpoint. Required (for example, `https://oss-cn-hangzhou.aliyuncs.com`). | +| `oss_access_key_id` | Access key ID used for OSS authentication. Optional if credentials are provided by environment. | +| `oss_secret_access_key` | Access key secret used for OSS authentication. Optional if credentials are provided by environment. | +| `oss_region` | OSS region (for example, `cn-hangzhou`). Optional. | +| `oss_security_token` | Security token for temporary credentials (STS). Optional. | + +## Volcengine TOS Configuration + +TOS credentials can be set in the environment variables `TOS_ACCESS_KEY_ID`, +`TOS_SECRET_ACCESS_KEY`, `TOS_ENDPOINT`, `TOS_REGION`, and `TOS_SECURITY_TOKEN`. +Lance also accepts the corresponding `VOLCENGINE_` environment variable prefix. +Alternatively, credentials can be passed as parameters to the `storage_options` +parameter; explicit `storage_options` override environment variables: + +```python +import lance +ds = lance.dataset( + "tos://bucket/path", + storage_options={ + "tos_endpoint": "https://tos-cn-beijing.volces.com", + "tos_region": "cn-beijing", + "tos_access_key_id": "my-access-key", + "tos_secret_access_key": "my-secret-key", + "tos_security_token": "my-session-token", + } +) +``` + +| Key | Description | +|-----|-------------| +| `tos_endpoint` | TOS endpoint. Required (for example, `https://tos-cn-beijing.volces.com`). | +| `tos_region` | TOS signing region (for example, `cn-beijing`). Optional. | +| `tos_access_key_id` | Access key ID used for TOS authentication. Optional if credentials are provided by environment. | +| `tos_secret_access_key` | Secret access key used for TOS authentication. Optional if credentials are provided by environment. | +| `tos_security_token` | Security token for temporary credentials. Optional. | + ## HDFS Configuration HDFS support is optional and must be enabled when building Lance. For Rust builds, @@ -253,64 +311,6 @@ platform library path. Ensure the Hadoop configuration directory, commonly `${HADOOP_HOME}/etc/hadoop`, is included in the Hadoop classpath for HA, Kerberos, and other cluster-specific settings. -## AliCloud Object Storage Service Configuration - -OSS credentials can be set in the environment variables `OSS_ACCESS_KEY_ID`, -`OSS_ACCESS_KEY_SECRET`, `OSS_REGION`, and `OSS_SECURITY_TOKEN`. Alternatively, they can be -passed as parameters to the `storage_options` parameter: - -```python -import lance -ds = lance.dataset( - "oss://bucket/path", - storage_options={ - "oss_region": "oss-region", - "oss_endpoint": "oss-endpoint", - "oss_access_key_id": "my-access-key", - "oss_secret_access_key": "my-secret-key", - "oss_security_token": "my-session-token", - } -) -``` - -| Key | Description | -|-----|-------------| -| `oss_endpoint` | OSS endpoint. Required (for example, `https://oss-cn-hangzhou.aliyuncs.com`). | -| `oss_access_key_id` | Access key ID used for OSS authentication. Optional if credentials are provided by environment. | -| `oss_secret_access_key` | Access key secret used for OSS authentication. Optional if credentials are provided by environment. | -| `oss_region` | OSS region (for example, `cn-hangzhou`). Optional. | -| `oss_security_token` | Security token for temporary credentials (STS). Optional. | - -## Volcengine TOS Configuration - -TOS credentials can be set in the environment variables `TOS_ACCESS_KEY_ID`, -`TOS_SECRET_ACCESS_KEY`, `TOS_ENDPOINT`, `TOS_REGION`, and `TOS_SECURITY_TOKEN`. -Lance also accepts the corresponding `VOLCENGINE_` environment variable prefix. -Alternatively, credentials can be passed as parameters to the `storage_options` -parameter; explicit `storage_options` override environment variables: - -```python -import lance -ds = lance.dataset( - "tos://bucket/path", - storage_options={ - "tos_endpoint": "https://tos-cn-beijing.volces.com", - "tos_region": "cn-beijing", - "tos_access_key_id": "my-access-key", - "tos_secret_access_key": "my-secret-key", - "tos_security_token": "my-session-token", - } -) -``` - -| Key | Description | -|-----|-------------| -| `tos_endpoint` | TOS endpoint. Required (for example, `https://tos-cn-beijing.volces.com`). | -| `tos_region` | TOS signing region (for example, `cn-beijing`). Optional. | -| `tos_access_key_id` | Access key ID used for TOS authentication. Optional if credentials are provided by environment. | -| `tos_secret_access_key` | Secret access key used for TOS authentication. Optional if credentials are provided by environment. | -| `tos_security_token` | Security token for temporary credentials. Optional. | - ## Tencent Cloud COS Configuration [COS (Cloud Object Storage)](https://cloud.tencent.com/product/cos) credentials can be set in environment variables prefixed From af19ff3b7ffc41074313cc8672b335ca36c858b0 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Wed, 10 Jun 2026 18:20:32 +0800 Subject: [PATCH 09/21] optimize feature hdfs --- docs/src/guide/object_store.md | 9 +++- java/lance-jni/Cargo.toml | 2 +- python/Cargo.lock | 89 ++++++++++++++++++++++++++++++++++ python/Cargo.toml | 1 + rust/lance/Cargo.toml | 1 + 5 files changed, 100 insertions(+), 2 deletions(-) diff --git a/docs/src/guide/object_store.md b/docs/src/guide/object_store.md index 9690a1ad9e1..741d3a93311 100644 --- a/docs/src/guide/object_store.md +++ b/docs/src/guide/object_store.md @@ -252,10 +252,17 @@ ds = lance.dataset( ## HDFS Configuration HDFS support is optional and must be enabled when building Lance. For Rust builds, -enable the `hdfs` feature on `lance-io`. For Java builds, see the +enable the `hdfs` feature on `lance`. For Java builds, see the [Java HDFS build instructions](https://github.com/lance-format/lance/tree/main/java#hdfs-enabled-build). Prebuilt Lance packages may not include HDFS support. +To build the Python package with HDFS support from source: + +```bash +cd python +uv run maturin build --release --features hdfs +``` + Use an `hdfs://` URI containing a NameNode address or an HDFS high-availability nameservice: diff --git a/java/lance-jni/Cargo.toml b/java/lance-jni/Cargo.toml index baaaea2e316..67c7d730561 100644 --- a/java/lance-jni/Cargo.toml +++ b/java/lance-jni/Cargo.toml @@ -14,7 +14,7 @@ crate-type = ["cdylib"] [features] default = [] -hdfs = ["lance-io/hdfs"] +hdfs = ["lance/hdfs"] [dependencies] lance = { path = "../../rust/lance", features = ["substrait"] } diff --git a/python/Cargo.lock b/python/Cargo.lock index 01d2edda1c8..6e4a7e6db34 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -511,6 +511,12 @@ dependencies = [ "syn 2.0.118", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -1119,6 +1125,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "brotli" version = "8.0.4" @@ -2928,6 +2947,16 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.32" @@ -3314,6 +3343,30 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" +[[package]] +name = "hdfs-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e2d5cefba2d51a26b44d2a493f963a32725a0f6593c91be4a610ad449c49cb" +dependencies = [ + "cc", + "java-locator", +] + +[[package]] +name = "hdrs" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c42a693bfe5dc8fcad1f24044c5ec355c5f157b8ce63c7d62f51cecbc7878d" +dependencies = [ + "blocking", + "errno", + "futures", + "hdfs-sys", + "libc", + "log", +] + [[package]] name = "heapify" version = "0.2.0" @@ -3883,6 +3936,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "java-locator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c46c1fe465c59b1474e665e85e1256c3893dd00927b8d55f63b09044c1e64f" +dependencies = [ + "glob", +] + [[package]] name = "jieba-macros" version = "0.10.1" @@ -5290,6 +5352,7 @@ dependencies = [ "opendal-service-cos", "opendal-service-gcs", "opendal-service-goosefs", + "opendal-service-hdfs", "opendal-service-hf", "opendal-service-oss", "opendal-service-s3", @@ -5470,6 +5533,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-hdfs" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb51ce674ce98b8b7d9ac7d2dfc9d7fc8f1bbd1da1a5fb928ecc5f13ba7dc88a" +dependencies = [ + "bytes", + "futures", + "hdrs", + "log", + "opendal-core", + "serde", + "tokio", +] + [[package]] name = "opendal-service-hf" version = "0.57.0" @@ -5859,6 +5937,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c835479a4443ded371d6c535cbfd8d31ad92c5d23ae9770a61bc155e4992a3c1" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" diff --git a/python/Cargo.toml b/python/Cargo.toml index 240c046e5ff..662357548d8 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -80,6 +80,7 @@ bytes = "1.4" default = [] datagen = ["lance-datagen"] fp16kernels = ["lance/fp16kernels"] +hdfs = ["lance/hdfs"] [profile.ci] debug = "line-tables-only" diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 440c3fb301a..f61c39493f3 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -158,6 +158,7 @@ azure = ["lance-io/azure"] oss = ["lance-io/oss"] tencent = ["lance-io/tencent"] goosefs = ["lance-io/goosefs"] +hdfs = ["lance-io/hdfs"] tos = ["lance-io/tos"] huggingface = ["lance-io/huggingface"] geo = ["lance-datafusion/geo", "lance-index/geo"] From aee5b6b13a7a60673a7107011286b6bfd13c9e33 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Mon, 15 Jun 2026 11:43:31 +0800 Subject: [PATCH 10/21] fix: filter empty string values in HDFS storage options Add .filter(|v| !v.is_empty()) to hdfs_name_node and hdfs_user storage options lookups for consistency with the kerberos and atomic_write_dir fields. --- rust/lance-io/src/object_store/providers/hdfs.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/lance-io/src/object_store/providers/hdfs.rs b/rust/lance-io/src/object_store/providers/hdfs.rs index eabd2c36882..37b2c7e9195 100644 --- a/rust/lance-io/src/object_store/providers/hdfs.rs +++ b/rust/lance-io/src/object_store/providers/hdfs.rs @@ -77,6 +77,7 @@ impl HdfsStoreProvider { let name_node = storage_options .0 .get("hdfs_name_node") + .filter(|v| !v.is_empty()) .cloned() .or_else(|| env_vars.get("HDFS_NAME_NODE").cloned()) .unwrap_or_else(|| format!("hdfs://{}", base_path.authority())); @@ -89,6 +90,7 @@ impl HdfsStoreProvider { let user = storage_options .0 .get("hdfs_user") + .filter(|v| !v.is_empty()) .cloned() .or_else(|| env_vars.get("HADOOP_USER_NAME").cloned()) .or_else(|| env_vars.get("HDFS_USER").cloned()); From 4155f24c5a65140163928f388ca25dabd0952a8b Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Mon, 15 Jun 2026 11:48:49 +0800 Subject: [PATCH 11/21] refactor: remove duplicate test_hdfs_url_parsing integration test This test was a duplicate of test_hdfs_store_paths in the unit test file, both exercise extract_path logic with the same URL patterns. --- rust/lance-io/tests/hdfs_integration.rs | 27 ------------------------- 1 file changed, 27 deletions(-) diff --git a/rust/lance-io/tests/hdfs_integration.rs b/rust/lance-io/tests/hdfs_integration.rs index 55d753144f1..a2cc64175ba 100644 --- a/rust/lance-io/tests/hdfs_integration.rs +++ b/rust/lance-io/tests/hdfs_integration.rs @@ -16,7 +16,6 @@ mod tests { }; use object_store::ObjectStoreExt; use object_store::path::Path; - use url::Url; fn hdfs_available() -> bool { std::env::var("HDFS_NAME_NODE").is_ok() || std::env::var("HDFS_TEST_ENABLED").is_ok() @@ -71,32 +70,6 @@ mod tests { assert_eq!(path, Path::from("user/test")); } - #[tokio::test] - async fn test_hdfs_url_parsing() { - let registry = Arc::new(ObjectStoreRegistry::default()); - - let test_cases = vec![ - ("hdfs://localhost:9000/", ""), - ("hdfs://localhost:9000/user", "user"), - ( - "hdfs://localhost:9000/user/data/file.txt", - "user/data/file.txt", - ), - ("hdfs://namenode/path/to/file", "path/to/file"), - // HA configuration tests - ("hdfs://ht-hdfsqa/", ""), - ("hdfs://ht-hdfsqa/user/data", "user/data"), - ("hdfs://mycluster/tmp/test.txt", "tmp/test.txt"), - ]; - - for (url_str, expected_path) in test_cases { - let url = Url::parse(url_str).unwrap(); - let provider = registry.get_provider("hdfs").unwrap(); - let path = provider.extract_path(&url).unwrap(); - assert_eq!(path, Path::from(expected_path)); - } - } - #[tokio::test] async fn test_hdfs_basic_operations() { if !hdfs_available() { From 248661756ed05bd1584e36c93d7652d5fb0e1099 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Mon, 15 Jun 2026 11:59:36 +0800 Subject: [PATCH 12/21] docs: explain HDFS integration test opt-in --- rust/lance-io/tests/hdfs_integration.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/rust/lance-io/tests/hdfs_integration.rs b/rust/lance-io/tests/hdfs_integration.rs index a2cc64175ba..6d9980ffcc0 100644 --- a/rust/lance-io/tests/hdfs_integration.rs +++ b/rust/lance-io/tests/hdfs_integration.rs @@ -3,8 +3,20 @@ //! Integration tests for HDFS object store provider //! -//! These tests require a running HDFS cluster. They can be run with: -//! cargo test --features hdfs hdfs_integration_test +//! These tests need an existing HDFS cluster and are skipped unless +//! `HDFS_NAME_NODE` or `HDFS_TEST_ENABLED` is set. +//! +//! Set `HDFS_NAME_NODE` to the name node or nameservice address +//! (e.g., `hdfs://namenode:9000` or `hdfs://mycluster`). Set +//! `HDFS_TEST_ENABLED` to any value to run the tests when the name node is +//! provided by Hadoop configuration files instead. `HDFS_TEST_ENABLED` only +//! controls whether the tests are skipped; it does not configure HDFS or +//! verify connectivity. +//! +//! Example: +//! HDFS_NAME_NODE=hdfs://localhost:9000 cargo test --features hdfs hdfs_integration_test +//! HDFS_TEST_ENABLED=1 cargo test --features hdfs hdfs_integration_test +//! #[cfg(feature = "hdfs")] mod tests { @@ -17,6 +29,7 @@ mod tests { use object_store::ObjectStoreExt; use object_store::path::Path; + /// Returns whether the environment explicitly enables the HDFS integration tests. fn hdfs_available() -> bool { std::env::var("HDFS_NAME_NODE").is_ok() || std::env::var("HDFS_TEST_ENABLED").is_ok() } From 1df96eac40d8bf808be0512ce6a3be39ac21aa85 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Mon, 15 Jun 2026 12:08:54 +0800 Subject: [PATCH 13/21] test: remove ineffective HDFS config validation --- rust/lance-io/tests/hdfs_integration.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/rust/lance-io/tests/hdfs_integration.rs b/rust/lance-io/tests/hdfs_integration.rs index 6d9980ffcc0..fd1c966e1e7 100644 --- a/rust/lance-io/tests/hdfs_integration.rs +++ b/rust/lance-io/tests/hdfs_integration.rs @@ -116,22 +116,6 @@ mod tests { store.inner.delete(&test_path).await.unwrap(); } - #[test] - fn test_hdfs_config_validation() { - // Test that missing namenode configuration is properly handled - let storage_options = HashMap::new(); - let params = ObjectStoreParams { - storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options( - storage_options, - ))), - ..Default::default() - }; - - // This test verifies the configuration validation logic without actually connecting - // The actual connection would fail, but we're testing the config validation - assert!(params.storage_options().is_some()); - } - #[tokio::test] async fn test_hdfs_ha_configuration() { if !hdfs_available() { From cc0cc65bcd16c8822d119619f29ec90cf0128b34 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Mon, 15 Jun 2026 12:19:44 +0800 Subject: [PATCH 14/21] test: explicitly ignore HDFS integration tests --- rust/lance-io/tests/hdfs_integration.rs | 41 ++++++------------------- 1 file changed, 9 insertions(+), 32 deletions(-) diff --git a/rust/lance-io/tests/hdfs_integration.rs b/rust/lance-io/tests/hdfs_integration.rs index fd1c966e1e7..86bc3c761b7 100644 --- a/rust/lance-io/tests/hdfs_integration.rs +++ b/rust/lance-io/tests/hdfs_integration.rs @@ -3,20 +3,14 @@ //! Integration tests for HDFS object store provider //! -//! These tests need an existing HDFS cluster and are skipped unless -//! `HDFS_NAME_NODE` or `HDFS_TEST_ENABLED` is set. +//! These tests need an existing HDFS cluster and are ignored by default. //! //! Set `HDFS_NAME_NODE` to the name node or nameservice address -//! (e.g., `hdfs://namenode:9000` or `hdfs://mycluster`). Set -//! `HDFS_TEST_ENABLED` to any value to run the tests when the name node is -//! provided by Hadoop configuration files instead. `HDFS_TEST_ENABLED` only -//! controls whether the tests are skipped; it does not configure HDFS or -//! verify connectivity. -//! -//! Example: -//! HDFS_NAME_NODE=hdfs://localhost:9000 cargo test --features hdfs hdfs_integration_test -//! HDFS_TEST_ENABLED=1 cargo test --features hdfs hdfs_integration_test +//! (e.g., `hdfs://namenode:9000` or `hdfs://mycluster`) when it should +//! override the authority in the test URI. //! +//! Run: +//! HDFS_NAME_NODE=hdfs://localhost:9000 cargo test -p lance-io --features hdfs --test hdfs_integration -- --ignored #[cfg(feature = "hdfs")] mod tests { @@ -29,17 +23,9 @@ mod tests { use object_store::ObjectStoreExt; use object_store::path::Path; - /// Returns whether the environment explicitly enables the HDFS integration tests. - fn hdfs_available() -> bool { - std::env::var("HDFS_NAME_NODE").is_ok() || std::env::var("HDFS_TEST_ENABLED").is_ok() - } - + #[ignore = "Requires HDFS cluster"] #[tokio::test] async fn test_hdfs_store_creation() { - if !hdfs_available() { - return; - } - let registry = Arc::new(ObjectStoreRegistry::default()); let url = "hdfs://localhost:9000/test/path"; let params = ObjectStoreParams::default(); @@ -52,12 +38,9 @@ mod tests { assert_eq!(path, Path::from("test/path")); } + #[ignore = "Requires HDFS cluster"] #[tokio::test] async fn test_hdfs_store_with_custom_config() { - if !hdfs_available() { - return; - } - let registry = Arc::new(ObjectStoreRegistry::default()); let url = "hdfs://namenode:9000/user/test"; @@ -83,12 +66,9 @@ mod tests { assert_eq!(path, Path::from("user/test")); } + #[ignore = "Requires HDFS cluster"] #[tokio::test] async fn test_hdfs_basic_operations() { - if !hdfs_available() { - return; - } - let registry = Arc::new(ObjectStoreRegistry::default()); let url = "hdfs://localhost:9000/test"; let params = ObjectStoreParams::default(); @@ -116,12 +96,9 @@ mod tests { store.inner.delete(&test_path).await.unwrap(); } + #[ignore = "Requires HDFS cluster"] #[tokio::test] async fn test_hdfs_ha_configuration() { - if !hdfs_available() { - return; - } - // Test HA configuration like hdfs://ht-hdfsqa let registry = Arc::new(ObjectStoreRegistry::default()); let url = "hdfs://ht-hdfsqa/user/test"; From 535ab49126d0608d0962c399d573ffa2fde03e94 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Fri, 19 Jun 2026 21:48:54 +0800 Subject: [PATCH 15/21] fix: set list_is_lexically_ordered to false for HDFS --- rust/lance-io/src/object_store/providers/hdfs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance-io/src/object_store/providers/hdfs.rs b/rust/lance-io/src/object_store/providers/hdfs.rs index 37b2c7e9195..05224329b55 100644 --- a/rust/lance-io/src/object_store/providers/hdfs.rs +++ b/rust/lance-io/src/object_store/providers/hdfs.rs @@ -137,7 +137,7 @@ impl ObjectStoreProvider for HdfsStoreProvider { block_size, max_iop_size: *DEFAULT_MAX_IOP_SIZE, use_constant_size_upload_parts: params.use_constant_size_upload_parts, - list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or(true), + list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or(false), io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count: storage_options.download_retry_count(), io_tracker: Default::default(), From 988ad4bf37accddfee9a1fb80fe1f64bb5a814a9 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Fri, 19 Jun 2026 23:36:21 +0800 Subject: [PATCH 16/21] fix: derive store_prefix from resolved effective NameNode for HDFS --- .../src/object_store/providers/hdfs.rs | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/rust/lance-io/src/object_store/providers/hdfs.rs b/rust/lance-io/src/object_store/providers/hdfs.rs index 05224329b55..09b2fcde428 100644 --- a/rust/lance-io/src/object_store/providers/hdfs.rs +++ b/rust/lance-io/src/object_store/providers/hdfs.rs @@ -112,6 +112,27 @@ impl HdfsStoreProvider { Ok(config) } + + /// Calculate the object store prefix from the resolved effective NameNode. + /// + /// The prefix identifies the datastore for caching and must reflect the + /// actual HDFS cluster the client connects to — not just the URI authority. + /// NameNode resolution follows the same priority as `build_config`: + /// `hdfs_name_node` → `HDFS_NAME_NODE` → URI authority. + fn calculate_object_store_prefix_with_env( + url: &Url, + storage_options: Option<&HashMap>, + env_vars: &HashMap, + ) -> Result { + let authority = storage_options + .and_then(|opts| opts.get("hdfs_name_node")) + .filter(|v| !v.is_empty()) + .cloned() + .or_else(|| env_vars.get("HDFS_NAME_NODE").cloned()) + .unwrap_or_else(|| url.authority().to_string()); + + Ok(format!("{}${}", url.scheme(), authority)) + } } #[async_trait::async_trait] @@ -145,6 +166,15 @@ impl ObjectStoreProvider for HdfsStoreProvider { .calculate_object_store_prefix(&base_path, params.storage_options())?, }) } + + fn calculate_object_store_prefix( + &self, + url: &Url, + storage_options: Option<&HashMap>, + ) -> Result { + let env_vars = std::env::vars().collect::>(); + Self::calculate_object_store_prefix_with_env(url, storage_options, &env_vars) + } } #[cfg(test)] From 87a6af7f2e879a91e237aa8d6f6f3fac3b1e2004 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Fri, 19 Jun 2026 23:45:13 +0800 Subject: [PATCH 17/21] feat: add safe HDFS rename commit handler via libhdfs --- Cargo.lock | 1 + Cargo.toml | 1 + rust/lance-table/Cargo.toml | 2 + rust/lance-table/src/io/commit.rs | 10 +- rust/lance-table/src/io/commit/hdfs_rename.rs | 196 ++++++++++++++++++ rust/lance/Cargo.toml | 2 +- 6 files changed, 210 insertions(+), 2 deletions(-) create mode 100644 rust/lance-table/src/io/commit/hdfs_rename.rs diff --git a/Cargo.lock b/Cargo.lock index 3c03c30d68c..843ab253a26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5096,6 +5096,7 @@ dependencies = [ "chrono", "criterion", "futures", + "hdrs", "lance-arrow", "lance-core", "lance-datagen", diff --git a/Cargo.toml b/Cargo.toml index f902f10496b..35a2ec1803d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,6 +104,7 @@ half = { "version" = "2.1", default-features = false, features = [ "num-traits", "std", ] } +hdrs = "0.3.2" lance-bitpacking = { version = "=8.1.0-beta.0", path = "./rust/compression/bitpacking" } bitpacking = "0.9" bitvec = "1" diff --git a/rust/lance-table/Cargo.toml b/rust/lance-table/Cargo.toml index 042ae92c618..c1790b19030 100644 --- a/rust/lance-table/Cargo.toml +++ b/rust/lance-table/Cargo.toml @@ -43,6 +43,7 @@ snafu.workspace = true tokio.workspace = true tracing.workspace = true url.workspace = true +hdrs = { workspace = true, optional = true } uuid.workspace = true [dev-dependencies] @@ -59,6 +60,7 @@ prost-build.workspace = true protobuf-src = { version = "2.1", optional = true } [features] +hdfs = ["dep:hdrs", "lance-io/hdfs"] dynamodb = ["dep:aws-sdk-dynamodb", "dep:aws-credential-types", "lance-io/aws"] protoc = ["dep:protobuf-src"] diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index e1a4086730b..1d159229e48 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -47,6 +47,8 @@ use url::Url; #[cfg(feature = "dynamodb")] pub mod dynamodb; +#[cfg(feature = "hdfs")] +mod hdfs_rename; pub mod external_manifest; use lance_core::{Error, Result}; @@ -762,7 +764,7 @@ pub fn list_detached_manifests<'a>( .boxed() } -fn make_staging_manifest_path(base: &Path) -> Result { +pub(super) fn make_staging_manifest_path(base: &Path) -> Result { let id = uuid::Uuid::new_v4().to_string(); Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e))) } @@ -1094,6 +1096,12 @@ pub async fn commit_handler_from_url( "s3" | "gs" | "az" | "abfss" | "memory" | "oss" | "cos" | "shared-memory" => { Ok(Arc::new(ConditionalPutCommitHandler)) } + #[cfg(feature = "hdfs")] + "hdfs" => { + let params = options.clone().unwrap_or_default(); + let client = hdfs_rename::connect_hdrs_client(&url, ¶ms).await?; + Ok(Arc::new(hdfs_rename::HdfsRenameCommitHandler::new(client))) + } #[cfg(not(feature = "dynamodb"))] "s3+ddb" => Err(Error::invalid_input_source( "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(), diff --git a/rust/lance-table/src/io/commit/hdfs_rename.rs b/rust/lance-table/src/io/commit/hdfs_rename.rs new file mode 100644 index 00000000000..32471cee533 --- /dev/null +++ b/rust/lance-table/src/io/commit/hdfs_rename.rs @@ -0,0 +1,196 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! HDFS manifest commit via libhdfs [`hdrs::Client::rename_file`] (NameNode rename RPC). +//! +//! Matches [`super::RenameCommitHandler`] semantics: write staging manifest, then rename into +//! place. On HDFS, `rename` fails when the destination path already exists, which we treat as +//! [`super::CommitError::CommitConflict`] for transaction retry. + +use std::collections::HashMap; +use std::io; +use std::sync::Arc; + +use hdrs::ClientBuilder; +use object_store::path::Path; +use tokio::task; +use url::Url; + +use lance_core::{Error, Result}; + +use super::{ + CommitError, CommitHandler, ManifestLocation, ManifestNamingScheme, ManifestWriter, + make_staging_manifest_path, +}; +use crate::format::{IndexMetadata, Manifest, Transaction}; +use lance_io::object_store::ObjectStore; +use lance_io::object_store::ObjectStoreParams; + +/// [`CommitHandler`] for HDFS: staging write + atomic rename via libhdfs (same idea as +/// [`super::RenameCommitHandler`], but uses `hdfsRename` instead of object_store's +/// `rename_if_not_exists`). +#[derive(Debug)] +pub struct HdfsRenameCommitHandler { + client: Arc, +} + +impl HdfsRenameCommitHandler { + pub fn new(client: Arc) -> Self { + Self { client } + } +} + +/// Connect a libhdfs client for commit operations. Must match the same NameNode / user as the +/// OpenDAL HDFS [`ObjectStore`](lance_io::object_store::ObjectStore) used for I/O. +pub async fn connect_hdrs_client( + url: &Url, + params: &ObjectStoreParams, +) -> Result> { + let (name_node, user) = resolve_hdfs_namenode_and_user(url, params)?; + let join = task::spawn_blocking(move || { + let user_ref: Option<&str> = user.as_deref(); + let mut builder = ClientBuilder::new(&name_node); + if let Some(u) = user_ref { + builder = builder.with_user(u); + } + builder.connect().map_err(|e| { + Error::io(format!( + "Failed to connect HDFS client for manifest commit: {e}" + )) + }) + }) + .await + .map_err(|e| Error::io(format!("HDFS connect task failed: {e}")))?; + Ok(Arc::new(join?)) +} + +fn resolve_hdfs_namenode_and_user( + url: &Url, + params: &ObjectStoreParams, +) -> Result<(String, Option)> { + let namenode = url + .host_str() + .ok_or_else(|| Error::invalid_input("HDFS URL must contain namenode host"))? + .to_string(); + + let storage_map: HashMap = params + .storage_options() + .cloned() + .unwrap_or_default(); + + let hadoop_conf_dir = std::env::var("HADOOP_CONF_DIR").ok(); + + let name_node_url = if let Some(port) = url.port() { + format!("hdfs://{namenode}:{port}") + } else { + format!("hdfs://{namenode}") + }; + + let is_logical_name = url.port().is_none() && !namenode.contains('.'); + + let name_node = if let Some(nn) = storage_map + .get("hdfs_name_node") + .filter(|v| !v.is_empty()) + { + nn.clone() + } else if let Ok(nn) = std::env::var("HDFS_NAME_NODE") { + if nn.is_empty() { + name_node_url + } else { + nn + } + } else { + if is_logical_name && hadoop_conf_dir.is_none() { + return Err(Error::invalid_input(format!( + "HDFS HA logical name '{namenode}' cannot be resolved without HADOOP_CONF_DIR or hdfs_name_node" + ))); + } + name_node_url + }; + + let user = if let Some(u) = storage_map + .get("hdfs_user") + .filter(|v| !v.is_empty()) + { + Some(u.clone()) + } else if let Ok(u) = std::env::var("HADOOP_USER_NAME") { + if u.is_empty() { None } else { Some(u) } + } else if let Ok(u) = std::env::var("HDFS_USER") { + if u.is_empty() { None } else { Some(u) } + } else { + std::env::var("USER") + .or_else(|_| std::env::var("LOGNAME")) + .ok() + .filter(|s| !s.is_empty()) + }; + + Ok((name_node, user)) +} + +fn object_store_path_to_hdfs_abs(path: &Path) -> String { + let s = path.as_ref(); + if s.is_empty() { + "/".to_string() + } else if s.starts_with('/') { + s.to_string() + } else { + format!("/{s}") + } +} + +fn hdfs_rename_implies_conflict(err: &io::Error) -> bool { + if err.kind() == io::ErrorKind::AlreadyExists { + return true; + } + let msg = err.to_string().to_lowercase(); + msg.contains("already exists") + || msg.contains("file exists") + || msg.contains("filealreadyexists") + || msg.contains("rename destination") +} + +#[async_trait::async_trait] +impl CommitHandler for HdfsRenameCommitHandler { + async fn commit( + &self, + manifest: &mut Manifest, + indices: Option>, + base_path: &Path, + object_store: &ObjectStore, + manifest_writer: ManifestWriter, + naming_scheme: ManifestNamingScheme, + transaction: Option, + ) -> std::result::Result { + let path = naming_scheme.manifest_path(base_path, manifest.version); + let tmp_path = make_staging_manifest_path(&path)?; + + let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?; + + let client = Arc::clone(&self.client); + let tmp_abs = object_store_path_to_hdfs_abs(&tmp_path); + let final_abs = object_store_path_to_hdfs_abs(&path); + + let rename_result = + task::spawn_blocking(move || client.rename_file(&tmp_abs, &final_abs)).await; + + match rename_result { + Ok(Ok(())) => Ok(ManifestLocation { + version: manifest.version, + path, + size: Some(res.size as u64), + naming_scheme, + e_tag: None, + }), + Ok(Err(e)) if hdfs_rename_implies_conflict(&e) => { + let _ = object_store.delete(&tmp_path).await; + Err(CommitError::CommitConflict) + } + Ok(Err(e)) => Err(CommitError::OtherError(Error::io(format!( + "HDFS rename failed (staging -> manifest): {e}" + )))), + Err(e) => Err(CommitError::OtherError(Error::io(format!( + "HDFS rename task join error: {e}" + )))), + } + } +} diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index f61c39493f3..03d80472fa6 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -158,7 +158,7 @@ azure = ["lance-io/azure"] oss = ["lance-io/oss"] tencent = ["lance-io/tencent"] goosefs = ["lance-io/goosefs"] -hdfs = ["lance-io/hdfs"] +hdfs = ["lance-io/hdfs", "lance-table/hdfs"] tos = ["lance-io/tos"] huggingface = ["lance-io/huggingface"] geo = ["lance-datafusion/geo", "lance-index/geo"] From d87b8d5704767db2143930d512ebeeceb891c0d3 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Sat, 20 Jun 2026 00:05:31 +0800 Subject: [PATCH 18/21] style: fix cargo fmt issues in hdfs commit handler --- rust/lance-table/src/io/commit.rs | 2 +- rust/lance-table/src/io/commit/hdfs_rename.rs | 22 +++++-------------- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index 1d159229e48..5535d5509e9 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -47,9 +47,9 @@ use url::Url; #[cfg(feature = "dynamodb")] pub mod dynamodb; +pub mod external_manifest; #[cfg(feature = "hdfs")] mod hdfs_rename; -pub mod external_manifest; use lance_core::{Error, Result}; use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams}; diff --git a/rust/lance-table/src/io/commit/hdfs_rename.rs b/rust/lance-table/src/io/commit/hdfs_rename.rs index 32471cee533..2ed29f75ba4 100644 --- a/rust/lance-table/src/io/commit/hdfs_rename.rs +++ b/rust/lance-table/src/io/commit/hdfs_rename.rs @@ -73,10 +73,8 @@ fn resolve_hdfs_namenode_and_user( .ok_or_else(|| Error::invalid_input("HDFS URL must contain namenode host"))? .to_string(); - let storage_map: HashMap = params - .storage_options() - .cloned() - .unwrap_or_default(); + let storage_map: HashMap = + params.storage_options().cloned().unwrap_or_default(); let hadoop_conf_dir = std::env::var("HADOOP_CONF_DIR").ok(); @@ -88,17 +86,10 @@ fn resolve_hdfs_namenode_and_user( let is_logical_name = url.port().is_none() && !namenode.contains('.'); - let name_node = if let Some(nn) = storage_map - .get("hdfs_name_node") - .filter(|v| !v.is_empty()) - { + let name_node = if let Some(nn) = storage_map.get("hdfs_name_node").filter(|v| !v.is_empty()) { nn.clone() } else if let Ok(nn) = std::env::var("HDFS_NAME_NODE") { - if nn.is_empty() { - name_node_url - } else { - nn - } + if nn.is_empty() { name_node_url } else { nn } } else { if is_logical_name && hadoop_conf_dir.is_none() { return Err(Error::invalid_input(format!( @@ -108,10 +99,7 @@ fn resolve_hdfs_namenode_and_user( name_node_url }; - let user = if let Some(u) = storage_map - .get("hdfs_user") - .filter(|v| !v.is_empty()) - { + let user = if let Some(u) = storage_map.get("hdfs_user").filter(|v| !v.is_empty()) { Some(u.clone()) } else if let Ok(u) = std::env::var("HADOOP_USER_NAME") { if u.is_empty() { None } else { Some(u) } From 1cc22157ae0e9803d24420d36a8709d15f07c368 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Sat, 20 Jun 2026 00:11:47 +0800 Subject: [PATCH 19/21] test: add unit tests for HDFS commit handler helper functions --- rust/lance-table/src/io/commit/hdfs_rename.rs | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/rust/lance-table/src/io/commit/hdfs_rename.rs b/rust/lance-table/src/io/commit/hdfs_rename.rs index 2ed29f75ba4..265daf22807 100644 --- a/rust/lance-table/src/io/commit/hdfs_rename.rs +++ b/rust/lance-table/src/io/commit/hdfs_rename.rs @@ -25,6 +25,7 @@ use super::{ use crate::format::{IndexMetadata, Manifest, Transaction}; use lance_io::object_store::ObjectStore; use lance_io::object_store::ObjectStoreParams; +use lance_io::object_store::StorageOptionsAccessor; /// [`CommitHandler`] for HDFS: staging write + atomic rename via libhdfs (same idea as /// [`super::RenameCommitHandler`], but uses `hdfsRename` instead of object_store's @@ -182,3 +183,86 @@ impl CommitHandler for HdfsRenameCommitHandler { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_object_store_path_to_hdfs_abs() { + assert_eq!(object_store_path_to_hdfs_abs(&Path::from("")), "/"); + assert_eq!( + object_store_path_to_hdfs_abs(&Path::from("/data/manifest")), + "/data/manifest" + ); + assert_eq!( + object_store_path_to_hdfs_abs(&Path::from("data/manifest")), + "/data/manifest" + ); + } + + #[test] + fn test_hdfs_rename_implies_conflict() { + let already_exists = io::Error::new(io::ErrorKind::AlreadyExists, "file exists"); + assert!(hdfs_rename_implies_conflict(&already_exists)); + + let other = io::Error::new(io::ErrorKind::Other, "rename destination already exists"); + assert!(hdfs_rename_implies_conflict(&other)); + + let file_exists = + io::Error::new(io::ErrorKind::Other, "FileAlreadyExistsException at path"); + assert!(hdfs_rename_implies_conflict(&file_exists)); + + let unrelated = io::Error::new(io::ErrorKind::Other, "connection refused"); + assert!(!hdfs_rename_implies_conflict(&unrelated)); + } + + #[test] + fn test_resolve_namenode_from_uri_authority() { + let url = Url::parse("hdfs://namenode:9000/data").unwrap(); + let params = ObjectStoreParams::default(); + let (name_node, _user) = resolve_hdfs_namenode_and_user(&url, ¶ms).unwrap(); + assert_eq!(name_node, "hdfs://namenode:9000"); + } + + #[test] + fn test_resolve_namenode_from_storage_options() { + let url = Url::parse("hdfs://namenode:9000/data").unwrap(); + let mut opts = HashMap::new(); + opts.insert( + "hdfs_name_node".to_string(), + "hdfs://override:8020".to_string(), + ); + let params = ObjectStoreParams { + storage_options_accessor: Some(Arc::new( + lance_io::object_store::StorageOptionsAccessor::with_static_options(opts), + )), + ..Default::default() + }; + let (name_node, _user) = resolve_hdfs_namenode_and_user(&url, ¶ms).unwrap(); + assert_eq!(name_node, "hdfs://override:8020"); + } + + #[test] + fn test_resolve_user_from_storage_options() { + let url = Url::parse("hdfs://namenode:9000/data").unwrap(); + let mut opts = HashMap::new(); + opts.insert("hdfs_user".to_string(), "hduser".to_string()); + let params = ObjectStoreParams { + storage_options_accessor: Some(Arc::new( + lance_io::object_store::StorageOptionsAccessor::with_static_options(opts), + )), + ..Default::default() + }; + let (_name_node, user) = resolve_hdfs_namenode_and_user(&url, ¶ms).unwrap(); + assert_eq!(user, Some("hduser".to_string())); + } + + #[test] + fn test_resolve_namenode_rejects_no_host() { + let url = Url::parse("hdfs:///data").unwrap(); + let params = ObjectStoreParams::default(); + let err = resolve_hdfs_namenode_and_user(&url, ¶ms).unwrap_err(); + assert!(err.to_string().contains("namenode host")); + } +} From 3de1e1b7b2cb2394e12c7af395605a83e966b790 Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Sat, 20 Jun 2026 00:20:40 +0800 Subject: [PATCH 20/21] fix: move StorageOptionsAccessor import into test module --- rust/lance-table/src/io/commit/hdfs_rename.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/lance-table/src/io/commit/hdfs_rename.rs b/rust/lance-table/src/io/commit/hdfs_rename.rs index 265daf22807..6a4e20efdfb 100644 --- a/rust/lance-table/src/io/commit/hdfs_rename.rs +++ b/rust/lance-table/src/io/commit/hdfs_rename.rs @@ -25,7 +25,6 @@ use super::{ use crate::format::{IndexMetadata, Manifest, Transaction}; use lance_io::object_store::ObjectStore; use lance_io::object_store::ObjectStoreParams; -use lance_io::object_store::StorageOptionsAccessor; /// [`CommitHandler`] for HDFS: staging write + atomic rename via libhdfs (same idea as /// [`super::RenameCommitHandler`], but uses `hdfsRename` instead of object_store's @@ -188,6 +187,8 @@ impl CommitHandler for HdfsRenameCommitHandler { mod tests { use super::*; + use lance_io::object_store::StorageOptionsAccessor; + #[test] fn test_object_store_path_to_hdfs_abs() { assert_eq!(object_store_path_to_hdfs_abs(&Path::from("")), "/"); From 3ac5474234725a6a1a992cc2fcd07ba05ea28e6a Mon Sep 17 00:00:00 2001 From: "zhanghaobo@kanzhun.com" Date: Sat, 20 Jun 2026 00:55:35 +0800 Subject: [PATCH 21/21] chore: sync python/Cargo.lock with workspace hdrs dep --- python/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/python/Cargo.lock b/python/Cargo.lock index 6e4a7e6db34..ccaabc440dc 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4644,6 +4644,7 @@ dependencies = [ "bytes", "chrono", "futures", + "hdrs", "lance-arrow", "lance-core", "lance-file",