Skip to content

Commit 6df1dfb

Browse files
committed
feat(service/tos): impl tos read/write/delete
Change-Id: I73d7ff4b4ae8c50d22dfe8e45a7225dd3ce3addc
1 parent 4ea60b0 commit 6df1dfb

11 files changed

Lines changed: 1022 additions & 33 deletions

File tree

core/Cargo.lock

Lines changed: 42 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ libtest-mimic = "0.8"
315315
log = { workspace = true }
316316
logforth = { workspace = true }
317317
rand = { workspace = true }
318-
reqwest = "0.12.24"
318+
reqwest = "0.13.2"
319319
sha2 = { workspace = true }
320320
size = "0.5"
321321
tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] }

core/services/tos/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,17 @@ version = { workspace = true }
3131
all-features = true
3232

3333
[dependencies]
34+
bytes = { workspace = true }
35+
http = { workspace = true }
3436
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
37+
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
38+
reqsign-core = { version = "3.0.0", default-features = false }
39+
reqsign-file-read-tokio = { version = "3.0.0", default-features = false }
40+
reqsign-http-send-reqwest = { version = "4.0.0", default-features = false }
41+
reqsign-volcengine-tos = "3.0.0"
3542
serde = { workspace = true, features = ["derive"] }
3643
serde_json = { workspace = true }
44+
45+
[dev-dependencies]
46+
rand.workspace = true
47+
tokio.workspace = true

core/services/tos/src/backend.rs

Lines changed: 144 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,21 @@
1818
use std::fmt::Debug;
1919
use std::sync::Arc;
2020

21-
use crate::TosConfig;
22-
use crate::core::TosCore;
21+
use crate::config::TosConfig;
22+
use crate::core::constants::X_TOS_VERSION_ID;
23+
use crate::core::constants::{X_TOS_DIRECTORY, X_TOS_META_PREFIX};
24+
use crate::core::*;
25+
use crate::deleter::TosDeleter;
26+
use crate::error::parse_error;
27+
use crate::utils::tos_parse_into_metadata;
28+
use crate::writer::TosWriter;
29+
use http::Response;
30+
use http::StatusCode;
2331
use opendal_core::raw::*;
24-
use opendal_core::{Builder, Capability, Result};
32+
use opendal_core::{Builder, Capability, EntryMode, Error, ErrorKind, Result};
33+
use reqsign_core::{Context, OsEnv, ProvideCredentialChain, Signer};
34+
use reqsign_file_read_tokio::TokioFileRead;
35+
use reqsign_volcengine_tos::{EnvCredentialProvider, RequestSigner, StaticCredentialProvider};
2536

2637
const TOS_SCHEME: &str = "tos";
2738

@@ -145,17 +156,53 @@ impl Builder for TosBuilder {
145156
let bucket = config.bucket.clone();
146157
let root = config.root.clone().unwrap_or_else(|| "/".to_string());
147158

159+
let ctx = Context::new()
160+
.with_file_read(TokioFileRead)
161+
.with_http_send(HttpClient::with(GLOBAL_REQWEST_CLIENT.clone()))
162+
.with_env(OsEnv);
163+
164+
let mut provider = ProvideCredentialChain::new().push(EnvCredentialProvider::new());
165+
166+
if let (Some(ak), Some(sk)) = (&config.access_key_id, &config.secret_access_key) {
167+
let static_provider = if let Some(token) = config.security_token.as_deref() {
168+
StaticCredentialProvider::new(ak, sk).with_security_token(token)
169+
} else {
170+
StaticCredentialProvider::new(ak, sk)
171+
};
172+
provider = provider.push_front(static_provider);
173+
}
174+
175+
let request_signer = RequestSigner::new(&region);
176+
let signer = Signer::new(ctx, provider, request_signer);
177+
148178
let info = {
149179
let am = AccessorInfo::default();
150180
am.set_scheme(TOS_SCHEME)
151181
.set_root(&root)
152182
.set_name(&bucket)
153183
.set_native_capability(Capability {
154-
read: false,
184+
read: true,
185+
read_with_if_match: true,
186+
read_with_if_none_match: true,
187+
read_with_if_modified_since: true,
188+
read_with_if_unmodified_since: true,
189+
read_with_version: config.enable_versioning,
155190

156-
write: false,
191+
write: true,
192+
write_can_empty: true,
193+
write_can_multi: true,
194+
write_with_cache_control: true,
195+
write_with_content_type: true,
196+
write_with_content_encoding: true,
197+
write_with_if_match: true,
198+
write_with_if_not_exists: !config.enable_versioning,
199+
write_with_user_metadata: true,
200+
write_multi_min_size: Some(5 * 1024 * 1024),
201+
write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
157202

158-
delete: false,
203+
delete: true,
204+
delete_max_size: Some(1000),
205+
delete_with_version: config.enable_versioning,
159206

160207
list: false,
161208

@@ -169,11 +216,24 @@ impl Builder for TosBuilder {
169216
am.into()
170217
};
171218

219+
// Extract domain from endpoint, removing http:// or https:// prefix
220+
let endpoint_domain = if let Some(stripped) = endpoint.strip_prefix("http://") {
221+
stripped
222+
} else if let Some(stripped) = endpoint.strip_prefix("https://") {
223+
stripped
224+
} else {
225+
&endpoint
226+
};
227+
172228
let core = TosCore {
173229
info,
174230
bucket,
175231
endpoint: endpoint.clone(),
232+
endpoint_domain: endpoint_domain.to_string(),
176233
root,
234+
default_storage_class: None,
235+
allow_anonymous: config.allow_anonymous,
236+
signer,
177237
};
178238

179239
Ok(TosBackend {
@@ -188,12 +248,87 @@ pub struct TosBackend {
188248
}
189249

190250
impl Access for TosBackend {
191-
type Reader = ();
192-
type Writer = ();
251+
type Reader = HttpBody;
252+
type Writer = oio::MultipartWriter<TosWriter>;
193253
type Lister = ();
194-
type Deleter = ();
254+
type Deleter = oio::BatchDeleter<TosDeleter>;
195255

196256
fn info(&self) -> Arc<AccessorInfo> {
197257
self.core.info.clone()
198258
}
259+
260+
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
261+
let resp = self.core.tos_head_object(path, args).await?;
262+
263+
let status = resp.status();
264+
265+
match status {
266+
StatusCode::OK => {
267+
let headers = resp.headers();
268+
let mut meta = tos_parse_into_metadata(path, headers)?;
269+
270+
let user_meta = parse_prefixed_headers(headers, X_TOS_META_PREFIX);
271+
if !user_meta.is_empty() {
272+
meta = meta.with_user_metadata(user_meta);
273+
}
274+
275+
if let Some(v) = parse_header_to_str(headers, X_TOS_VERSION_ID)? {
276+
meta.set_version(v);
277+
}
278+
279+
if let Some(is_dir) = parse_header_to_str(headers, X_TOS_DIRECTORY)?
280+
.map(|v| {
281+
v.parse::<bool>().map_err(|e| {
282+
Error::new(ErrorKind::Unexpected, "header value is not valid integer")
283+
.set_source(e)
284+
})
285+
})
286+
.transpose()?
287+
{
288+
meta = meta.with_mode(if is_dir {
289+
EntryMode::DIR
290+
} else {
291+
EntryMode::FILE
292+
});
293+
}
294+
295+
Ok(RpStat::new(meta))
296+
}
297+
_ => Err(parse_error(resp)),
298+
}
299+
}
300+
301+
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
302+
let resp = self.core.tos_get_object(path, args.range(), &args).await?;
303+
304+
let status = resp.status();
305+
match status {
306+
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
307+
Ok((RpRead::default(), resp.into_body()))
308+
}
309+
_ => {
310+
let (part, mut body) = resp.into_parts();
311+
let buf = body.to_buffer().await?;
312+
Err(parse_error(Response::from_parts(part, buf)))
313+
}
314+
}
315+
}
316+
317+
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
318+
let writer = TosWriter::new(self.core.clone(), path, args.clone());
319+
320+
let w = oio::MultipartWriter::new(self.core.info.clone(), writer, args.concurrent());
321+
322+
Ok((RpWrite::default(), w))
323+
}
324+
325+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
326+
let info = self.core.info.clone();
327+
let capability = info.full_capability();
328+
let deleter = TosDeleter::new(self.core.clone());
329+
Ok((
330+
RpDelete::default(),
331+
oio::BatchDeleter::new(deleter, capability.delete_max_size),
332+
))
333+
}
199334
}

0 commit comments

Comments
 (0)