diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index e3c17671ce3..8e1fad87688 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -68,7 +68,7 @@ jobs: sudo apt install -y protobuf-compiler libssl-dev - name: Get features run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` echo "ALL_FEATURES=${ALL_FEATURES}" >> $GITHUB_ENV - name: Clippy run: cargo clippy --profile ci --locked --features ${{ env.ALL_FEATURES }} --all-targets -- -D warnings @@ -110,7 +110,7 @@ jobs: uses: taiki-e/install-action@66068bfca13dcb2ea07c3f613ca2836a37c755d5 # cargo-llvm-cov - name: Run tests run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo +nightly llvm-cov --profile ci --locked --workspace --codecov --output-path coverage.codecov --features ${ALL_FEATURES} - name: Upload coverage to Codecov uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4 @@ -137,13 +137,13 @@ jobs: sudo apt install -y protobuf-compiler libssl-dev pkg-config - name: Build tests run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo test --profile 
ci --locked --features ${ALL_FEATURES} --no-run - name: Start DynamodDB and S3 run: docker compose -f docker-compose.yml up -d --wait - name: Run tests run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo test --profile ci --locked --features ${ALL_FEATURES} query-integration-tests: runs-on: ubuntu-24.04-4x @@ -195,7 +195,7 @@ jobs: sudo apt install -y protobuf-compiler libssl-dev - name: Build all run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo build --profile ci --benches --features ${ALL_FEATURES} --tests mac-build: runs-on: warp-macos-14-arm64-6x @@ -280,5 +280,5 @@ jobs: env: RUSTUP_TOOLCHAIN: ${{ matrix.msrv }} run: | - ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests | sort | uniq | paste -s -d "," -` + ALL_FEATURES=`cargo metadata --format-version=1 --no-deps | jq -r '.packages[] | .features | keys | .[]' | grep -v -e protoc -e slow_tests -e hdfs | sort | uniq | paste -s -d "," -` cargo check --profile ci --workspace --tests --benches --features ${ALL_FEATURES} diff --git a/Cargo.lock b/Cargo.lock index 89d20bdf647..843ab253a26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -463,6 +463,12 @@ dependencies = [ "syn 2.0.118", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -1179,6 +1185,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bs58" version = "0.5.1" @@ -1455,7 +1474,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2764,7 +2783,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2916,7 +2935,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3147,6 +3166,16 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.32" @@ -3536,6 +3565,30 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" +[[package]] +name = "hdfs-sys" +version = "0.3.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e2d5cefba2d51a26b44d2a493f963a32725a0f6593c91be4a610ad449c49cb" +dependencies = [ + "cc", + "java-locator", +] + +[[package]] +name = "hdrs" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c42a693bfe5dc8fcad1f24044c5ec355c5f157b8ce63c7d62f51cecbc7878d" +dependencies = [ + "blocking", + "errno", + "futures", + "hdfs-sys", + "libc", + "log", +] + [[package]] name = "heapify" version = "0.2.0" @@ -4145,7 +4198,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4196,6 +4249,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "java-locator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c46c1fe465c59b1474e665e85e1256c3893dd00927b8d55f63b09044c1e64f" +dependencies = [ + "glob", +] + [[package]] name = "jieba-macros" version = "0.10.1" @@ -5034,6 +5096,7 @@ dependencies = [ "chrono", "criterion", "futures", + "hdrs", "lance-arrow", "lance-core", "lance-datagen", @@ -5681,7 +5744,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5960,6 +6023,7 @@ dependencies = [ "opendal-service-cos", "opendal-service-gcs", "opendal-service-goosefs", + "opendal-service-hdfs", "opendal-service-hf", "opendal-service-oss", "opendal-service-s3", @@ -6140,6 +6204,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-hdfs" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"fb51ce674ce98b8b7d9ac7d2dfc9d7fc8f1bbd1da1a5fb928ecc5f13ba7dc88a" +dependencies = [ + "bytes", + "futures", + "hdrs", + "log", + "opendal-core", + "serde", + "tokio", +] + [[package]] name = "opendal-service-hf" version = "0.57.0" @@ -6569,6 +6648,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c835479a4443ded371d6c535cbfd8d31ad92c5d23ae9770a61bc155e4992a3c1" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ -6804,7 +6894,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03da047801ff44bb6a4d407d4860c05fd70bb81714e6b2f3812603d5b145b042" dependencies = [ "heck", - "itertools 0.14.0", + "itertools 0.11.0", "log", "multimap", "petgraph", @@ -6823,7 +6913,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.118", @@ -6964,7 +7054,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.60.2", + "windows-sys 0.59.0", ] [[package]] @@ -7685,7 +7775,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7744,7 +7834,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -8630,7 +8720,7 @@ dependencies = [ "getrandom 0.4.3", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -9654,7 +9744,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f902f10496b..35a2ec1803d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,6 +104,7 @@ half = { "version" = "2.1", default-features = false, features = [ "num-traits", "std", ] } +hdrs = "0.3.2" lance-bitpacking = { version = "=8.1.0-beta.0", path = "./rust/compression/bitpacking" } bitpacking = "0.9" bitvec = "1" diff --git a/docs/src/guide/object_store.md b/docs/src/guide/object_store.md index f901d2c2411..741d3a93311 100644 --- a/docs/src/guide/object_store.md +++ b/docs/src/guide/object_store.md @@ -249,6 +249,75 @@ ds = lance.dataset( | `tos_secret_access_key` | Secret access key used for TOS authentication. Optional if credentials are provided by environment. | | `tos_security_token` | Security token for temporary credentials. Optional. | +## HDFS Configuration + +HDFS support is optional and must be enabled when building Lance. For Rust builds, +enable the `hdfs` feature on `lance`. For Java builds, see the +[Java HDFS build instructions](https://github.com/lance-format/lance/tree/main/java#hdfs-enabled-build). +Prebuilt Lance packages may not include HDFS support. + +To build the Python package with HDFS support from source: + +```bash +cd python +uv run maturin build --release --features hdfs +``` + +Use an `hdfs://` URI containing a NameNode address or an HDFS high-availability +nameservice: + +```python +import lance + +ds = lance.dataset("hdfs://namenode:9000/user/lance/my-dataset") +``` + +For high-availability clusters configured in Hadoop XML files, the URI authority +can be the nameservice: + +```python +ds = lance.dataset("hdfs://mycluster/user/lance/my-dataset") +``` + +Explicit `storage_options` take priority over environment variables. If neither +specifies a NameNode, Lance uses the URI authority. 
+ +```python +ds = lance.dataset( + "hdfs://namenode:9000/user/lance/my-dataset", + storage_options={ + "hdfs_name_node": "hdfs://namenode:8020", + "hdfs_user": "lance", + "hdfs_kerberos_ticket_cache_path": "/tmp/krb5cc_lance", + "hdfs_atomic_write_dir": "/tmp/lance-hdfs-atomic", + }, +) +``` + +| `storage_options` key | Environment variable | Description | +|-----------------------|----------------------|-------------| +| `hdfs_name_node` | `HDFS_NAME_NODE` | NameNode URI or HA nameservice. Defaults to the `hdfs://` URI authority. | +| `hdfs_user` | `HADOOP_USER_NAME`, then `HDFS_USER` | HDFS user name. The storage option takes priority, followed by the environment variables in the listed order. | +| `hdfs_kerberos_ticket_cache_path` | None | Path to the Kerberos ticket cache used to authenticate with HDFS. | +| `hdfs_atomic_write_dir` | None | HDFS directory used by OpenDAL for atomic writes. | + +Lance's HDFS provider uses OpenDAL's HDFS service, which depends on `hdrs` and +`hdfs-sys`. Building and running an HDFS-enabled artifact requires a local Java +and Hadoop native environment: + +```bash +export JAVA_HOME=/path/to/java +export HADOOP_HOME=/path/to/hadoop +export CLASSPATH="$(${HADOOP_HOME}/bin/hadoop classpath --glob)" +export LD_LIBRARY_PATH="${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native:${LD_LIBRARY_PATH}" +``` + +`hdfs-sys` dynamically links `libjvm`. If it uses a dynamically linked +`libhdfs`, `${HADOOP_HOME}/lib/native` must also be discoverable through the +platform library path. Ensure the Hadoop configuration directory, commonly +`${HADOOP_HOME}/etc/hadoop`, is included in the Hadoop classpath for HA, +Kerberos, and other cluster-specific settings. 
+ ## Tencent Cloud COS Configuration [COS (Cloud Object Storage)](https://cloud.tencent.com/product/cos) credentials can be set in environment variables prefixed diff --git a/java/README.md b/java/README.md index b49a4892527..9601cf90cc4 100644 --- a/java/README.md +++ b/java/README.md @@ -224,6 +224,22 @@ mvn clean package ``` This command executes the base Maven build process to compile all Java code in the `java` directory and generate the JNI native library. +HDFS-enabled Build: + +HDFS support is not enabled by default because it introduces additional native dependencies and requires a Java and Hadoop environment. To build the Java SDK and JNI native library with HDFS support, pass the `hdfs` Cargo feature through the Rust Maven plugin: + +```shell +mvn clean package -Dfeatures=hdfs +``` + +For a release build with HDFS support: + +```shell +mvn clean package -Drust.release.build=true -Dfeatures=hdfs +``` + +The resulting native library requires the Hadoop native client libraries at runtime. Configure `JAVA_HOME`, Hadoop configuration, and the platform library path, such as `LD_LIBRARY_PATH`, before loading the Java SDK. + Java-Only Build: ```shell @@ -241,9 +257,11 @@ This will enable product environment optimization configurations (e.g., code shr If you only want to build rust code(`lance-jni`), you can run the following command: ```shell -cd lance-jni && cargo build +cd lance-jni && cargo build --features hdfs ``` +Omit `--features hdfs` to build the JNI native library without HDFS support. + The java module uses `spotless` maven plugin to format the code and check the license header. And it is applied in the `validate` phase automatically. 
diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index ee52544ba57..d1a99d55f87 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -414,6 +414,12 @@ dependencies = [ "syn", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -989,6 +995,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bs58" version = "0.5.1" @@ -2548,6 +2567,16 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.32" @@ -2925,6 +2954,30 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" +[[package]] +name = "hdfs-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e2d5cefba2d51a26b44d2a493f963a32725a0f6593c91be4a610ad449c49cb" +dependencies = [ + "cc", + "java-locator", +] + +[[package]] +name = "hdrs" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c42a693bfe5dc8fcad1f24044c5ec355c5f157b8ce63c7d62f51cecbc7878d" 
+dependencies = [ + "blocking", + "errno", + "futures", + "hdfs-sys", + "libc", + "log", +] + [[package]] name = "heapify" version = "0.2.0" @@ -3488,6 +3541,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "java-locator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c46c1fe465c59b1474e665e85e1256c3893dd00927b8d55f63b09044c1e64f" +dependencies = [ + "glob", +] + [[package]] name = "jiff" version = "0.2.28" @@ -4825,6 +4887,7 @@ dependencies = [ "opendal-service-cos", "opendal-service-gcs", "opendal-service-goosefs", + "opendal-service-hdfs", "opendal-service-hf", "opendal-service-oss", "opendal-service-s3", @@ -5005,6 +5068,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-hdfs" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb51ce674ce98b8b7d9ac7d2dfc9d7fc8f1bbd1da1a5fb928ecc5f13ba7dc88a" +dependencies = [ + "bytes", + "futures", + "hdrs", + "log", + "opendal-core", + "serde", + "tokio", +] + [[package]] name = "opendal-service-hf" version = "0.57.0" @@ -5310,6 +5388,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c835479a4443ded371d6c535cbfd8d31ad92c5d23ae9770a61bc155e4992a3c1" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" diff --git a/java/lance-jni/Cargo.toml b/java/lance-jni/Cargo.toml index 6210c5daf1d..67c7d730561 100644 --- a/java/lance-jni/Cargo.toml +++ b/java/lance-jni/Cargo.toml @@ -14,6 +14,7 @@ crate-type = ["cdylib"] [features] default = [] +hdfs = ["lance/hdfs"] [dependencies] 
lance = { path = "../../rust/lance", features = ["substrait"] } diff --git a/python/Cargo.lock b/python/Cargo.lock index 01d2edda1c8..ccaabc440dc 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -511,6 +511,12 @@ dependencies = [ "syn 2.0.118", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.89" @@ -1119,6 +1125,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "brotli" version = "8.0.4" @@ -2928,6 +2947,16 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.32" @@ -3314,6 +3343,30 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a" +[[package]] +name = "hdfs-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33e2d5cefba2d51a26b44d2a493f963a32725a0f6593c91be4a610ad449c49cb" +dependencies = [ + "cc", + "java-locator", +] + +[[package]] +name = "hdrs" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f7c42a693bfe5dc8fcad1f24044c5ec355c5f157b8ce63c7d62f51cecbc7878d" +dependencies = [ + "blocking", + "errno", + "futures", + "hdfs-sys", + "libc", + "log", +] + [[package]] name = "heapify" version = "0.2.0" @@ -3883,6 +3936,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "java-locator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c46c1fe465c59b1474e665e85e1256c3893dd00927b8d55f63b09044c1e64f" +dependencies = [ + "glob", +] + [[package]] name = "jieba-macros" version = "0.10.1" @@ -4582,6 +4644,7 @@ dependencies = [ "bytes", "chrono", "futures", + "hdrs", "lance-arrow", "lance-core", "lance-file", @@ -5290,6 +5353,7 @@ dependencies = [ "opendal-service-cos", "opendal-service-gcs", "opendal-service-goosefs", + "opendal-service-hdfs", "opendal-service-hf", "opendal-service-oss", "opendal-service-s3", @@ -5470,6 +5534,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-hdfs" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb51ce674ce98b8b7d9ac7d2dfc9d7fc8f1bbd1da1a5fb928ecc5f13ba7dc88a" +dependencies = [ + "bytes", + "futures", + "hdrs", + "log", + "opendal-core", + "serde", + "tokio", +] + [[package]] name = "opendal-service-hf" version = "0.57.0" @@ -5859,6 +5938,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c835479a4443ded371d6c535cbfd8d31ad92c5d23ae9770a61bc155e4992a3c1" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" diff --git a/python/Cargo.toml b/python/Cargo.toml index 
240c046e5ff..662357548d8 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -80,6 +80,7 @@ bytes = "1.4" default = [] datagen = ["lance-datagen"] fp16kernels = ["lance/fp16kernels"] +hdfs = ["lance/hdfs"] [profile.ci] debug = "line-tables-only" diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index 6cee04d5fd9..ad75f206e7b 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -78,6 +78,7 @@ tencent = ["dep:opendal", "opendal/services-cos", "dep:object_store_opendal"] huggingface = ["dep:opendal", "opendal/services-huggingface", "dep:object_store_opendal"] tos = ["dep:opendal", "opendal/services-tos", "dep:object_store_opendal"] tos-test = ["tos"] +hdfs = ["dep:opendal", "opendal/services-hdfs", "dep:object_store_opendal"] test-util = [] [lints] diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 0c44095f117..ce97e06ea1f 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -61,6 +61,7 @@ pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8; pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64; const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size + #[cfg(any( feature = "aws", feature = "gcp", @@ -69,6 +70,7 @@ const DEFAULT_LOCAL_BLOCK_SIZE: usize = 4 * 1024; // 4KB block size feature = "tencent", feature = "huggingface", feature = "tos", + feature = "hdfs", ))] const DEFAULT_CLOUD_BLOCK_SIZE: usize = 64 * 1024; // 64KB block size diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 45ac30a757a..e6ee194d50d 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -26,6 +26,8 @@ pub mod azure; pub mod gcp; #[cfg(feature = "goosefs")] pub mod goosefs; +#[cfg(feature = "hdfs")] +pub mod hdfs; #[cfg(feature = "huggingface")] pub mod huggingface; pub mod local; @@ -100,6 +102,7 @@ pub struct ObjectStoreRegistryStats { /// - `az`: An Azure 
Blob Storage object store. /// - `gs`: A Google Cloud Storage object store. /// - `tos`: A Volcengine TOS object store. +/// - `hdfs`: A Hadoop FileSystem object store. /// /// Use [`Self::empty()`] to create an empty registry, with no providers registered. /// @@ -339,6 +342,8 @@ impl Default for ObjectStoreRegistry { providers.insert("hf".into(), Arc::new(huggingface::HuggingfaceStoreProvider)); #[cfg(feature = "tos")] providers.insert("tos".into(), Arc::new(tos::TosStoreProvider)); + #[cfg(feature = "hdfs")] + providers.insert("hdfs".into(), Arc::new(hdfs::HdfsStoreProvider)); Self { providers: RwLock::new(providers), active_stores: RwLock::new(HashMap::new()), diff --git a/rust/lance-io/src/object_store/providers/hdfs.rs b/rust/lance-io/src/object_store/providers/hdfs.rs new file mode 100644 index 00000000000..09b2fcde428 --- /dev/null +++ b/rust/lance-io/src/object_store/providers/hdfs.rs @@ -0,0 +1,300 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! HDFS object store provider backed by OpenDAL. +//! +//! HDFS support is optional and requires the `hdfs` Cargo feature. Dataset URIs +//! use the form `hdfs://<authority>/<path>`, for example +//! `hdfs://namenode:9000/user/lance/dataset` or +//! `hdfs://mycluster/user/lance/dataset` for an HA nameservice. +//! +//! Configuration priority is `storage_options`, environment variables, then +//! the URI authority: +//! +//! | `storage_options` key | Environment variable | OpenDAL option | +//! | --- | --- | --- | +//! | `hdfs_name_node` | `HDFS_NAME_NODE` | `name_node` | +//! | `hdfs_user` | `HADOOP_USER_NAME`, then `HDFS_USER` | `user` | +//! | `hdfs_kerberos_ticket_cache_path` | None | `kerberos_ticket_cache_path` | +//! | `hdfs_atomic_write_dir` | None | `atomic_write_dir` | +//! +//! OpenDAL's HDFS service depends on `hdrs` and `hdfs-sys`. Building and +//! running it requires a local Java and Hadoop native environment. In +//! 
particular, configure `JAVA_HOME`, `HADOOP_HOME`, `CLASSPATH`, and the +//! platform library path so that `libjvm` and, when dynamically linked, +//! `libhdfs` can be discovered. + +use std::collections::HashMap; +use std::sync::Arc; + +use object_store_opendal::OpendalStore; +use opendal::{Operator, services::Hdfs}; +use url::Url; + +use crate::object_store::{ + DEFAULT_CLOUD_BLOCK_SIZE, DEFAULT_CLOUD_IO_PARALLELISM, DEFAULT_MAX_IOP_SIZE, ObjectStore, + ObjectStoreParams, ObjectStoreProvider, StorageOptions, +}; +use lance_core::error::{Error, Result}; + +/// HDFS object store provider backed by OpenDAL. +#[derive(Default, Debug)] +pub struct HdfsStoreProvider; + +impl HdfsStoreProvider { + fn operator_error(error: impl std::fmt::Display, name_node: &str, has_user: bool) -> Error { + Error::io(format!( + "Failed to create HDFS operator: {error}. name_node={name_node}, has_user={has_user}" + )) + } + + fn build_config( + base_path: &Url, + storage_options: &StorageOptions, + env_vars: I, + ) -> Result> + where + I: IntoIterator, + K: AsRef, + V: Into, + { + base_path + .host_str() + .ok_or_else(|| Error::invalid_input("HDFS URI must contain namenode host"))?; + + let env_vars = env_vars + .into_iter() + .filter_map(|(key, value)| { + let value = value.into(); + if value.is_empty() { + None + } else { + Some((key.as_ref().to_string(), value)) + } + }) + .collect::>(); + + let name_node = storage_options + .0 + .get("hdfs_name_node") + .filter(|v| !v.is_empty()) + .cloned() + .or_else(|| env_vars.get("HDFS_NAME_NODE").cloned()) + .unwrap_or_else(|| format!("hdfs://{}", base_path.authority())); + + let mut config = HashMap::from([ + ("name_node".to_string(), name_node), + ("root".to_string(), "/".to_string()), + ]); + + let user = storage_options + .0 + .get("hdfs_user") + .filter(|v| !v.is_empty()) + .cloned() + .or_else(|| env_vars.get("HADOOP_USER_NAME").cloned()) + .or_else(|| env_vars.get("HDFS_USER").cloned()); + if let Some(user) = user { + 
config.insert("user".to_string(), user); + } + + for (storage_key, config_key) in [ + ( + "hdfs_kerberos_ticket_cache_path", + "kerberos_ticket_cache_path", + ), + ("hdfs_atomic_write_dir", "atomic_write_dir"), + ] { + if let Some(value) = storage_options.0.get(storage_key).filter(|v| !v.is_empty()) { + config.insert(config_key.to_string(), value.clone()); + } + } + + Ok(config) + } + + /// Calculate the object store prefix, `{scheme}${name node}`, identifying the store for caching. + /// + /// The name node is resolved in the same order as `build_config`: the + /// `hdfs_name_node` storage option, then `HDFS_NAME_NODE` from `env_vars`, then + /// the URI authority. Empty values are skipped at every step (as `build_config` + /// does), so an empty `HDFS_NAME_NODE` can never collapse the prefix to `hdfs$`. + fn calculate_object_store_prefix_with_env( + url: &Url, + storage_options: Option<&HashMap<String, String>>, + env_vars: &HashMap<String, String>, + ) -> Result<String> { + let authority = storage_options + .and_then(|opts| opts.get("hdfs_name_node")) + .filter(|v| !v.is_empty()) + .cloned() + .or_else(|| env_vars.get("HDFS_NAME_NODE").filter(|v| !v.is_empty()).cloned()) + .unwrap_or_else(|| url.authority().to_string()); + + Ok(format!("{}${}", url.scheme(), authority)) + } +} + +#[async_trait::async_trait] +impl ObjectStoreProvider for HdfsStoreProvider { + async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result { + let block_size = params.block_size.unwrap_or(DEFAULT_CLOUD_BLOCK_SIZE); + let storage_options = StorageOptions(params.storage_options().cloned().unwrap_or_default()); + let config = Self::build_config(&base_path, &storage_options, std::env::vars())?; + + let name_node = config + .get("name_node") + .cloned() + .unwrap_or_else(|| "".to_string()); + let has_user = config.contains_key("user"); + let operator = Operator::from_iter::(config) + .map_err(|error| Self::operator_error(error, &name_node, has_user))? 
+ .finish(); + let opendal_store = Arc::new(OpendalStore::new(operator)); + + Ok(ObjectStore { + scheme: "hdfs".to_string(), + inner: opendal_store, + block_size, + max_iop_size: *DEFAULT_MAX_IOP_SIZE, + use_constant_size_upload_parts: params.use_constant_size_upload_parts, + list_is_lexically_ordered: params.list_is_lexically_ordered.unwrap_or(false), + io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, + download_retry_count: storage_options.download_retry_count(), + io_tracker: Default::default(), + store_prefix: self + .calculate_object_store_prefix(&base_path, params.storage_options())?, + }) + } + + fn calculate_object_store_prefix( + &self, + url: &Url, + storage_options: Option<&HashMap>, + ) -> Result { + let env_vars = std::env::vars().collect::>(); + Self::calculate_object_store_prefix_with_env(url, storage_options, &env_vars) + } +} +
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::object_store::ObjectStoreProvider;
+    use object_store::path::Path;
+
+    /// `extract_path` yields the URL path without its leading slash, whether the authority is
+    /// `host:port`, a bare host, or a nameservice-style name.
+    #[test]
+    fn test_hdfs_store_paths() {
+        let expectations = [
+            ("hdfs://namenode:9000/path/to/file", "path/to/file"),
+            ("hdfs://namenode/path/to/file", "path/to/file"),
+            ("hdfs://namenode:9000/", ""),
+            (
+                "hdfs://namenode:9000/user/data/dataset/file.parquet",
+                "user/data/dataset/file.parquet",
+            ),
+            ("hdfs://ht-hdfsqa/user/data/file.txt", "user/data/file.txt"),
+        ];
+        let provider = HdfsStoreProvider;
+
+        for (raw_url, want) in expectations {
+            let parsed = Url::parse(raw_url).unwrap();
+            let got = provider.extract_path(&parsed).unwrap();
+            assert_eq!(got, Path::from(want));
+        }
+    }
+
+    /// With no storage options and no environment entries, the name node comes from the URL
+    /// authority and the root is `/`.
+    #[test]
+    fn test_hdfs_config_from_url() {
+        let no_env = Vec::<(&str, &str)>::new();
+        let url = Url::parse("hdfs://namenode:9000/test").unwrap();
+
+        let config =
+            HdfsStoreProvider::build_config(&url, &StorageOptions::default(), no_env).unwrap();
+
+        for (key, want) in [("name_node", "hdfs://namenode:9000"), ("root", "/")] {
+            assert_eq!(config.get(key).unwrap(), want);
+        }
+    }
+
+    /// Explicit `hdfs_*` storage options take precedence over both the environment entries
+    /// and the URL authority.
+    #[test]
+    fn test_hdfs_storage_options_override_environment_and_url() {
+        let option_pairs = [
+            ("hdfs_name_node", "hdfs://option-namenode:8020"),
+            ("hdfs_user", "option-user"),
+            ("hdfs_kerberos_ticket_cache_path", "/tmp/krb5cc"),
+            ("hdfs_atomic_write_dir", "/tmp/atomic"),
+        ];
+        let storage_options = StorageOptions(
+            option_pairs
+                .iter()
+                .map(|&(key, value)| (key.to_string(), value.to_string()))
+                .collect(),
+        );
+        let env_vars = [
+            ("HDFS_NAME_NODE", "hdfs://env-namenode:9000"),
+            ("HADOOP_USER_NAME", "env-user"),
+        ];
+        let url = Url::parse("hdfs://url-namenode:9000/test").unwrap();
+
+        let config = HdfsStoreProvider::build_config(&url, &storage_options, env_vars).unwrap();
+
+        let expected = [
+            ("name_node", "hdfs://option-namenode:8020"),
+            ("user", "option-user"),
+            ("kerberos_ticket_cache_path", "/tmp/krb5cc"),
+            ("atomic_write_dir", "/tmp/atomic"),
+        ];
+        for (key, want) in expected {
+            assert_eq!(config.get(key).unwrap(), want);
+        }
+    }
+
+    /// Without storage options, environment entries take precedence over the URL authority.
+    #[test]
+    fn test_hdfs_config_from_environment() {
+        let env_vars = [
+            ("HDFS_NAME_NODE", "hdfs://env-namenode:9000"),
+            ("HADOOP_USER_NAME", "env-user"),
+        ];
+        let url = Url::parse("hdfs://url-namenode:9000/test").unwrap();
+
+        let config =
+            HdfsStoreProvider::build_config(&url, &StorageOptions::default(), env_vars).unwrap();
+
+        for (key, want) in [("name_node", "hdfs://env-namenode:9000"), ("user", "env-user")] {
+            assert_eq!(config.get(key).unwrap(), want);
+        }
+    }
+
+    /// A URL with an empty authority is rejected as invalid input that mentions the host.
+    #[test]
+    fn test_hdfs_config_rejects_url_without_host() {
+        let hostless = Url::parse("hdfs:///test").unwrap();
+        let no_env = Vec::<(&str, &str)>::new();
+
+        let error = HdfsStoreProvider::build_config(&hostless, &StorageOptions::default(), no_env)
+            .unwrap_err();
+
+        assert!(matches!(error, Error::InvalidInput { .. }));
+        assert!(error.to_string().contains("namenode host"));
+    }
+
+    /// `operator_error` produces an IO error whose text carries the cause, the name node and
+    /// whether a user was configured.
+    #[test]
+    fn test_hdfs_operator_error_includes_connection_context() {
+        let cause = std::io::Error::other("native client unavailable");
+        let error = HdfsStoreProvider::operator_error(cause, "hdfs://namenode:9000", true);
+        let rendered = error.to_string();
+
+        assert!(matches!(error, Error::IO { .. }));
+        for fragment in [
+            "native client unavailable",
+            "name_node=hdfs://namenode:9000",
+            "has_user=true",
+        ] {
+            assert!(rendered.contains(fragment));
+        }
+    }
+}
diff --git a/rust/lance-io/tests/hdfs_integration.rs b/rust/lance-io/tests/hdfs_integration.rs new file mode 100644 index 00000000000..86bc3c761b7 --- /dev/null +++ b/rust/lance-io/tests/hdfs_integration.rs @@ -0,0 +1,114 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Integration tests for HDFS object store provider +//! +//! These tests need an existing HDFS cluster and are ignored by default. +//! +//! Set `HDFS_NAME_NODE` to the name node or nameservice address +//! (e.g., `hdfs://namenode:9000` or `hdfs://mycluster`) when it should +//! override the authority in the test URI. +//! +//! Run: +//!
HDFS_NAME_NODE=hdfs://localhost:9000 cargo test -p lance-io --features hdfs --test hdfs_integration -- --ignored
+
+#[cfg(feature = "hdfs")]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use lance_io::object_store::{
+        ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
+    };
+    use object_store::ObjectStoreExt;
+    use object_store::path::Path;
+
+    /// Opening an `hdfs://host:port/...` URI with default parameters yields a store with the
+    /// `hdfs` scheme and the URI path as the base path.
+    #[ignore = "Requires HDFS cluster"]
+    #[tokio::test]
+    async fn test_hdfs_store_creation() {
+        let registry = Arc::new(ObjectStoreRegistry::default());
+        let url = "hdfs://localhost:9000/test/path";
+        let params = ObjectStoreParams::default();
+
+        let (store, path) = ObjectStore::from_uri_and_params(registry, url, &params)
+            .await
+            .unwrap();
+
+        assert_eq!(store.scheme(), "hdfs");
+        assert_eq!(path, Path::from("test/path"));
+    }
+
+    /// The store can be opened when the user and name node are supplied through static
+    /// storage options (`hdfs_user`, `hdfs_name_node`).
+    #[ignore = "Requires HDFS cluster"]
+    #[tokio::test]
+    async fn test_hdfs_store_with_custom_config() {
+        let registry = Arc::new(ObjectStoreRegistry::default());
+        let url = "hdfs://namenode:9000/user/test";
+
+        let mut storage_options = HashMap::new();
+        storage_options.insert("hdfs_user".to_string(), "testuser".to_string());
+        storage_options.insert(
+            "hdfs_name_node".to_string(),
+            "hdfs://namenode:9000".to_string(),
+        );
+
+        let params = ObjectStoreParams {
+            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
+                storage_options,
+            ))),
+            ..Default::default()
+        };
+
+        let (store, path) = ObjectStore::from_uri_and_params(registry, url, &params)
+            .await
+            .unwrap();
+
+        assert_eq!(store.scheme(), "hdfs");
+        assert_eq!(path, Path::from("user/test"));
+    }
+
+    /// Round-trips a small payload through put / get / delete on the underlying store.
+    #[ignore = "Requires HDFS cluster"]
+    #[tokio::test]
+    async fn test_hdfs_basic_operations() {
+        let registry = Arc::new(ObjectStoreRegistry::default());
+        let url = "hdfs://localhost:9000/test";
+        let params = ObjectStoreParams::default();
+
+        let (store, _) = ObjectStore::from_uri_and_params(registry, url, &params)
+            .await
+            .unwrap();
+        let test_path = Path::from("test_file.txt");
+        let test_data = bytes::Bytes::from("Hello, HDFS!");
+
+        store
+            .inner
+            .put(&test_path, test_data.clone().into())
+            .await
+            .unwrap();
+        let read_data = store
+            .inner
+            .get(&test_path)
+            .await
+            .unwrap()
+            .bytes()
+            .await
+            .unwrap();
+        // Delete before asserting so a content mismatch does not leave the file on the cluster.
+        store.inner.delete(&test_path).await.unwrap();
+        assert_eq!(read_data, test_data);
+    }
+
+    /// A port-less nameservice authority such as `hdfs://ht-hdfsqa` (HA setup) is accepted.
+    #[ignore = "Requires HDFS cluster"]
+    #[tokio::test]
+    async fn test_hdfs_ha_configuration() {
+        let registry = Arc::new(ObjectStoreRegistry::default());
+        let url = "hdfs://ht-hdfsqa/user/test";
+        let params = ObjectStoreParams::default();
+
+        let (store, path) = ObjectStore::from_uri_and_params(registry, url, &params)
+            .await
+            .unwrap();
+
+        assert_eq!(store.scheme(), "hdfs");
+        assert_eq!(path, Path::from("user/test"));
+    }
+}
diff --git a/rust/lance-table/Cargo.toml b/rust/lance-table/Cargo.toml index 042ae92c618..c1790b19030 100644 --- a/rust/lance-table/Cargo.toml +++ b/rust/lance-table/Cargo.toml @@ -43,6 +43,7 @@ snafu.workspace = true tokio.workspace = true tracing.workspace = true url.workspace = true +hdrs = { workspace = true, optional = true } uuid.workspace = true [dev-dependencies] @@ -59,6 +60,7 @@ prost-build.workspace = true protobuf-src = { version = "2.1", optional = true } [features] +hdfs = ["dep:hdrs", "lance-io/hdfs"] dynamodb = ["dep:aws-sdk-dynamodb", "dep:aws-credential-types", "lance-io/aws"] protoc = ["dep:protobuf-src"] diff --git a/rust/lance-table/src/io/commit.rs b/rust/lance-table/src/io/commit.rs index e1a4086730b..5535d5509e9 100644 --- a/rust/lance-table/src/io/commit.rs +++ b/rust/lance-table/src/io/commit.rs @@ -48,6 +48,8 @@ use url::Url; #[cfg(feature = "dynamodb")] pub mod dynamodb; pub mod external_manifest; +#[cfg(feature = "hdfs")] +mod hdfs_rename; use lance_core::{Error, Result}; use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams}; @@ -762,7 +764,7 @@ pub fn list_detached_manifests<'a>(
.boxed() } -fn make_staging_manifest_path(base: &Path) -> Result<Path> { +pub(super) fn make_staging_manifest_path(base: &Path) -> Result<Path> { let id = uuid::Uuid::new_v4().to_string(); Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e))) } @@ -1094,6 +1096,12 @@ pub async fn commit_handler_from_url( "s3" | "gs" | "az" | "abfss" | "memory" | "oss" | "cos" | "shared-memory" => { Ok(Arc::new(ConditionalPutCommitHandler)) } + #[cfg(feature = "hdfs")] + "hdfs" => { + let params = options.clone().unwrap_or_default(); + let client = hdfs_rename::connect_hdrs_client(&url, &params).await?; + Ok(Arc::new(hdfs_rename::HdfsRenameCommitHandler::new(client))) + } #[cfg(not(feature = "dynamodb"))] "s3+ddb" => Err(Error::invalid_input_source( "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(), diff --git a/rust/lance-table/src/io/commit/hdfs_rename.rs b/rust/lance-table/src/io/commit/hdfs_rename.rs new file mode 100644 index 00000000000..6a4e20efdfb --- /dev/null +++ b/rust/lance-table/src/io/commit/hdfs_rename.rs @@ -0,0 +1,269 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! HDFS manifest commit via libhdfs [`hdrs::Client::rename_file`] (NameNode rename RPC). +//! +//! Matches [`super::RenameCommitHandler`] semantics: write staging manifest, then rename into +//! place. On HDFS, `rename` fails when the destination path already exists, which we treat as +//! [`super::CommitError::CommitConflict`] for transaction retry.
+
+use std::collections::HashMap;
+use std::io;
+use std::sync::Arc;
+
+use hdrs::ClientBuilder;
+use object_store::path::Path;
+use tokio::task;
+use url::Url;
+
+use lance_core::{Error, Result};
+
+use super::{
+    CommitError, CommitHandler, ManifestLocation, ManifestNamingScheme, ManifestWriter,
+    make_staging_manifest_path,
+};
+use crate::format::{IndexMetadata, Manifest, Transaction};
+use lance_io::object_store::ObjectStore;
+use lance_io::object_store::ObjectStoreParams;
+
+/// [`CommitHandler`] for HDFS: staging write + atomic rename via libhdfs (same idea as
+/// [`super::RenameCommitHandler`], but uses `hdfsRename` instead of object_store's
+/// `rename_if_not_exists`).
+#[derive(Debug)]
+pub struct HdfsRenameCommitHandler {
+    /// Connected libhdfs client used for the rename (and the destination probe) on commit.
+    client: Arc<hdrs::Client>,
+}
+
+impl HdfsRenameCommitHandler {
+    /// Wraps an already-connected client, e.g. one obtained from [`connect_hdrs_client`].
+    pub fn new(client: Arc<hdrs::Client>) -> Self {
+        Self { client }
+    }
+}
+
+/// What happened when the staging manifest was renamed into place.
+#[derive(Debug)]
+enum RenameOutcome {
+    /// The staging manifest is now the committed manifest.
+    Renamed,
+    /// The destination is already taken, i.e. another writer committed this version.
+    Conflict,
+    /// The rename failed for a reason unrelated to the destination existing.
+    Failed(io::Error),
+}
+
+/// Connect a libhdfs client for commit operations. Must match the same NameNode / user as the
+/// OpenDAL HDFS [`ObjectStore`](lance_io::object_store::ObjectStore) used for I/O.
+///
+/// # Errors
+///
+/// Returns an invalid-input error when the name node cannot be determined (see
+/// `resolve_hdfs_namenode_and_user`) and an IO error when the connection attempt or the
+/// blocking task running it fails.
+pub async fn connect_hdrs_client(
+    url: &Url,
+    params: &ObjectStoreParams,
+) -> Result<Arc<hdrs::Client>> {
+    let (name_node, user) = resolve_hdfs_namenode_and_user(url, params)?;
+    // `connect` is a blocking call, so run it on the blocking pool rather than on an async
+    // worker thread.
+    let client = task::spawn_blocking(move || {
+        let mut builder = ClientBuilder::new(&name_node);
+        if let Some(user) = user.as_deref() {
+            builder = builder.with_user(user);
+        }
+        builder.connect().map_err(|e| {
+            Error::io(format!(
+                "Failed to connect HDFS client for manifest commit: {e}"
+            ))
+        })
+    })
+    .await
+    .map_err(|e| Error::io(format!("HDFS connect task failed: {e}")))??;
+    Ok(Arc::new(client))
+}
+
+/// Looks up `key` in the storage options, treating a missing map, a missing key and an empty
+/// value all as "not configured".
+fn non_empty_option<'a>(
+    options: Option<&'a HashMap<String, String>>,
+    key: &str,
+) -> Option<&'a str> {
+    options
+        .and_then(|map| map.get(key))
+        .map(String::as_str)
+        .filter(|value| !value.is_empty())
+}
+
+/// Reads an environment variable, treating unset, non-Unicode and empty values as "not set".
+fn non_empty_env(name: &str) -> Option<String> {
+    std::env::var(name).ok().filter(|value| !value.is_empty())
+}
+
+/// Determines the name node address and the optional user for the commit client.
+///
+/// Name node precedence: `hdfs_name_node` storage option, then the `HDFS_NAME_NODE`
+/// environment variable, then the URL authority (`hdfs://host[:port]`).
+///
+/// User precedence: `hdfs_user` storage option, then the `HADOOP_USER_NAME`, `HDFS_USER`,
+/// `USER` and `LOGNAME` environment variables, in that order.
+///
+/// Empty values are treated as unset at every step, so an empty variable never hides a
+/// lower-priority source and never bypasses the logical-name check below.
+///
+/// # Errors
+///
+/// Returns an invalid-input error when the URL has no host, or when the authority is used
+/// and looks like an HA logical name (no port, no dot) while `HADOOP_CONF_DIR` is not set.
+fn resolve_hdfs_namenode_and_user(
+    url: &Url,
+    params: &ObjectStoreParams,
+) -> Result<(String, Option<String>)> {
+    let host = url
+        .host_str()
+        .ok_or_else(|| Error::invalid_input("HDFS URL must contain namenode host"))?;
+    // Borrow the options; only two keys are read, so there is no need to clone the map.
+    let options = params.storage_options();
+
+    let name_node = if let Some(configured) = non_empty_option(options, "hdfs_name_node") {
+        configured.to_string()
+    } else if let Some(from_env) = non_empty_env("HDFS_NAME_NODE") {
+        from_env
+    } else {
+        // NOTE(review): "no port and no dot" is a heuristic for an HA nameservice; a
+        // single-label host on the default port is rejected too unless HADOOP_CONF_DIR is
+        // set. Confirm this matches what the I/O object store accepts.
+        let is_logical_name = url.port().is_none() && !host.contains('.');
+        if is_logical_name && std::env::var("HADOOP_CONF_DIR").is_err() {
+            return Err(Error::invalid_input(format!(
+                "HDFS HA logical name '{host}' cannot be resolved without HADOOP_CONF_DIR or hdfs_name_node"
+            )));
+        }
+        match url.port() {
+            Some(port) => format!("hdfs://{host}:{port}"),
+            None => format!("hdfs://{host}"),
+        }
+    };
+
+    let user = non_empty_option(options, "hdfs_user")
+        .map(str::to_owned)
+        .or_else(|| {
+            ["HADOOP_USER_NAME", "HDFS_USER", "USER", "LOGNAME"]
+                .iter()
+                .find_map(|name| non_empty_env(name))
+        });
+
+    Ok((name_node, user))
+}
+
+/// Converts an object-store path into the absolute path string handed to the HDFS client by
+/// making sure it starts with exactly one leading `/`; an empty path becomes `/`.
+fn object_store_path_to_hdfs_abs(path: &Path) -> String {
+    let raw: &str = path.as_ref();
+    if raw.starts_with('/') {
+        raw.to_string()
+    } else {
+        format!("/{raw}")
+    }
+}
+
+/// Returns `true` when a rename error itself says that the destination already exists.
+///
+/// Matches either the `AlreadyExists` error kind or an "already exists"-style message.
+/// A bare mention of the rename destination is deliberately not enough: errors such as a
+/// missing destination parent are not commit conflicts and retrying would not help.
+fn hdfs_rename_implies_conflict(err: &io::Error) -> bool {
+    if err.kind() == io::ErrorKind::AlreadyExists {
+        return true;
+    }
+    let msg = err.to_string().to_lowercase();
+    ["already exists", "file exists", "filealreadyexists"]
+        .iter()
+        .any(|needle| msg.contains(*needle))
+}
+
+/// Best-effort removal of a staging manifest that will never be renamed into place.
+async fn discard_staging_manifest(object_store: &ObjectStore, tmp_path: &Path) {
+    // Ignored on purpose: the commit outcome is already decided and a leftover staging file
+    // is harmless, so a failed delete must not mask the real error.
+    let _ = object_store.delete(tmp_path).await;
+}
+
+#[async_trait::async_trait]
+impl CommitHandler for HdfsRenameCommitHandler {
+    /// Writes the manifest to a uniquely named staging path and renames it to the final
+    /// manifest path for `manifest.version`.
+    ///
+    /// # Errors
+    ///
+    /// * [`CommitError::CommitConflict`] when the final path is already taken. This is
+    ///   detected from the rename error or, because libhdfs can report a refused rename
+    ///   only as a generic I/O error, by finding the destination present after the rename
+    ///   failed.
+    /// * [`CommitError::OtherError`] for any other rename failure or if the blocking task
+    ///   could not be joined.
+    ///
+    /// The staging manifest is removed (best effort) on every failure path.
+    async fn commit(
+        &self,
+        manifest: &mut Manifest,
+        indices: Option<Vec<IndexMetadata>>,
+        base_path: &Path,
+        object_store: &ObjectStore,
+        manifest_writer: ManifestWriter,
+        naming_scheme: ManifestNamingScheme,
+        transaction: Option<Transaction>,
+    ) -> std::result::Result<ManifestLocation, CommitError> {
+        let path = naming_scheme.manifest_path(base_path, manifest.version);
+        let tmp_path = make_staging_manifest_path(&path)?;
+
+        let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
+
+        let client = Arc::clone(&self.client);
+        let tmp_abs = object_store_path_to_hdfs_abs(&tmp_path);
+        let final_abs = object_store_path_to_hdfs_abs(&path);
+
+        // The client calls are blocking, so run them on the blocking pool.
+        let outcome = task::spawn_blocking(move || {
+            match client.rename_file(&tmp_abs, &final_abs) {
+                Ok(()) => RenameOutcome::Renamed,
+                Err(e) => {
+                    // If the destination exists after a failed rename, this version can no
+                    // longer be committed by us whatever the error said: it is a conflict.
+                    if hdfs_rename_implies_conflict(&e) || client.metadata(&final_abs).is_ok() {
+                        RenameOutcome::Conflict
+                    } else {
+                        RenameOutcome::Failed(e)
+                    }
+                }
+            }
+        })
+        .await;
+
+        match outcome {
+            Ok(RenameOutcome::Renamed) => Ok(ManifestLocation {
+                version: manifest.version,
+                path,
+                size: Some(res.size as u64),
+                naming_scheme,
+                e_tag: None,
+            }),
+            Ok(RenameOutcome::Conflict) => {
+                discard_staging_manifest(object_store, &tmp_path).await;
+                Err(CommitError::CommitConflict)
+            }
+            Ok(RenameOutcome::Failed(e)) => {
+                discard_staging_manifest(object_store, &tmp_path).await;
+                Err(CommitError::OtherError(Error::io(format!(
+                    "HDFS rename failed (staging -> manifest): {e}"
+                ))))
+            }
+            Err(e) => {
+                discard_staging_manifest(object_store, &tmp_path).await;
+                Err(CommitError::OtherError(Error::io(format!(
+                    "HDFS rename task join error: {e}"
+                ))))
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use lance_io::object_store::StorageOptionsAccessor;
+
+    /// Builds parameters whose storage options are exactly `opts`.
+    fn params_with_options(opts: HashMap<String, String>) -> ObjectStoreParams {
+        ObjectStoreParams {
+            storage_options_accessor: Some(Arc::new(
+                StorageOptionsAccessor::with_static_options(opts),
+            )),
+            ..Default::default()
+        }
+    }
+
+    #[test]
+    fn test_object_store_path_to_hdfs_abs() {
+        assert_eq!(object_store_path_to_hdfs_abs(&Path::from("")), "/");
+        assert_eq!(
+            object_store_path_to_hdfs_abs(&Path::from("/data/manifest")),
+            "/data/manifest"
+        );
+        assert_eq!(
+            object_store_path_to_hdfs_abs(&Path::from("data/manifest")),
+            "/data/manifest"
+        );
+    }
+
+    #[test]
+    fn test_hdfs_rename_implies_conflict() {
+        let already_exists = io::Error::new(io::ErrorKind::AlreadyExists, "file exists");
+        assert!(hdfs_rename_implies_conflict(&already_exists));
+
+        let kind_only = io::Error::from(io::ErrorKind::AlreadyExists);
+        assert!(hdfs_rename_implies_conflict(&kind_only));
+
+        let other = io::Error::other("rename destination already exists");
+        assert!(hdfs_rename_implies_conflict(&other));
+
+        let file_exists = io::Error::other("FileAlreadyExistsException at path");
+        assert!(hdfs_rename_implies_conflict(&file_exists));
+
+        let unrelated = io::Error::other("connection refused");
+        assert!(!hdfs_rename_implies_conflict(&unrelated));
+
+        // Mentions the destination but is not an "already exists" condition.
+        let missing_parent = io::Error::other("rename destination parent /a/b not found");
+        assert!(!hdfs_rename_implies_conflict(&missing_parent));
+    }
+
+    #[test]
+    fn test_non_empty_option() {
+        let opts = HashMap::from([
+            ("hdfs_user".to_string(), String::new()),
+            ("hdfs_name_node".to_string(), "hdfs://nn".to_string()),
+        ]);
+        assert_eq!(non_empty_option(Some(&opts), "hdfs_user"), None);
+        assert_eq!(
+            non_empty_option(Some(&opts), "hdfs_name_node"),
+            Some("hdfs://nn")
+        );
+        assert_eq!(non_empty_option(Some(&opts), "missing"), None);
+        assert_eq!(non_empty_option(None, "hdfs_user"), None);
+    }
+
+    #[test]
+    fn test_resolve_namenode_from_uri_authority() {
+        let url = Url::parse("hdfs://namenode:9000/data").unwrap();
+        let params = ObjectStoreParams::default();
+        let (name_node, _user) = resolve_hdfs_namenode_and_user(&url, &params).unwrap();
+        assert_eq!(name_node, "hdfs://namenode:9000");
+    }
+
+    #[test]
+    fn test_resolve_namenode_from_storage_options() {
+        let url = Url::parse("hdfs://namenode:9000/data").unwrap();
+        let params = params_with_options(HashMap::from([(
+            "hdfs_name_node".to_string(),
+            "hdfs://override:8020".to_string(),
+        )]));
+        let (name_node, _user) = resolve_hdfs_namenode_and_user(&url, &params).unwrap();
+        assert_eq!(name_node, "hdfs://override:8020");
+    }
+
+    #[test]
+    fn test_resolve_user_from_storage_options() {
+        let url = Url::parse("hdfs://namenode:9000/data").unwrap();
+        let params = params_with_options(HashMap::from([(
+            "hdfs_user".to_string(),
+            "hduser".to_string(),
+        )]));
+        let (_name_node, user) = resolve_hdfs_namenode_and_user(&url, &params).unwrap();
+        assert_eq!(user, Some("hduser".to_string()));
+    }
+
+    #[test]
+    fn test_resolve_namenode_rejects_no_host() {
+        let url = Url::parse("hdfs:///data").unwrap();
+        let params = ObjectStoreParams::default();
+        let err = resolve_hdfs_namenode_and_user(&url, &params).unwrap_err();
+        assert!(err.to_string().contains("namenode host"));
+    }
+}
diff --git a/rust/lance/Cargo.toml
b/rust/lance/Cargo.toml index 440c3fb301a..03d80472fa6 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -158,6 +158,7 @@ azure = ["lance-io/azure"] oss = ["lance-io/oss"] tencent = ["lance-io/tencent"] goosefs = ["lance-io/goosefs"] +hdfs = ["lance-io/hdfs", "lance-table/hdfs"] tos = ["lance-io/tos"] huggingface = ["lance-io/huggingface"] geo = ["lance-datafusion/geo", "lance-index/geo"]