Skip to content

Commit 8b986c7

Browse files
davanstrienclaude
andcommitted
feat: add HF Storage Bucket support via OpenDAL
Add an ObjectStore implementation for `hf://` URLs backed by OpenDAL's HF service, enabling `sink_parquet("hf://buckets/org/name/file.parquet")` to stream directly to Hugging Face Storage Buckets. The implementation follows the same pattern as existing cloud backends (S3/GCS/Azure): a `build_hf()` function in a new `hf.rs` module constructs the ObjectStore, and `object_store_setup.rs` calls it from the `CloudType::Hf` match arm. HF URLs flow through the standard FileSink path with no custom sink node or IR special-casing. New files: - crates/polars-io/src/cloud/hf.rs — URL parsing, token resolution, OpenDAL ObjectStore construction Feature flag: `hf` (opt-in, propagated through the workspace) Dependencies: opendal + object_store_opendal (local path deps for now, will switch to published crate versions once apache/opendal#7185 ships) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a06064c commit 8b986c7

14 files changed

Lines changed: 1779 additions & 61 deletions

File tree

Cargo.lock

Lines changed: 1564 additions & 59 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ num-derive = "0.4.2"
7373
num-traits = "0.2"
7474
numpy = "0.28"
7575
object_store = { version = "0.13.1", default-features = false, features = ["fs"] }
76+
object_store_opendal = { version = "0.55.0", default-features = false }
77+
opendal = { version = "0.55.0", default-features = false }
7678
parking_lot = "0.12"
7779
percent-encoding = "2.3"
7880
pin-project-lite = "0.2"
@@ -164,6 +166,8 @@ collapsible_if = "allow"
164166
# simd-json = { git = "https://github.com/ritchie46/simd-json", branch = "alignment" }
165167
tikv-jemallocator = { git = "https://github.com/pola-rs/jemallocator", rev = "c7991e5bb6b3e9f79db6b0f48dcda67c5c3d2936" }
166168
object_store = { git = "https://github.com/kdn36/arrow-rs-object-store", branch = "feat_checksum_crc64" }
169+
opendal = { path = "opendal/core" }
170+
object_store_opendal = { path = "opendal/integrations/object_store" }
167171
color-backtrace = { git = "https://github.com/orlp/color-backtrace", rev = "bb62ccf1e9eb1f6b7af5f16acff1fd7151a876dd" }
168172

169173
[profile.mindebug-dev]

crates/polars-io/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ tokio = { workspace = true, features = ["fs", "net", "rt-multi-thread", "time",
5454
zmij = { workspace = true, optional = true }
5555
zstd = { workspace = true, optional = true }
5656

57+
opendal = { workspace = true, features = ["services-hf"], optional = true }
58+
object_store_opendal = { workspace = true, optional = true }
59+
5760
[target.'cfg(not(target_family = "wasm"))'.dependencies]
5861
fs4 = { version = "0.13", features = ["sync"], optional = true }
5962
home = "0.5.4"
@@ -147,6 +150,7 @@ http = ["object_store/http", "cloud"]
147150
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
148151
simd = []
149152
python = ["pyo3", "polars-error/python", "polars-utils/python"]
153+
hf = ["cloud", "dep:opendal", "dep:object_store_opendal"]
150154
allow_unused = []
151155

152156
[package.metadata.docs.rs]

crates/polars-io/src/cloud/hf.rs

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
//! Hugging Face cloud storage support via OpenDAL.
2+
//!
3+
//! Provides an [`ObjectStore`] implementation for `hf://` URLs by bridging
4+
//! OpenDAL's HF backend through `object_store_opendal`.
5+
//!
6+
//! Gated behind `#[cfg(feature = "hf")]`.
7+
8+
use std::sync::Arc;
9+
10+
use object_store::ObjectStore;
11+
use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
12+
use polars_utils::pl_path::PlRefPath;
13+
14+
use super::options::CloudOptions;
15+
16+
/// Parse an `hf://` URL and build an [`ObjectStore`] backed by OpenDAL.
17+
///
18+
/// Supported URL formats:
19+
/// - `hf://buckets/<namespace>/<name>[/<path>]`
20+
/// - `hf://datasets/<namespace>/<name>[/<path>]`
21+
/// - `hf://models/<namespace>/<name>[/<path>]`
22+
pub fn build_hf(
23+
url: PlRefPath,
24+
options: Option<&CloudOptions>,
25+
) -> PolarsResult<Arc<dyn ObjectStore>> {
26+
let after_scheme = url.strip_scheme();
27+
let (repo_type_plural, rest) = after_scheme
28+
.split_once('/')
29+
.ok_or_else(|| polars_err!(ComputeError: "invalid hf:// URL: {}", url.as_str()))?;
30+
31+
// hf:// URLs use plural form ("buckets", "datasets", "models")
32+
// but OpenDAL expects singular ("bucket", "dataset", "model")
33+
let repo_type: &str = repo_type_plural
34+
.strip_suffix('s')
35+
.unwrap_or(repo_type_plural);
36+
37+
// Extract repo_id (namespace/name) from the remaining path
38+
let parts = rest.splitn(3, '/').collect::<Vec<&str>>();
39+
if parts.len() < 2 || parts[0].is_empty() || parts[1].is_empty() {
40+
polars_bail!(
41+
ComputeError:
42+
"invalid hf:// URL: expected hf://<type>/<namespace>/<name>[/path], got: {}",
43+
url.as_str()
44+
);
45+
}
46+
let repo_id = format!("{}/{}", parts[0], parts[1]);
47+
48+
let token = extract_hf_token(options)?;
49+
50+
let builder = opendal::services::Hf::default()
51+
.repo_type(repo_type)
52+
.repo_id(&repo_id)
53+
.token(&token);
54+
55+
let op = opendal::Operator::new(builder)
56+
.map_err(to_compute_err)?
57+
.finish();
58+
59+
Ok(Arc::new(object_store_opendal::OpendalStore::new(op)) as Arc<dyn ObjectStore>)
60+
}
61+
62+
/// Extract an HF token from cloud options, environment, or cached file.
63+
///
64+
/// Resolution order:
65+
/// 1. `storage_options` / CloudOptions HTTP Authorization header
66+
/// 2. `HF_TOKEN` environment variable
67+
/// 3. Cached token at `$HF_HOME/token` (default: `~/.cache/huggingface/token`)
68+
fn extract_hf_token(cloud_options: Option<&CloudOptions>) -> PolarsResult<String> {
69+
#[cfg(feature = "http")]
70+
if let Some(opts) = cloud_options {
71+
if let Some(super::options::CloudConfig::Http { headers }) = &opts.config {
72+
for (key, value) in headers {
73+
if key.eq_ignore_ascii_case("authorization") {
74+
if let Some(token) = value.strip_prefix("Bearer ") {
75+
return Ok(token.to_string());
76+
}
77+
}
78+
}
79+
}
80+
}
81+
82+
#[cfg(not(feature = "http"))]
83+
let _ = cloud_options;
84+
85+
if let Ok(token) = std::env::var("HF_TOKEN") {
86+
if !token.is_empty() {
87+
return Ok(token);
88+
}
89+
}
90+
91+
let hf_home = std::env::var("HF_HOME");
92+
let hf_home = hf_home.as_deref().unwrap_or("~/.cache/huggingface");
93+
let hf_home = crate::path_utils::resolve_homedir(hf_home);
94+
let cached_token_path = hf_home.join("token");
95+
96+
if let Ok(bytes) = std::fs::read(&cached_token_path) {
97+
if let Ok(token) = String::from_utf8(bytes) {
98+
let token = token.trim().to_string();
99+
if !token.is_empty() {
100+
return Ok(token);
101+
}
102+
}
103+
}
104+
105+
polars_bail!(
106+
ComputeError:
107+
"no HF token found: set HF_TOKEN env var, pass via storage_options, \
108+
or login with `huggingface-cli login`"
109+
);
110+
}
111+
112+
#[cfg(test)]
113+
mod tests {
114+
use super::*;
115+
116+
#[test]
117+
fn test_token_from_env() {
118+
let original = std::env::var("HF_TOKEN").ok();
119+
std::env::set_var("HF_TOKEN", "hf_test_token_123");
120+
121+
let result = extract_hf_token(None);
122+
assert!(result.is_ok());
123+
assert_eq!(result.unwrap(), "hf_test_token_123");
124+
125+
match original {
126+
Some(v) => std::env::set_var("HF_TOKEN", v),
127+
None => std::env::remove_var("HF_TOKEN"),
128+
}
129+
}
130+
131+
#[test]
132+
fn test_empty_token_skipped() {
133+
let original = std::env::var("HF_TOKEN").ok();
134+
std::env::set_var("HF_TOKEN", "");
135+
136+
let result = extract_hf_token(None);
137+
if let Ok(token) = &result {
138+
assert!(!token.is_empty());
139+
}
140+
141+
match original {
142+
Some(v) => std::env::set_var("HF_TOKEN", v),
143+
None => std::env::remove_var("HF_TOKEN"),
144+
}
145+
}
146+
147+
#[test]
148+
fn test_build_hf_valid_bucket_url() {
149+
std::env::set_var("HF_TOKEN", "hf_test");
150+
let url = PlRefPath::new("hf://buckets/myorg/mybucket/path/file.parquet");
151+
let result = build_hf(url, None);
152+
// Builder succeeds (actual I/O would fail without a real token,
153+
// but the ObjectStore is constructed)
154+
assert!(result.is_ok());
155+
std::env::remove_var("HF_TOKEN");
156+
}
157+
158+
#[test]
159+
fn test_build_hf_valid_dataset_url() {
160+
std::env::set_var("HF_TOKEN", "hf_test");
161+
let url = PlRefPath::new("hf://datasets/user/dataset-name/train.parquet");
162+
let result = build_hf(url, None);
163+
assert!(result.is_ok());
164+
std::env::remove_var("HF_TOKEN");
165+
}
166+
167+
#[test]
168+
fn test_build_hf_invalid_url_no_repo() {
169+
std::env::set_var("HF_TOKEN", "hf_test");
170+
let url = PlRefPath::new("hf://buckets/only-namespace");
171+
let result = build_hf(url, None);
172+
assert!(result.is_err());
173+
std::env::remove_var("HF_TOKEN");
174+
}
175+
}

crates/polars-io/src/cloud/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,5 @@ pub use polars_object_store::*;
2020
pub mod cloud_writer;
2121
#[cfg(feature = "cloud")]
2222
pub mod credential_provider;
23+
#[cfg(feature = "hf")]
24+
pub mod hf;

crates/polars-io/src/cloud/object_store_setup.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,15 @@ impl PolarsObjectStoreBuilder {
177177
#[cfg(not(feature = "http"))]
178178
return err_missing_feature("http", &cloud_location.scheme);
179179
},
180-
CloudType::Hf => panic!("impl error: unresolved hf:// path"),
180+
CloudType::Hf => {
181+
#[cfg(feature = "hf")]
182+
{
183+
let store = super::hf::build_hf(self.path.clone(), self.options.as_ref())?;
184+
Ok::<_, PolarsError>(store)
185+
}
186+
#[cfg(not(feature = "hf"))]
187+
return err_missing_feature("hf", &self.cloud_type);
188+
},
181189
}?;
182190

183191
Ok(store)
@@ -253,7 +261,19 @@ pub async fn build_object_store(
253261
let cloud_type = path
254262
.scheme()
255263
.map_or(CloudType::File, CloudType::from_cloud_scheme);
256-
let cloud_location = CloudLocation::new(path.clone(), glob)?;
264+
let mut cloud_location = CloudLocation::new(path.clone(), glob)?;
265+
266+
// For HF URLs, strip the repo_id (namespace/name) from the prefix
267+
// since the OpenDAL operator already has repo_id configured.
268+
// e.g. prefix "ns/name/path/file.parquet" → "path/file.parquet"
269+
if cloud_type == CloudType::Hf {
270+
let prefix = &cloud_location.prefix;
271+
let file_path = prefix
272+
.splitn(3, '/')
273+
.nth(2)
274+
.unwrap_or("");
275+
cloud_location.prefix = file_path.to_string();
276+
}
257277

258278
let store = PolarsObjectStoreBuilder {
259279
path,

crates/polars-lazy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ cloud = [
6262
"polars-mem-engine/cloud",
6363
"polars-stream?/cloud",
6464
]
65+
hf = ["polars-stream?/hf"]
6566
ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-mem-engine/ipc", "polars-stream?/ipc"]
6667
json = [
6768
"polars-io/json",

crates/polars-python/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ rle = ["polars/rle"]
189189
extract_groups = ["polars/extract_groups"]
190190
ffi_plugin = ["polars-lazy/ffi_plugin"]
191191
cloud = ["polars/cloud", "polars/aws", "polars/gcp", "polars/azure", "polars/http"]
192+
hf = ["polars/hf"]
192193
peaks = ["polars/peaks"]
193194
hist = ["polars/hist"]
194195
find_many = ["polars/find_many"]

crates/polars-stream/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ range = ["polars-plan/range"]
133133
top_k = ["polars-plan/top_k"]
134134
cum_agg = ["polars-plan/cum_agg", "polars-ops/cum_agg"]
135135
is_first_distinct = ["polars-core/is_first_distinct", "polars-expr/is_first_distinct", "polars-plan/is_first_distinct"]
136+
hf = ["cloud", "polars-io/hf"]
136137

137138
# We need to specify default features here to match workspace defaults.
138139
# Otherwise we get warnings with cargo check/clippy.

crates/polars/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ parquet = [
9494
]
9595
async = ["polars-lazy?/async"]
9696
cloud = ["polars-lazy?/cloud", "polars-io/cloud"]
97+
hf = ["polars-lazy?/hf", "new_streaming"]
9798
aws = ["async", "cloud", "polars-io/aws"]
9899
http = ["async", "cloud", "polars-io/http"]
99100
azure = ["async", "cloud", "polars-io/azure"]

0 commit comments

Comments
 (0)