Skip to content

Commit bbfd39d

Browse files
davanstrien and claude committed
refactor: replace custom XET sink with OpenDAL ObjectStore for HF URLs
Replace the custom HfBucketSinkNode (1,050 lines of XET-specific code) with a standard ObjectStore implementation backed by OpenDAL's HF service. HF URLs now flow through the same FileSink path as S3/GCS/Azure, requiring only a thin build_hf() builder in a new hf.rs module. Key changes: - Add crates/polars-io/src/cloud/hf.rs: HF URL parsing, token extraction, and OpenDAL ObjectStore construction (~175 lines) - Wire CloudType::Hf in object_store_setup.rs to call build_hf(), matching the pattern used by build_aws/build_gcp/build_azure - Delete custom sink: hf_bucket/ directory (4 files, 721 lines), HfBucketSinkNode (260 lines), IR lowering special-case, PhysNodeKind::HfBucketSink variant - Rename feature flag hf_bucket_sink -> hf across 9 Cargo.toml files - Bump object_store_opendal compatibility from object_store 0.12 to 0.13 Dependencies: opendal + object_store_opendal (local path deps for now, will switch to published crate versions once apache/opendal#7185 ships). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent fdf3e21 commit bbfd39d

24 files changed

Lines changed: 436 additions & 1448 deletions

File tree

Cargo.lock

Lines changed: 222 additions & 381 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ num-derive = "0.4.2"
7373
num-traits = "0.2"
7474
numpy = "0.28"
7575
object_store = { version = "0.13.1", default-features = false, features = ["fs"] }
76+
object_store_opendal = { version = "0.55.0", default-features = false }
77+
opendal = { version = "0.55.0", default-features = false }
7678
parking_lot = "0.12"
7779
percent-encoding = "2.3"
7880
pin-project-lite = "0.2"
@@ -164,6 +166,8 @@ collapsible_if = "allow"
164166
# simd-json = { git = "https://github.com/ritchie46/simd-json", branch = "alignment" }
165167
tikv-jemallocator = { git = "https://github.com/pola-rs/jemallocator", rev = "c7991e5bb6b3e9f79db6b0f48dcda67c5c3d2936" }
166168
object_store = { git = "https://github.com/kdn36/arrow-rs-object-store", branch = "feat_checksum_crc64" }
169+
opendal = { path = "opendal/core" }
170+
object_store_opendal = { path = "opendal/integrations/object_store" }
167171
color-backtrace = { git = "https://github.com/orlp/color-backtrace", rev = "bb62ccf1e9eb1f6b7af5f16acff1fd7151a876dd" }
168172

169173
[profile.mindebug-dev]

crates/polars-io/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ tokio = { workspace = true, features = ["fs", "net", "rt-multi-thread", "time",
5454
zmij = { workspace = true, optional = true }
5555
zstd = { workspace = true, optional = true }
5656

57-
hf-xet = { git = "https://github.com/huggingface/xet-core", rev = "cacd713", optional = true }
58-
xet-client = { git = "https://github.com/huggingface/xet-core", rev = "cacd713", optional = true }
57+
opendal = { workspace = true, features = ["services-hf"], optional = true }
58+
object_store_opendal = { workspace = true, optional = true }
5959

6060
[target.'cfg(not(target_family = "wasm"))'.dependencies]
6161
fs4 = { version = "0.13", features = ["sync"], optional = true }
@@ -150,7 +150,7 @@ http = ["object_store/http", "cloud"]
150150
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
151151
simd = []
152152
python = ["pyo3", "polars-error/python", "polars-utils/python"]
153-
hf_bucket_sink = ["cloud", "parquet", "dep:hf-xet", "dep:xet-client"]
153+
hf = ["cloud", "dep:opendal", "dep:object_store_opendal"]
154154
allow_unused = []
155155

156156
[package.metadata.docs.rs]

crates/polars-io/src/cloud/hf.rs

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
//! Hugging Face cloud storage support via OpenDAL.
2+
//!
3+
//! Provides an [`ObjectStore`] implementation for `hf://` URLs by bridging
4+
//! OpenDAL's HF backend through `object_store_opendal`.
5+
//!
6+
//! Gated behind `#[cfg(feature = "hf")]`.
7+
8+
use std::sync::Arc;
9+
10+
use object_store::ObjectStore;
11+
use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
12+
use polars_utils::pl_path::PlRefPath;
13+
14+
use super::options::CloudOptions;
15+
16+
/// Parse an `hf://` URL and build an [`ObjectStore`] backed by OpenDAL.
17+
///
18+
/// Supported URL formats:
19+
/// - `hf://buckets/<namespace>/<name>[/<path>]`
20+
/// - `hf://datasets/<namespace>/<name>[/<path>]`
21+
/// - `hf://models/<namespace>/<name>[/<path>]`
22+
pub fn build_hf(
23+
url: PlRefPath,
24+
options: Option<&CloudOptions>,
25+
) -> PolarsResult<Arc<dyn ObjectStore>> {
26+
let after_scheme = url.strip_scheme();
27+
let (repo_type_plural, rest) = after_scheme
28+
.split_once('/')
29+
.ok_or_else(|| polars_err!(ComputeError: "invalid hf:// URL: {}", url.as_str()))?;
30+
31+
// hf:// URLs use plural form ("buckets", "datasets", "models")
32+
// but OpenDAL expects singular ("bucket", "dataset", "model")
33+
let repo_type: &str = repo_type_plural
34+
.strip_suffix('s')
35+
.unwrap_or(repo_type_plural);
36+
37+
// Extract repo_id (namespace/name) from the remaining path
38+
let parts = rest.splitn(3, '/').collect::<Vec<&str>>();
39+
if parts.len() < 2 || parts[0].is_empty() || parts[1].is_empty() {
40+
polars_bail!(
41+
ComputeError:
42+
"invalid hf:// URL: expected hf://<type>/<namespace>/<name>[/path], got: {}",
43+
url.as_str()
44+
);
45+
}
46+
let repo_id = format!("{}/{}", parts[0], parts[1]);
47+
48+
let token = extract_hf_token(options)?;
49+
50+
let builder = opendal::services::Hf::default()
51+
.repo_type(repo_type)
52+
.repo_id(&repo_id)
53+
.token(&token);
54+
55+
let op = opendal::Operator::new(builder)
56+
.map_err(to_compute_err)?
57+
.finish();
58+
59+
Ok(Arc::new(object_store_opendal::OpendalStore::new(op)) as Arc<dyn ObjectStore>)
60+
}
61+
62+
/// Extract an HF token from cloud options, environment, or cached file.
63+
///
64+
/// Resolution order:
65+
/// 1. `storage_options` / CloudOptions HTTP Authorization header
66+
/// 2. `HF_TOKEN` environment variable
67+
/// 3. Cached token at `$HF_HOME/token` (default: `~/.cache/huggingface/token`)
68+
fn extract_hf_token(cloud_options: Option<&CloudOptions>) -> PolarsResult<String> {
69+
#[cfg(feature = "http")]
70+
if let Some(opts) = cloud_options {
71+
if let Some(super::options::CloudConfig::Http { headers }) = &opts.config {
72+
for (key, value) in headers {
73+
if key.eq_ignore_ascii_case("authorization") {
74+
if let Some(token) = value.strip_prefix("Bearer ") {
75+
return Ok(token.to_string());
76+
}
77+
}
78+
}
79+
}
80+
}
81+
82+
#[cfg(not(feature = "http"))]
83+
let _ = cloud_options;
84+
85+
if let Ok(token) = std::env::var("HF_TOKEN") {
86+
if !token.is_empty() {
87+
return Ok(token);
88+
}
89+
}
90+
91+
let hf_home = std::env::var("HF_HOME");
92+
let hf_home = hf_home.as_deref().unwrap_or("~/.cache/huggingface");
93+
let hf_home = crate::path_utils::resolve_homedir(hf_home);
94+
let cached_token_path = hf_home.join("token");
95+
96+
if let Ok(bytes) = std::fs::read(&cached_token_path) {
97+
if let Ok(token) = String::from_utf8(bytes) {
98+
let token = token.trim().to_string();
99+
if !token.is_empty() {
100+
return Ok(token);
101+
}
102+
}
103+
}
104+
105+
polars_bail!(
106+
ComputeError:
107+
"no HF token found: set HF_TOKEN env var, pass via storage_options, \
108+
or login with `huggingface-cli login`"
109+
);
110+
}
111+
112+
#[cfg(test)]
mod tests {
    use std::sync::{Mutex, MutexGuard};

    use super::*;

    /// Serialises every test in this module that touches `HF_TOKEN`.
    ///
    /// The test harness runs tests on multiple threads of one process and the
    /// environment is process-global, so unsynchronised set/remove calls from
    /// different tests would race with each other.
    static HF_TOKEN_LOCK: Mutex<()> = Mutex::new(());

    /// Sets `HF_TOKEN` for the lifetime of the guard and restores the previous
    /// state (value or absence) on drop, including when an assertion panics.
    struct HfTokenGuard {
        previous: Option<String>,
        // Declared after `previous` and only released once `drop` has run, so
        // the restore below still happens under the lock.
        _serialized: MutexGuard<'static, ()>,
    }

    impl HfTokenGuard {
        // `set_var` is only an `unsafe fn` from edition 2024 on; the allow keeps
        // this warning-free on earlier editions as well.
        #[allow(unused_unsafe)]
        fn set(value: &str) -> Self {
            // A failed assertion in another test poisons the mutex; the guarded
            // data is `()`, so it is always fine to keep going.
            let serialized = HF_TOKEN_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let previous = std::env::var("HF_TOKEN").ok();
            // SAFETY: all environment mutations in this module are serialised by
            // `HF_TOKEN_LOCK`. Code outside this module that touches the
            // environment concurrently is not covered by that lock.
            unsafe { std::env::set_var("HF_TOKEN", value) };
            Self {
                previous,
                _serialized: serialized,
            }
        }
    }

    impl Drop for HfTokenGuard {
        #[allow(unused_unsafe)]
        fn drop(&mut self) {
            // SAFETY: still holding `HF_TOKEN_LOCK` (see `HfTokenGuard::set`).
            match self.previous.take() {
                Some(value) => unsafe { std::env::set_var("HF_TOKEN", value) },
                None => unsafe { std::env::remove_var("HF_TOKEN") },
            }
        }
    }

    #[test]
    fn test_token_from_env() {
        let _env = HfTokenGuard::set("hf_test_token_123");

        let result = extract_hf_token(None);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "hf_test_token_123");
    }

    #[test]
    fn test_empty_token_skipped() {
        let _env = HfTokenGuard::set("");

        // With an empty env var the lookup may still succeed through the cached
        // token file on the machine running the test, so only the "never return
        // an empty token" property is asserted.
        let result = extract_hf_token(None);
        if let Ok(token) = &result {
            assert!(!token.is_empty());
        }
    }

    #[test]
    fn test_build_hf_valid_bucket_url() {
        let _env = HfTokenGuard::set("hf_test");
        let url = PlRefPath::new("hf://buckets/myorg/mybucket/path/file.parquet");
        let result = build_hf(url, None);
        // Builder succeeds (actual I/O would fail without a real token,
        // but the ObjectStore is constructed)
        assert!(result.is_ok());
    }

    #[test]
    fn test_build_hf_valid_dataset_url() {
        let _env = HfTokenGuard::set("hf_test");
        let url = PlRefPath::new("hf://datasets/user/dataset-name/train.parquet");
        let result = build_hf(url, None);
        assert!(result.is_ok());
    }

    #[test]
    fn test_build_hf_invalid_url_no_repo() {
        let _env = HfTokenGuard::set("hf_test");
        // Only a namespace follows the repo type; the repo name is missing.
        let url = PlRefPath::new("hf://buckets/only-namespace");
        let result = build_hf(url, None);
        assert!(result.is_err());
    }
}

crates/polars-io/src/cloud/hf_bucket/batch.rs

Lines changed: 0 additions & 89 deletions
This file was deleted.

0 commit comments

Comments
 (0)