Skip to content

Commit dc870a8

Browse files
committed
chore(hf): upgrade xet-core to 126f30b9 with per-operation token API
Migrate to the latest xet-core API where CAS auth tokens move from session-level to per-operation (UploadCommitBuilder/DownloadStreamGroupBuilder). Use token refresh URLs so xet-core manages JWT lifecycle automatically.
1 parent 87aa591 commit dc870a8

4 files changed

Lines changed: 63 additions & 17 deletions

File tree

core/services/hf/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ reqwest = { version = "0.12", default-features = false, features = [
4545
serde = { workspace = true, features = ["derive"] }
4646
serde_json = { workspace = true }
4747
tokio = { workspace = true, features = ["sync"] }
48-
hf-xet = { git = "https://github.com/huggingface/xet-core.git", rev = "60a916f74df506a5d10c0656ee3e86a8411ab4f6" }
48+
hf-xet = { git = "https://github.com/huggingface/xet-core.git", rev = "126f30b9816b7cc737f38c02017867264472c933" }
4949

5050
[dev-dependencies]
5151
opendal-core = { path = "../../core", version = "0.55.0", features = [

core/services/hf/src/core.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,7 @@ pub(super) struct LastCommit {
173173
#[derive(Clone, Debug, Deserialize)]
174174
#[serde(rename_all = "camelCase")]
175175
pub(super) struct XetToken {
176-
pub access_token: String,
177176
pub cas_url: String,
178-
pub exp: u64,
179177
}
180178

181179
// Core HuggingFace client that manages API interactions, authentication
@@ -322,10 +320,9 @@ impl HfCore {
322320
let this = self.clone();
323321
self.xet_session
324322
.get_or_try_init(|| async move {
325-
let token = this.xet_token("write").await?;
323+
let token = this.xet_token("read").await?;
326324
XetSessionBuilder::new()
327325
.with_endpoint(token.cas_url)
328-
.with_token_info(token.access_token, token.exp)
329326
.build()
330327
.map_err(|err| {
331328
Error::new(ErrorKind::Unexpected, "failed to create xet session")
@@ -336,6 +333,17 @@ impl HfCore {
336333
.cloned()
337334
}
338335

336+
/// Build an HTTP HeaderMap with the Authorization header for XET token refresh.
337+
pub(super) fn xet_token_refresh_headers(&self) -> http::HeaderMap {
338+
let mut headers = http::HeaderMap::new();
339+
if let Some(token) = &self.token {
340+
if let Ok(val) = format!("Bearer {}", token).parse() {
341+
headers.insert(header::AUTHORIZATION, val);
342+
}
343+
}
344+
headers
345+
}
346+
339347
/// Issue a HEAD request and extract XET file info (hash and size).
340348
///
341349
/// Returns `None` if the `X-Xet-Hash` header is absent or empty.

core/services/hf/src/reader.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,19 +84,46 @@ impl HfReader {
8484
range: BytesRange,
8585
) -> Result<Self> {
8686
let session = core.xet_session().await?;
87+
let refresh_url = core.repo.xet_token_url(&core.endpoint, "read");
88+
let refresh_headers = core.xet_token_refresh_headers();
89+
90+
let group = session
91+
.new_download_stream_group()
92+
.map_err(|err| {
93+
Error::new(
94+
ErrorKind::Unexpected,
95+
"failed to create download stream group",
96+
)
97+
.set_source(err)
98+
})?
99+
.with_token_refresh_url(refresh_url, refresh_headers)
100+
.build()
101+
.await
102+
.map_err(|err| {
103+
Error::new(
104+
ErrorKind::Unexpected,
105+
"failed to build download stream group",
106+
)
107+
.set_source(err)
108+
})?;
109+
87110
let xet_range = if range.is_full() {
88111
None
89112
} else {
90113
let start = range.offset();
91114
let end = range.size().map(|s| start + s).unwrap_or(u64::MAX);
92115
Some(start..end)
93116
};
94-
let mut stream = session
117+
118+
let mut stream = group
95119
.download_stream(file_info.clone(), xet_range)
96120
.await
97121
.map_err(|err| {
98-
Error::new(ErrorKind::Unexpected, "failed to create xet download stream")
99-
.set_source(err)
122+
Error::new(
123+
ErrorKind::Unexpected,
124+
"failed to create xet download stream",
125+
)
126+
.set_source(err)
100127
})?;
101128
stream.start();
102129
Ok(Self::Xet(stream))

core/services/hf/src/writer.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,22 @@ impl HfWriter {
5252

5353
let writer = if use_xet {
5454
let session = core.xet_session().await?;
55-
let commit = session.new_upload_commit().await.map_err(|err| {
56-
Error::new(ErrorKind::Unexpected, "failed to create xet upload commit")
57-
.set_source(err)
58-
})?;
55+
let refresh_url = core.repo.xet_token_url(&core.endpoint, "write");
56+
let refresh_headers = core.xet_token_refresh_headers();
57+
58+
let commit = session
59+
.new_upload_commit()
60+
.map_err(|err| {
61+
Error::new(ErrorKind::Unexpected, "failed to create xet upload commit")
62+
.set_source(err)
63+
})?
64+
.with_token_refresh_url(refresh_url, refresh_headers)
65+
.build()
66+
.await
67+
.map_err(|err| {
68+
Error::new(ErrorKind::Unexpected, "failed to build xet upload commit")
69+
.set_source(err)
70+
})?;
5971
let stream = commit
6072
.upload_stream(None, Sha256Policy::Compute)
6173
.await
@@ -131,9 +143,9 @@ impl oio::Write for HfWriter {
131143
commit,
132144
stream,
133145
} => {
134-
let stream = stream
135-
.take()
136-
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "xet writer already closed"))?;
146+
let stream = stream.take().ok_or_else(|| {
147+
Error::new(ErrorKind::Unexpected, "xet writer already closed")
148+
})?;
137149

138150
let file_meta = stream.finish().await.map_err(|err| {
139151
Error::new(ErrorKind::Unexpected, "failed to finish xet upload").set_source(err)
@@ -145,8 +157,7 @@ impl oio::Write for HfWriter {
145157
let meta = Metadata::default().with_content_length(content_length);
146158

147159
commit.commit().await.map_err(|err| {
148-
Error::new(ErrorKind::Unexpected, "failed to commit xet upload")
149-
.set_source(err)
160+
Error::new(ErrorKind::Unexpected, "failed to commit xet upload").set_source(err)
150161
})?;
151162

152163
if core.repo.repo_type == RepoType::Bucket {

0 commit comments

Comments
 (0)