Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ pub struct AmazonS3Builder {
conditional_put: ConfigValue<S3ConditionalPut>,
/// Ignore tags
disable_tagging: ConfigValue<bool>,
/// Disable bulk delete
disable_bulk_delete: ConfigValue<bool>,
/// Encryption (See [`S3EncryptionConfigKey`])
encryption_type: Option<ConfigValue<S3EncryptionType>>,
encryption_kms_key_id: Option<String>,
Expand Down Expand Up @@ -429,6 +431,19 @@ pub enum AmazonS3ConfigKey {
/// - `disable_tagging`
DisableTagging,

/// Disable bulk delete (`DeleteObjects`, `POST /?delete`)
///
/// If set to `true`, [`delete`](crate::ObjectStoreExt::delete) and
/// [`delete_stream`](crate::ObjectStore::delete_stream) will issue
/// single-object `DELETE /key` requests instead of the bulk `DeleteObjects`
/// API (`POST /?delete`). Use this for S3-compatible providers that do not
/// implement `DeleteObjects` (e.g. Alibaba Cloud OSS).
///
/// Supported keys:
/// - `aws_disable_bulk_delete`
/// - `disable_bulk_delete`
DisableBulkDelete,

/// Enable Support for S3 Express One Zone
///
/// Supported keys:
Expand Down Expand Up @@ -478,6 +493,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::CopyIfNotExists => "aws_copy_if_not_exists",
Self::ConditionalPut => "aws_conditional_put",
Self::DisableTagging => "aws_disable_tagging",
Self::DisableBulkDelete => "aws_disable_bulk_delete",
Self::RequestPayer => "aws_request_payer",
Self::Client(opt) => opt.as_ref(),
Self::Encryption(opt) => opt.as_ref(),
Expand Down Expand Up @@ -525,6 +541,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
"aws_disable_bulk_delete" | "disable_bulk_delete" => Ok(Self::DisableBulkDelete),
"aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
Expand Down Expand Up @@ -672,6 +689,7 @@ impl AmazonS3Builder {
}
AmazonS3ConfigKey::SkipSignature => self.skip_signature.parse(value),
AmazonS3ConfigKey::DisableTagging => self.disable_tagging.parse(value),
AmazonS3ConfigKey::DisableBulkDelete => self.disable_bulk_delete.parse(value),
AmazonS3ConfigKey::CopyIfNotExists => {
self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into()))
}
Expand Down Expand Up @@ -745,6 +763,7 @@ impl AmazonS3Builder {
}
AmazonS3ConfigKey::ConditionalPut => Some(self.conditional_put.to_string()),
AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
AmazonS3ConfigKey::DisableBulkDelete => Some(self.disable_bulk_delete.to_string()),
AmazonS3ConfigKey::RequestPayer => Some(self.request_payer.to_string()),
AmazonS3ConfigKey::Encryption(key) => match key {
S3EncryptionConfigKey::ServerSideEncryption => {
Expand Down Expand Up @@ -1018,6 +1037,21 @@ impl AmazonS3Builder {
self
}

/// If set to `true`, [`delete`](crate::ObjectStoreExt::delete) and
/// [`delete_stream`](crate::ObjectStore::delete_stream) will issue
/// single-object `DELETE /key` requests instead of the bulk `DeleteObjects`
/// API (`POST /?delete`).
///
/// The bulk `DeleteObjects` API is more efficient but is not implemented by
/// all S3-compatible providers (e.g. Alibaba Cloud OSS). Setting this to
/// `true` restores the single-object delete behaviour that works against
/// every S3-compatible provider, at the cost of throughput when deleting
/// many objects via [`delete_stream`](crate::ObjectStore::delete_stream).
pub fn with_disable_bulk_delete(mut self, disable: bool) -> Self {
self.disable_bulk_delete = disable.into();
self
}

/// Use SSE-KMS for server side encryption.
pub fn with_sse_kms_encryption(mut self, kms_key_id: impl Into<String>) -> Self {
self.encryption_type = Some(ConfigValue::Parsed(S3EncryptionType::SseKms));
Expand Down Expand Up @@ -1241,6 +1275,7 @@ impl AmazonS3Builder {
sign_payload: !self.unsigned_payload.get()?,
skip_signature: self.skip_signature.get()?,
disable_tagging: self.disable_tagging.get()?,
disable_bulk_delete: self.disable_bulk_delete.get()?,
checksum,
copy_if_not_exists,
conditional_put: self.conditional_put.get()?,
Expand Down
173 changes: 173 additions & 0 deletions src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ pub(crate) struct S3Config {
pub sign_payload: bool,
pub skip_signature: bool,
pub disable_tagging: bool,
pub disable_bulk_delete: bool,
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: S3ConditionalPut,
Expand Down Expand Up @@ -613,6 +614,16 @@ impl S3Client {
Ok(results)
}

/// Make a single-object S3 Delete request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html>
///
/// Unlike [`bulk_delete_request`](Self::bulk_delete_request), this issues a
/// plain `DELETE /key` request, which is part of the core S3 API and is
/// supported by every S3-compatible provider.
pub(crate) async fn delete_request(&self, path: &Path) -> Result<()> {
self.request(Method::DELETE, path).send().await?;
Ok(())
}

/// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
pub(crate) fn copy_request<'a>(&'a self, from: &Path, to: &'a Path) -> Request<'a> {
let source = format!("{}/{}", self.config.bucket, encode_path(from));
Expand Down Expand Up @@ -1006,10 +1017,13 @@ fn encode_path(path: &Path) -> PercentEncode<'_> {
mod tests {
use super::*;
use crate::GetOptions;
use crate::ObjectStore;
use crate::aws::{AmazonS3, AmazonS3Builder};
use crate::client::HttpClient;
use crate::client::get::GetClient;
use crate::client::mock_server::MockServer;
use crate::client::retry::RetryContext;
use futures_util::{StreamExt, TryStreamExt};
use http::Response;
use http::header::{AUTHORIZATION, CONTENT_LENGTH};
use hyper::Request;
Expand Down Expand Up @@ -1047,6 +1061,7 @@ mod tests {
retry_config: Default::default(),
sign_payload: false,
disable_tagging: false,
disable_bulk_delete: false,
checksum: None,
copy_if_not_exists: None,
conditional_put: Default::default(),
Expand Down Expand Up @@ -1102,6 +1117,7 @@ mod tests {
retry_config: Default::default(),
sign_payload: false,
disable_tagging: false,
disable_bulk_delete: false,
checksum: None,
copy_if_not_exists: None,
conditional_put: Default::default(),
Expand Down Expand Up @@ -1153,6 +1169,163 @@ mod tests {
mock.shutdown().await;
}

/// `(method, path, query)` captured for assertion outside the mock closure.
///
/// `MockServer` swallows panics raised inside its response handler (the
/// connection just resets and the S3 retry logic can still surface an `Ok`
/// result), so assertions placed inside the closure are silently ignored.
/// We capture into shared state and assert in the test body instead.
type CapturedRequest = (Method, String, Option<String>);

fn capture(captured: &Arc<std::sync::Mutex<Vec<CapturedRequest>>>, req: &Request<Incoming>) {
captured.lock().unwrap().push((
req.method().clone(),
req.uri().path().to_string(),
req.uri().query().map(|s| s.to_string()),
));
}

/// Build an `AmazonS3` via the public builder so that `bucket_endpoint`
/// is computed by the library from the addressing-style option — i.e.
/// the option under test actually drives the URL the client emits.
fn make_store(mock: &MockServer, virtual_hosted: bool, disable_bulk_delete: bool) -> AmazonS3 {
AmazonS3Builder::new()
.with_endpoint(mock.url())
.with_bucket_name("test-bucket")
.with_region("us-east-1")
.with_allow_http(true)
.with_skip_signature(true)
.with_virtual_hosted_style_request(virtual_hosted)
.with_disable_bulk_delete(disable_bulk_delete)
.build()
.unwrap()
}

#[tokio::test]
async fn test_delete_default() {
// Default: path-style + bulk delete enabled.
// `delete_stream` must issue a single `POST /{bucket}?delete`.
let mock = MockServer::new().await;
let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
let c = Arc::clone(&captured);
mock.push_fn(move |req| {
capture(&c, &req);
Response::builder()
.status(200)
.body("<DeleteResult><Deleted><Key>foo</Key></Deleted></DeleteResult>".to_string())
.unwrap()
});

let store = make_store(&mock, false, false);
let locations = futures_util::stream::iter(vec![Ok(Path::from("foo"))]).boxed();
let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
assert_eq!(deleted.len(), 1);

let captured = captured.lock().unwrap().clone();
assert_eq!(captured.len(), 1, "expected one bulk delete request");
assert_eq!(captured[0].0, Method::POST);
assert_eq!(captured[0].1, "/test-bucket");
assert_eq!(captured[0].2.as_deref(), Some("delete"));

mock.shutdown().await;
}

#[tokio::test]
async fn test_delete_default_with_disable_bulk() {
// Path-style + bulk delete disabled.
// `delete_stream` must fan out into `DELETE /{bucket}/{key}` (one per
// object, no `?delete` query).
let mock = MockServer::new().await;
let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
for _ in 0..2 {
let c = Arc::clone(&captured);
mock.push_fn(move |req| {
capture(&c, &req);
Response::builder().status(204).body(String::new()).unwrap()
});
}

let store = make_store(&mock, false, true);
let locations =
futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
assert_eq!(deleted.len(), 2);

let mut captured = captured.lock().unwrap().clone();
captured.sort_by(|a, b| a.1.cmp(&b.1));
assert_eq!(captured.len(), 2, "expected one DELETE per object");
assert_eq!(
captured[0],
(Method::DELETE, "/test-bucket/bar".to_string(), None)
);
assert_eq!(
captured[1],
(Method::DELETE, "/test-bucket/foo".to_string(), None)
);

mock.shutdown().await;
}

#[tokio::test]
async fn test_delete_virtual_hosted() {
// Virtual-hosted style + bulk delete enabled.
// `delete_stream` must issue a single `POST /?delete` (bucket is in
// the host, not the path).
let mock = MockServer::new().await;
let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
let c = Arc::clone(&captured);
mock.push_fn(move |req| {
capture(&c, &req);
Response::builder()
.status(200)
.body("<DeleteResult><Deleted><Key>foo</Key></Deleted></DeleteResult>".to_string())
.unwrap()
});

let store = make_store(&mock, true, false);
let locations = futures_util::stream::iter(vec![Ok(Path::from("foo"))]).boxed();
let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
assert_eq!(deleted.len(), 1);

let captured = captured.lock().unwrap().clone();
assert_eq!(captured.len(), 1, "expected one bulk delete request");
assert_eq!(captured[0].0, Method::POST);
assert_eq!(captured[0].1, "/");
assert_eq!(captured[0].2.as_deref(), Some("delete"));

mock.shutdown().await;
}

#[tokio::test]
async fn test_delete_virtual_hosted_with_disable_bulk() {
// Virtual-hosted style + bulk delete disabled.
// `delete_stream` must fan out into `DELETE /{key}` (no bucket in
// path, no `?delete` query).
let mock = MockServer::new().await;
let captured: Arc<std::sync::Mutex<Vec<CapturedRequest>>> = Default::default();
for _ in 0..2 {
let c = Arc::clone(&captured);
mock.push_fn(move |req| {
capture(&c, &req);
Response::builder().status(204).body(String::new()).unwrap()
});
}

let store = make_store(&mock, true, true);
let locations =
futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
let deleted: Vec<_> = store.delete_stream(locations).try_collect().await.unwrap();
assert_eq!(deleted.len(), 2);

let mut captured = captured.lock().unwrap().clone();
captured.sort_by(|a, b| a.1.cmp(&b.1));
assert_eq!(captured.len(), 2, "expected one DELETE per object");
assert_eq!(captured[0], (Method::DELETE, "/bar".to_string(), None));
assert_eq!(captured[1], (Method::DELETE, "/foo".to_string(), None));

mock.shutdown().await;
}

#[tokio::test]
async fn test_default_headers_signed_get_request() {
let mock = MockServer::new().await;
Expand Down
19 changes: 19 additions & 0 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,25 @@ impl ObjectStore for AmazonS3 {
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
let client = Arc::clone(&self.client);

// Some S3-compatible providers do not implement
// the bulk `DeleteObjects` API (`POST /?delete`). When bulk delete is
// disabled, fall back to parallel single-object `DELETE /key` requests,
// which are part of the core S3 API supported by every provider.
if client.config.disable_bulk_delete {
return locations
.map(move |location| {
let client = Arc::clone(&client);
async move {
let location = location?;
client.delete_request(&location).await?;
Ok(location)
}
})
.buffered(20)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondered how you picked 20 as the concurrency but it seems to mirror the buffered(20) below

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. it is from bellow.

.boxed();
}

locations
.try_chunks(1_000)
.map(move |locations| {
Expand Down
Loading