diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 184c25ee..eeae73a8 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -30,7 +30,17 @@ version.workspace = true [features] default = ["storage-memory", "storage-fs", "storage-oss"] -storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3", "storage-hdfs"] +storage-all = [ + "storage-memory", + "storage-fs", + "storage-oss", + "storage-s3", + "storage-cos", + "storage-azdls", + "storage-obs", + "storage-gcs", + "storage-hdfs", +] fulltext = ["tantivy", "tempfile"] vortex = ["dep:vortex", "dep:kanal"] @@ -38,6 +48,10 @@ storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] storage-oss = ["opendal/services-oss"] storage-s3 = ["opendal/services-s3"] +storage-cos = ["opendal/services-cos"] +storage-azdls = ["opendal/services-azdls"] +storage-obs = ["opendal/services-obs"] +storage-gcs = ["opendal/services-gcs"] storage-hdfs = ["opendal/services-hdfs-native"] [dependencies] diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs b/crates/paimon/src/catalog/rest/rest_token_file_io.rs index 6233eb10..502a0d99 100644 --- a/crates/paimon/src/catalog/rest/rest_token_file_io.rs +++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs @@ -29,7 +29,6 @@ use crate::api::rest_api::RESTApi; use crate::api::rest_util::RESTUtil; use crate::catalog::Identifier; use crate::common::{CatalogOptions, Options}; -use crate::io::storage_oss::OSS_ENDPOINT; use crate::io::FileIO; use crate::Result; @@ -37,6 +36,7 @@ use super::rest_token::RESTToken; /// Safe time margin (in milliseconds) before token expiration to trigger refresh. const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000; +const OSS_ENDPOINT: &str = "fs.oss.endpoint"; /// A FileIO wrapper that supports getting data access tokens from a REST Server. /// diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index 7e78004c..2144ed75 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -148,7 +148,7 @@ impl FileIO { statuses.push(FileStatus { size: meta.content_length(), is_dir: meta.is_dir(), - path: format!("{base_path}{entry_path}"), + path: status_path(base_path, entry_path), last_modified: meta .last_modified() .map(|v| DateTime::::from(SystemTime::from(v))), @@ -186,7 +186,7 @@ impl FileIO { statuses.push(FileStatus { size: meta.content_length(), is_dir: false, - path: format!("{base_path}{entry_path}"), + path: status_path(base_path, entry_path), last_modified: meta .last_modified() .map(|v| DateTime::::from(SystemTime::from(v))), @@ -280,6 +280,14 @@ impl FileIO { } } +fn status_path(base_path: &str, entry_path: &str) -> String { + if base_path.ends_with('/') || entry_path.starts_with('/') { + format!("{base_path}{entry_path}") + } else { + format!("{base_path}/{entry_path}") + } +} + fn looks_like_windows_drive_path(path: &str) -> bool { let bytes = path.as_bytes(); bytes.len() >= 3 @@ -708,6 +716,150 @@ mod file_action_test { } } +#[cfg(all( + test, + any( + feature = "storage-cos", + feature = "storage-obs", + feature = "storage-gcs", + feature = "storage-azdls" + ) +))] +mod object_storage_path_test { + use super::*; + + fn assert_relative_paths(file_io: &FileIO, path: &str, expected_relative_path: &str) { + let input = file_io.new_input(path).unwrap(); + assert_eq!(input.location(), path); + assert_eq!( + &input.path[input.relative_path_pos..], + expected_relative_path + ); + + let output = file_io.new_output(path).unwrap(); + assert_eq!(output.location(), path); + assert_eq!( + &output.path[output.relative_path_pos..], + expected_relative_path + ); + + let (_op, relative_path) = file_io.storage.create(path).unwrap(); + assert_eq!(relative_path, expected_relative_path); + + let base_path = &path[..path.len() - relative_path.len()]; + assert_eq!(format!("{base_path}{relative_path}"), path); + } + + #[cfg(feature = "storage-azdls")] + #[test] + fn test_azdls_root_status_path_without_trailing_slash() { + assert_eq!( + status_path( + "abfs://filesystem@account.dfs.core.windows.net", + "warehouse/" + ), + "abfs://filesystem@account.dfs.core.windows.net/warehouse/" + ); + assert_eq!( + status_path( + "abfs://filesystem@account.dfs.core.windows.net/", + "warehouse/" + ), + "abfs://filesystem@account.dfs.core.windows.net/warehouse/" + ); + } + + #[cfg(feature = "storage-cos")] + #[test] + fn test_cos_file_io_relative_paths_and_scheme_aliases() { + for scheme in ["cosn", "cos"] { + let path = format!("{scheme}://bucket/warehouse/table/data.parquet"); + let dir_path = format!("{scheme}://bucket/warehouse/table/"); + let file_io = FileIO::from_path(&path) + .unwrap() + .with_props([ + ("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"), + ("fs.cosn.userinfo.secretId", "secret-id"), + ("fs.cosn.userinfo.secretKey", "secret-key"), + ("fs.cosn.disable-config-load", "true"), + ]) + .build() + .unwrap(); + + assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet"); + assert_relative_paths(&file_io, &dir_path, "warehouse/table/"); + } + } + + #[cfg(feature = "storage-obs")] + #[test] + fn test_obs_file_io_relative_paths() { + let file_io = FileIO::from_path("obs://bucket/warehouse") + .unwrap() + .with_props([ + ( + "fs.obs.endpoint", + "https://obs.cn-north-4.myhuaweicloud.com", + ), + ("fs.obs.access.key", "access-key"), + ("fs.obs.secret.key", "secret-key"), + ]) + .build() + .unwrap(); + + assert_relative_paths( + &file_io, + "obs://bucket/warehouse/table/data.parquet", + "warehouse/table/data.parquet", + ); + assert_relative_paths( + &file_io, + "obs://bucket/warehouse/table/", + "warehouse/table/", + ); + } + + #[cfg(feature = "storage-gcs")] + #[test] + fn test_gcs_file_io_relative_paths_and_scheme_aliases() { + for scheme in ["gs", "gcs"] { + let path = format!("{scheme}://bucket/warehouse/table/data.parquet"); + let dir_path = format!("{scheme}://bucket/warehouse/table/"); + let file_io = FileIO::from_path(&path) + .unwrap() + .with_props([ + ("gcs.allow-anonymous", "true"), + ("gcs.disable-config-load", "true"), + ("gcs.disable-vm-metadata", "true"), + ]) + .build() + .unwrap(); + + assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet"); + assert_relative_paths(&file_io, &dir_path, "warehouse/table/"); + } + } + + #[cfg(feature = "storage-azdls")] + #[test] + fn test_azdls_file_io_relative_paths_and_scheme_aliases() { + for scheme in ["abfs", "abfss"] { + let path = format!( + "{scheme}://filesystem@account.dfs.core.windows.net/warehouse/data.parquet" + ); + let dir_path = format!("{scheme}://filesystem@account.dfs.core.windows.net/warehouse/"); + let file_io = FileIO::from_path(&path) + .unwrap() + .with_prop("azure.account-key", "account-key") + .build() + .unwrap(); + + assert_relative_paths(&file_io, &path, "warehouse/data.parquet"); + assert_relative_paths(&file_io, &dir_path, "warehouse/"); + } + } +} + #[cfg(test)] mod input_output_test { use super::*; diff --git a/crates/paimon/src/io/mod.rs b/crates/paimon/src/io/mod.rs index 7e49c3c8..31f7c0dd 100644 --- a/crates/paimon/src/io/mod.rs +++ b/crates/paimon/src/io/mod.rs @@ -21,6 +21,15 @@ pub use file_io::*; mod storage; pub use storage::*; +#[cfg(any( + feature = "storage-s3", + feature = "storage-cos", + feature = "storage-azdls", + feature = "storage-obs", + feature = "storage-gcs" +))] +mod storage_config; + #[cfg(feature = "storage-fs")] mod storage_fs; #[cfg(feature = "storage-fs")] @@ -41,6 +50,26 @@ mod storage_s3; #[cfg(feature = "storage-s3")] use storage_s3::*; +#[cfg(feature = "storage-cos")] +mod storage_cos; +#[cfg(feature = "storage-cos")] +use storage_cos::*; + +#[cfg(feature = "storage-azdls")] +mod storage_azdls; +#[cfg(feature = "storage-azdls")] +use storage_azdls::*; + +#[cfg(feature = "storage-obs")] +mod storage_obs; +#[cfg(feature = "storage-obs")] +use storage_obs::*; + +#[cfg(feature = "storage-gcs")] +mod storage_gcs; +#[cfg(feature = "storage-gcs")] +use storage_gcs::*; + #[cfg(feature = "storage-hdfs")] mod storage_hdfs; #[cfg(feature = "storage-hdfs")] diff --git a/crates/paimon/src/io/storage.rs b/crates/paimon/src/io/storage.rs index a57fcfc2..59d2740e 100644 --- a/crates/paimon/src/io/storage.rs +++ b/crates/paimon/src/io/storage.rs @@ -17,22 +17,47 @@ use std::collections::HashMap; #[cfg(any( + feature = "storage-azdls", + feature = "storage-cos", + feature = "storage-gcs", feature = "storage-oss", + feature = "storage-obs", feature = "storage-s3", feature = "storage-hdfs" ))] use std::sync::Mutex; -#[cfg(any(feature = "storage-oss", feature = "storage-s3"))] +#[cfg(any( + feature = "storage-azdls", + feature = "storage-cos", + feature = "storage-gcs", + feature = "storage-oss", + feature = "storage-obs", + feature = "storage-s3" +))] use std::sync::MutexGuard; +#[cfg(feature = "storage-azdls")] +use super::AzdlsStorageConfig; +#[cfg(feature = "storage-cos")] +use opendal::services::CosConfig; +#[cfg(feature = "storage-gcs")] +use opendal::services::GcsConfig; #[cfg(feature = "storage-hdfs")] use opendal::services::HdfsNativeConfig; +#[cfg(feature = "storage-obs")] +use opendal::services::ObsConfig; #[cfg(feature = "storage-oss")] use opendal::services::OssConfig; #[cfg(feature = "storage-s3")] use opendal::services::S3Config; use opendal::{Operator, Scheme}; -#[cfg(any(feature = "storage-oss", feature = "storage-s3"))] +#[cfg(any( + feature = "storage-cos", + feature = "storage-gcs", + feature = "storage-oss", + feature = "storage-obs", + feature = "storage-s3" +))] use url::Url; use crate::error; @@ -56,6 +81,26 @@ pub enum Storage { config: Box, operators: Mutex>, }, + #[cfg(feature = "storage-cos")] + Cos { + config: Box, + operators: Mutex>, + }, + #[cfg(feature = "storage-azdls")] + Azdls { + config: Box, + operators: Mutex>, + }, + #[cfg(feature = "storage-obs")] + Obs { + config: Box, + operators: Mutex>, + }, + #[cfg(feature = "storage-gcs")] + Gcs { + config: Box, + operators: Mutex>, + }, #[cfg(feature = "storage-hdfs")] Hdfs { config: Box, @@ -93,6 +138,38 @@ impl Storage { operators: Mutex::new(HashMap::new()), }) } + #[cfg(feature = "storage-cos")] + Scheme::Cos => { + let config = super::cos_config_parse(props)?; + Ok(Self::Cos { + config: Box::new(config), + operators: Mutex::new(HashMap::new()), + }) + } + #[cfg(feature = "storage-azdls")] + Scheme::Azdls => { + let config = super::azdls_config_parse(props)?; + Ok(Self::Azdls { + config: Box::new(config), + operators: Mutex::new(HashMap::new()), + }) + } + #[cfg(feature = "storage-obs")] + Scheme::Obs => { + let config = super::obs_config_parse(props)?; + Ok(Self::Obs { + config: Box::new(config), + operators: Mutex::new(HashMap::new()), + }) + } + #[cfg(feature = "storage-gcs")] + Scheme::Gcs => { + let config = super::gcs_config_parse(props)?; + Ok(Self::Gcs { + config: Box::new(config), + operators: Mutex::new(HashMap::new()), + }) + } #[cfg(feature = "storage-hdfs")] Scheme::HdfsNative => { let config = super::hdfs_config_parse(props)?; @@ -115,16 +192,54 @@ impl Storage { Storage::LocalFs { op } => Ok((op.clone(), Self::fs_relative_path(path)?)), #[cfg(feature = "storage-oss")] Storage::Oss { config, operators } => { - let (bucket, relative_path) = Self::oss_bucket_and_relative_path(path)?; + let (bucket, relative_path) = + Self::bucket_and_relative_path(path, "OSS", &["oss"])?; let op = Self::cached_oss_operator(config, operators, path, &bucket)?; Ok((op, relative_path)) } #[cfg(feature = "storage-s3")] Storage::S3 { config, operators } => { - let (bucket, relative_path) = Self::s3_bucket_and_relative_path(path)?; + let (bucket, relative_path) = + Self::bucket_and_relative_path(path, "S3", &["s3", "s3a"])?; let op = Self::cached_s3_operator(config, operators, path, &bucket)?; Ok((op, relative_path)) } + #[cfg(feature = "storage-cos")] + Storage::Cos { config, operators } => { + let (bucket, relative_path) = + Self::bucket_and_relative_path(path, "COS", &["cos", "cosn"])?; + let op = Self::cached_operator(operators, "COS", &bucket, || { + super::cos_config_build(config, path) + })?; + Ok((op, relative_path)) + } + #[cfg(feature = "storage-azdls")] + Storage::Azdls { config, operators } => { + let relative_path = super::azdls_relative_path(path)?; + let cache_key = super::azdls_operator_cache_key(config, path)?; + let op = Self::cached_operator(operators, "Azure", &cache_key, || { + super::azdls_config_build(config, path) + })?; + Ok((op, relative_path)) + } + #[cfg(feature = "storage-obs")] + Storage::Obs { config, operators } => { + let (bucket, relative_path) = + Self::bucket_and_relative_path(path, "OBS", &["obs"])?; + let op = Self::cached_operator(operators, "OBS", &bucket, || { + super::obs_config_build(config, path) + })?; + Ok((op, relative_path)) + } + #[cfg(feature = "storage-gcs")] + Storage::Gcs { config, operators } => { + let (bucket, relative_path) = + Self::bucket_and_relative_path(path, "GCS", &["gcs", "gs"])?; + let op = Self::cached_operator(operators, "GCS", &bucket, || { + super::gcs_config_build(config, path) + })?; + Ok((op, relative_path)) + } #[cfg(feature = "storage-hdfs")] Storage::Hdfs { config, op } => { let relative_path = super::hdfs_relative_path(path)?; @@ -166,59 +281,52 @@ impl Storage { } } - #[cfg(feature = "storage-oss")] - fn oss_bucket_and_relative_path(path: &str) -> crate::Result<(String, &str)> { - let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid { - message: format!("Invalid OSS url: {path}"), - })?; - let bucket = url - .host_str() - .ok_or_else(|| error::Error::ConfigInvalid { - message: format!("Invalid OSS url: {path}, missing bucket"), - })? - .to_string(); - let prefix = format!("oss://{bucket}/"); - let relative_path = - path.strip_prefix(&prefix) - .ok_or_else(|| error::Error::ConfigInvalid { - message: format!("Invalid OSS url: {path}, should start with {prefix}"), - })?; - Ok((bucket, relative_path)) - } - - #[cfg(feature = "storage-s3")] - fn s3_bucket_and_relative_path(path: &str) -> crate::Result<(String, &str)> { + #[cfg(any( + feature = "storage-cos", + feature = "storage-gcs", + feature = "storage-obs", + feature = "storage-oss", + feature = "storage-s3" + ))] + fn bucket_and_relative_path<'a>( + path: &'a str, + storage_name: &str, + allowed_schemes: &[&str], + ) -> crate::Result<(String, &'a str)> { let url = Url::parse(path).map_err(|_| error::Error::ConfigInvalid { - message: format!("Invalid S3 url: {path}"), + message: format!("Invalid {storage_name} url: {path}"), })?; let bucket = url .host_str() .ok_or_else(|| error::Error::ConfigInvalid { - message: format!("Invalid S3 url: {path}, missing bucket"), + message: format!("Invalid {storage_name} url: {path}, missing bucket"), })? .to_string(); let scheme = url.scheme(); - let prefix = match scheme { - "s3" | "s3a" => format!("{scheme}://{bucket}/"), - _ => { - return Err(error::Error::ConfigInvalid { - message: format!( - "Invalid S3 url: {path}, should start with s3://{bucket}/ or s3a://{bucket}/" - ), - }); - } - }; + if !allowed_schemes.contains(&scheme) { + return Err(error::Error::ConfigInvalid { + message: format!("Invalid {storage_name} url: {path}, unsupported scheme {scheme}"), + }); + } + let prefix = format!("{scheme}://{bucket}/"); let relative_path = path.strip_prefix(&prefix) .ok_or_else(|| error::Error::ConfigInvalid { message: format!( - "Invalid S3 url: {path}, should start with s3://{bucket}/ or s3a://{bucket}/" - ), + "Invalid {storage_name} url: {path}, should start with {prefix}" + ), })?; Ok((bucket, relative_path)) } - #[cfg(any(feature = "storage-oss", feature = "storage-s3"))] + #[cfg(any( + feature = "storage-azdls", + feature = "storage-cos", + feature = "storage-gcs", + feature = "storage-oss", + feature = "storage-obs", + feature = "storage-s3" + ))] fn lock_operator_cache<'a>( operators: &'a Mutex>, storage_name: &str, @@ -229,6 +337,30 @@ impl Storage { }) } + #[cfg(any( + feature = "storage-azdls", + feature = "storage-cos", + feature = "storage-gcs", + feature = "storage-oss", + feature = "storage-obs", + feature = "storage-s3" + ))] + fn cached_operator( + operators: &Mutex>, + storage_name: &str, + cache_key: &str, + build: impl FnOnce() -> crate::Result, + ) -> crate::Result { + let mut operators = Self::lock_operator_cache(operators, storage_name)?; + if let Some(op) = operators.get(cache_key) { + return Ok(op.clone()); + } + + let op = build()?; + operators.insert(cache_key.to_string(), op.clone()); + Ok(op) + } + #[cfg(feature = "storage-oss")] fn cached_oss_operator( config: &OssConfig, @@ -236,14 +368,9 @@ impl Storage { path: &str, bucket: &str, ) -> crate::Result { - let mut operators = Self::lock_operator_cache(operators, "OSS")?; - if let Some(op) = operators.get(bucket) { - return Ok(op.clone()); - } - - let op = super::oss_config_build(config, path)?; - operators.insert(bucket.to_string(), op.clone()); - Ok(op) + Self::cached_operator(operators, "OSS", bucket, || { + super::oss_config_build(config, path) + }) } #[cfg(feature = "storage-s3")] @@ -253,14 +380,9 @@ impl Storage { path: &str, bucket: &str, ) -> crate::Result { - let mut operators = Self::lock_operator_cache(operators, "S3")?; - if let Some(op) = operators.get(bucket) { - return Ok(op.clone()); - } - - let op = super::s3_config_build(config, path)?; - operators.insert(bucket.to_string(), op.clone()); - Ok(op) + Self::cached_operator(operators, "S3", bucket, || { + super::s3_config_build(config, path) + }) } fn parse_scheme(scheme: &str) -> crate::Result { @@ -268,6 +390,9 @@ impl Storage { "memory" => Ok(Scheme::Memory), "file" | "" => Ok(Scheme::Fs), "s3" | "s3a" => Ok(Scheme::S3), + "cosn" => Ok(Scheme::Cos), + "abfs" | "abfss" | "az" | "azure" => Ok(Scheme::Azdls), + "gs" => Ok(Scheme::Gcs), "hdfs" => Ok(Scheme::HdfsNative), s => Ok(s.parse::()?), } diff --git a/crates/paimon/src/io/storage_azdls.rs b/crates/paimon/src/io/storage_azdls.rs new file mode 100644 index 00000000..8a71cdd4 --- /dev/null +++ b/crates/paimon/src/io/storage_azdls.rs @@ -0,0 +1,405 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use opendal::services::AzdlsConfig; +use opendal::{Configurator, Operator}; +use url::Url; + +use crate::error::Error; +use crate::Result; + +use super::storage_config::normalize_storage_config; + +const AZURE_ENDPOINT: &str = "azure.endpoint"; +const AZURE_ACCOUNT_NAME: &str = "azure.account-name"; +const AZURE_ACCOUNT_KEY: &str = "azure.account-key"; +const AZURE_SAS_TOKEN: &str = "azure.sas-token"; + +const CONFIG_PREFIXES: &[&str] = &["fs.azure.", "fs.abfs.", "abfs.", "abfss.", "azure."]; +const MIRRORED_KEYS: &[(&str, &str)] = &[ + ("azure.account-name", "azure.account.name"), + ("azure.account_name", "azure.account.name"), + ("azure.account-key", "azure.account.key"), + ("azure.account_key", "azure.account.key"), + ("azure.sas-token", "azure.sas.token"), + ("azure.sas_token", "azure.sas.token"), + ("azure.client-id", "azure.client.id"), + ("azure.client_id", "azure.client.id"), + ("azure.client-secret", "azure.client.secret"), + ("azure.client_secret", "azure.client.secret"), + ("azure.tenant-id", "azure.tenant.id"), + ("azure.tenant_id", "azure.tenant.id"), + ("azure.authority-host", "azure.authority.host"), + ("azure.authority_host", "azure.authority.host"), +]; + +#[derive(Debug, Clone)] +pub struct AzdlsStorageConfig { + config: AzdlsConfig, + normalized: HashMap, +} + +pub(crate) fn azdls_config_parse(props: HashMap) -> Result { + let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "azure.", MIRRORED_KEYS); + let config = config_from_normalized(&normalized); + + Ok(AzdlsStorageConfig { config, normalized }) +} + +pub(crate) fn azdls_config_build(cfg: &AzdlsStorageConfig, path: &str) -> Result { + let (cfg, relative_path) = azdls_config_for_path(cfg, path)?; + + let builder = cfg.into_builder(); + let op = Operator::new(builder)?.finish(); + + debug_assert_eq!( + relative_path, + azdls_relative_path(path).unwrap_or(relative_path) + ); + Ok(op) +} + +pub(crate) fn azdls_operator_cache_key(cfg: &AzdlsStorageConfig, path: &str) -> Result { + let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { + message: format!("Invalid Azure url: {path}"), + })?; + let filesystem = if cfg.config.filesystem.is_empty() { + filesystem_from_url(&url, path)? + } else { + cfg.config.filesystem.clone() + }; + let endpoint = effective_endpoint(&cfg.config, &url)?; + + Ok(format!("{}|{}", endpoint.trim_end_matches('/'), filesystem)) +} + +fn azdls_config_for_path<'a>( + storage_cfg: &AzdlsStorageConfig, + path: &'a str, +) -> Result<(AzdlsConfig, &'a str)> { + let (filesystem, relative_path) = azdls_filesystem_and_relative_path(path)?; + let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { + message: format!("Invalid Azure url: {path}"), + })?; + + let mut cfg = storage_cfg.config.clone(); + if cfg.filesystem.is_empty() { + cfg.filesystem = filesystem; + } + + let endpoint = effective_endpoint(&cfg, &url)?; + apply_account_scoped_config(&mut cfg, &storage_cfg.normalized, &endpoint); + cfg.endpoint = Some(endpoint); + cfg.root = Some("/".to_string()); + + Ok((cfg, relative_path)) +} + +fn config_from_normalized(normalized: &HashMap) -> AzdlsConfig { + AzdlsConfig { + endpoint: normalized.get(AZURE_ENDPOINT).cloned(), + account_name: normalized.get(AZURE_ACCOUNT_NAME).cloned(), + account_key: normalized.get(AZURE_ACCOUNT_KEY).cloned(), + sas_token: normalized.get(AZURE_SAS_TOKEN).cloned(), + client_id: normalized.get("azure.client-id").cloned(), + client_secret: normalized.get("azure.client-secret").cloned(), + tenant_id: normalized.get("azure.tenant-id").cloned(), + authority_host: normalized.get("azure.authority-host").cloned(), + ..Default::default() + } +} + +fn effective_endpoint(cfg: &AzdlsConfig, url: &Url) -> Result { + cfg.endpoint + .as_ref() + .map(|endpoint| endpoint.trim_end_matches('/').to_string()) + .map(Ok) + .unwrap_or_else(|| default_endpoint(url)) +} + +pub(crate) fn azdls_filesystem_and_relative_path(path: &str) -> Result<(String, &str)> { + let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { + message: format!("Invalid Azure url: {path}"), + })?; + + let filesystem = filesystem_from_url(&url, path)?; + + Ok((filesystem, azdls_relative_path(path)?)) +} + +pub(crate) fn azdls_relative_path(path: &str) -> Result<&str> { + let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { + message: format!("Invalid Azure url: {path}"), + })?; + + let path_start = path + .find("://") + .map(|pos| pos + 3) + .ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid Azure url: {path}"), + })?; + let after_scheme = &path[path_start..]; + let path_start = after_scheme.find('/').map(|pos| path_start + pos + 1); + let url_path = path_start.map(|pos| &path[pos..]).unwrap_or(""); + + if !url.username().is_empty() + || !url + .host_str() + .ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid Azure url: {path}, missing filesystem"), + })? + .contains('.') + { + Ok(url_path) + } else { + let (_filesystem, relative_path) = url_path.split_once('/').unwrap_or((url_path, "")); + Ok(relative_path) + } +} + +fn filesystem_from_url(url: &Url, path: &str) -> Result { + if !url.username().is_empty() { + return Ok(url.username().to_string()); + } + + let host = url.host_str().ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid Azure url: {path}, missing filesystem"), + })?; + + if !host.contains('.') { + return Ok(host.to_string()); + } + + url.path() + .strip_prefix('/') + .unwrap_or(url.path()) + .split('/') + .next() + .filter(|v| !v.is_empty()) + .ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid Azure url: {path}, missing filesystem"), + }) + .map(|v| v.to_string()) +} + +fn default_endpoint(url: &Url) -> Result { + if !url.username().is_empty() { + let host = url.host_str().ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid Azure url: {url}, missing account host"), + })?; + return Ok(format!("https://{host}")); + } + + let host = url.host_str().ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid Azure url: {url}, missing account"), + })?; + + if host.contains('.') { + Ok(format!("https://{host}")) + } else { + Err(Error::ConfigInvalid { + message: format!( + "Invalid Azure url: {url}, missing account host; set azure.endpoint for {host}" + ), + }) + } +} + +fn apply_account_scoped_config( + cfg: &mut AzdlsConfig, + normalized: &HashMap, + endpoint: &str, +) { + let Some(host) = endpoint_host(endpoint) else { + return; + }; + let account = host.split('.').next().unwrap_or(host.as_str()); + + if cfg.account_key.is_none() { + cfg.account_key = first_scoped_value( + normalized, + &[ + "azure.account.key", + "azure.account-key", + "azure.account_key", + ], + &[host.as_str(), account], + ); + } + + if cfg.sas_token.is_none() { + cfg.sas_token = first_scoped_value( + normalized, + &[ + "azure.sas.token", + "azure.sas-token", + "azure.sas_token", + "azure.sas.fixed.token", + "azure.fixed.sas.token", + ], + &[host.as_str(), account], + ); + } +} + +fn endpoint_host(endpoint: &str) -> Option { + Url::parse(endpoint) + .ok() + .and_then(|url| url.host_str().map(|host| host.to_string())) +} + +fn first_scoped_value( + normalized: &HashMap, + prefixes: &[&str], + suffixes: &[&str], +) -> Option { + prefixes.iter().find_map(|prefix| { + suffixes + .iter() + .find_map(|suffix| normalized.get(&format!("{prefix}.{suffix}")).cloned()) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_props(pairs: &[(&str, &str)]) -> HashMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn test_azdls_config_parse_keys() { + let props = make_props(&[ + ("fs.azure.account.key", "key"), + ("fs.azure.sas.token", "sas"), + ("azure.endpoint", "https://account.dfs.core.windows.net"), + ]); + + let cfg = azdls_config_parse(props).unwrap(); + assert_eq!( + cfg.config.endpoint.as_deref(), + Some("https://account.dfs.core.windows.net") + ); + assert_eq!(cfg.config.account_key.as_deref(), Some("key")); + assert_eq!(cfg.config.sas_token.as_deref(), Some("sas")); + } + + #[test] + fn test_azdls_config_parse_aliases() { + let props = make_props(&[ + ("azure.account-name", "account"), + ("azure.client-secret", "secret"), + ("azure.tenant-id", "tenant"), + ]); + + let cfg = azdls_config_parse(props).unwrap(); + assert_eq!(cfg.config.account_name.as_deref(), Some("account")); + assert_eq!(cfg.config.client_secret.as_deref(), Some("secret")); + assert_eq!(cfg.config.tenant_id.as_deref(), Some("tenant")); + } + + #[test] + fn test_azdls_config_uses_account_scoped_hadoop_key() { + let cfg = azdls_config_parse(make_props(&[( + "fs.azure.account.key.account.dfs.core.windows.net", + "account-key", + )])) + .unwrap(); + + let (cfg, _) = + azdls_config_for_path(&cfg, "abfs://fs@account.dfs.core.windows.net/path/to/file") + .unwrap(); + + assert_eq!(cfg.account_key.as_deref(), Some("account-key")); + } + + #[test] + fn test_azdls_path_hadoop_authority_form() { + let (filesystem, relative_path) = + azdls_filesystem_and_relative_path("abfs://fs@account.dfs.core.windows.net/a/b") + .unwrap(); + assert_eq!(filesystem, "fs"); + assert_eq!(relative_path, "a/b"); + } + + #[test] + fn test_azdls_path_fsspec_form() { + let (filesystem, relative_path) = + azdls_filesystem_and_relative_path("abfs://fs/a/b").unwrap(); + assert_eq!(filesystem, "fs"); + assert_eq!(relative_path, "a/b"); + } + + #[test] + fn test_azdls_config_build_hadoop_form() { + let cfg = azdls_config_parse(HashMap::new()).unwrap(); + + let op = azdls_config_build(&cfg, "abfs://fs@account.dfs.core.windows.net/a/b").unwrap(); + assert_eq!(op.info().name(), "fs"); + } + + #[test] + fn test_azdls_config_build_fsspec_form_requires_endpoint() { + let cfg = azdls_config_parse(HashMap::new()).unwrap(); + let result = azdls_config_build(&cfg, "abfs://fs/a/b"); + assert!(result.is_err()); + } + + #[test] + fn test_azdls_config_build_fsspec_form_with_endpoint() { + let cfg = azdls_config_parse(make_props(&[( + "azure.endpoint", + "https://account.dfs.core.windows.net", + )])) + .unwrap(); + + let op = azdls_config_build(&cfg, "abfs://fs/a/b").unwrap(); + assert_eq!(op.info().name(), "fs"); + } + + #[test] + fn test_azdls_cache_key_includes_account_host() { + let cfg = azdls_config_parse(HashMap::new()).unwrap(); + + let account_a = azdls_operator_cache_key( + &cfg, + "abfs://fs@account-a.dfs.core.windows.net/path/to/file", + ) + .unwrap(); + let account_b = azdls_operator_cache_key( + &cfg, + "abfs://fs@account-b.dfs.core.windows.net/path/to/file", + ) + .unwrap(); + + assert_ne!(account_a, account_b); + assert_eq!(account_a, "https://account-a.dfs.core.windows.net|fs"); + } + + #[test] + fn test_azdls_config_build_missing_filesystem() { + let cfg = azdls_config_parse(HashMap::new()).unwrap(); + let result = azdls_config_build(&cfg, "abfs:///path/without/filesystem"); + assert!(result.is_err()); + } +} diff --git a/crates/paimon/src/io/storage_config.rs b/crates/paimon/src/io/storage_config.rs new file mode 100644 index 00000000..90206db0 --- /dev/null +++ b/crates/paimon/src/io/storage_config.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +pub(super) fn normalize_storage_config( + props: HashMap, + config_prefixes: &[&str], + canonical_prefix: &str, + mirrored_keys: &[(&str, &str)], +) -> HashMap { + let mut result = HashMap::new(); + + for prefix in config_prefixes { + for (key, value) in &props { + if let Some(suffix) = key.strip_prefix(prefix) { + result.insert(format!("{canonical_prefix}{suffix}"), value.clone()); + } + } + } + + let mirrored_additions: Vec<(String, String)> = mirrored_keys + .iter() + .flat_map(|(a, b)| { + let mut pairs = Vec::new(); + + if !result.contains_key(*b) { + if let Some(v) = result.get(*a) { + pairs.push((b.to_string(), v.clone())); + } + } + if !result.contains_key(*a) { + if let Some(v) = result.get(*b) { + pairs.push((a.to_string(), v.clone())); + } + } + pairs + }) + .collect(); + + for (k, v) in mirrored_additions { + result.insert(k, v); + } + + result +} diff --git a/crates/paimon/src/io/storage_cos.rs b/crates/paimon/src/io/storage_cos.rs new file mode 100644 index 00000000..b8bb3ea4 --- /dev/null +++ b/crates/paimon/src/io/storage_cos.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use opendal::services::CosConfig; +use opendal::{Configurator, Operator}; +use url::Url; + +use crate::error::Error; +use crate::Result; + +use super::storage_config::normalize_storage_config; + +const COS_ENDPOINT: &str = "fs.cosn.endpoint"; +const COS_SECRET_ID: &str = "fs.cosn.userinfo.secretId"; +const COS_SECRET_KEY: &str = "fs.cosn.userinfo.secretKey"; + +const CONFIG_PREFIXES: &[&str] = &["fs.cosn.", "cosn.", "cos."]; +const MIRRORED_KEYS: &[(&str, &str)] = &[ + ("fs.cosn.endpoint", "fs.cosn.userinfo.endpoint"), + ("fs.cosn.secret_id", "fs.cosn.userinfo.secretId"), + ("fs.cosn.secret-id", "fs.cosn.userinfo.secretId"), + ("fs.cosn.secret_key", "fs.cosn.userinfo.secretKey"), + ("fs.cosn.secret-key", "fs.cosn.userinfo.secretKey"), +]; + +pub(crate) fn cos_config_parse(props: HashMap) -> Result { + let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "fs.cosn.", MIRRORED_KEYS); + + let cfg = CosConfig { + endpoint: normalized.get(COS_ENDPOINT).cloned(), + secret_id: normalized.get(COS_SECRET_ID).cloned(), + secret_key: normalized.get(COS_SECRET_KEY).cloned(), + enable_versioning: normalized + .get("fs.cosn.enable-versioning") + .is_some_and(|v| v.eq_ignore_ascii_case("true")), + disable_config_load: normalized + .get("fs.cosn.disable-config-load") + .is_some_and(|v| v.eq_ignore_ascii_case("true")), + ..Default::default() + }; + + Ok(cfg) +} + +pub(crate) fn cos_config_build(cfg: &CosConfig, path: &str) -> Result { + let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { + message: format!("Invalid COS url: {path}"), + })?; + + let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid COS url: {path}, missing bucket"), + })?; + + let builder = cfg.clone().into_builder().bucket(bucket); + Ok(Operator::new(builder)?.finish()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_props(pairs: &[(&str, &str)]) -> HashMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn test_cos_config_parse_hadoop_keys() { + let props = make_props(&[ + ("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"), + ("fs.cosn.userinfo.secretId", "sid"), + ("fs.cosn.userinfo.secretKey", "skey"), + ]); + + let cfg = cos_config_parse(props).unwrap(); + assert_eq!( + cfg.endpoint.as_deref(), + Some("https://cos.ap-shanghai.myqcloud.com") + ); + assert_eq!(cfg.secret_id.as_deref(), Some("sid")); + assert_eq!(cfg.secret_key.as_deref(), Some("skey")); + } + + #[test] + fn test_cos_config_parse_canonical_aliases() { + let props = make_props(&[ + ("cos.endpoint", "https://cos.ap-singapore.myqcloud.com"), + ("cos.secret-id", "sid"), + ("cos.secret-key", "skey"), + ]); + + let cfg = cos_config_parse(props).unwrap(); + assert_eq!( + cfg.endpoint.as_deref(), + Some("https://cos.ap-singapore.myqcloud.com") + ); + assert_eq!(cfg.secret_id.as_deref(), Some("sid")); + assert_eq!(cfg.secret_key.as_deref(), Some("skey")); + } + + #[test] + fn test_cos_config_build_extracts_bucket() { + let cfg = CosConfig { + endpoint: Some("https://cos.ap-shanghai.myqcloud.com".to_string()), + ..Default::default() + }; + + let op = cos_config_build(&cfg, "cosn://my-bucket/some/path").unwrap(); + assert_eq!(op.info().name(), "my-bucket"); + } + + #[test] + fn test_cos_config_build_missing_bucket() { + let cfg = CosConfig::default(); + let result = cos_config_build(&cfg, "cosn:///path/without/bucket"); + assert!(result.is_err()); + } +} diff --git a/crates/paimon/src/io/storage_gcs.rs b/crates/paimon/src/io/storage_gcs.rs new file mode 100644 index 00000000..575dae3f --- /dev/null +++ b/crates/paimon/src/io/storage_gcs.rs @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use opendal::services::GcsConfig; +use opendal::{Configurator, Operator}; +use url::Url; + +use crate::error::Error; +use crate::Result; + +use super::storage_config::normalize_storage_config; + +const GCS_ENDPOINT: &str = "gcs.endpoint"; +const GCS_CREDENTIAL: &str = "gcs.credential"; +const GCS_CREDENTIAL_PATH: &str = "gcs.credential-path"; +const GCS_SERVICE_ACCOUNT: &str = "gcs.service-account"; +const GCS_ALLOW_ANONYMOUS: &str = "gcs.allow-anonymous"; + +const CONFIG_PREFIXES: &[&str] = &["fs.gs.", "fs.gcs.", "gs.", "gcs."]; +const MIRRORED_KEYS: &[(&str, &str)] = &[ + ("gcs.credential-path", "gcs.google_application_credentials"), + ("gcs.credential-path", "gcs.google-application-credentials"), + ("gcs.credential-path", "gcs.application-credentials"), + ("gcs.credential", "gcs.google_service_account_key"), + ("gcs.credential", "gcs.google-service-account-key"), + ("gcs.credential", "gcs.service-account-key"), + ("gcs.credential", "gcs.service_account_key"), + ("gcs.service-account", "gcs.google_service_account"), + ("gcs.service-account", "gcs.google-service-account"), + ("gcs.service-account", "gcs.service_account"), + ("gcs.predefined-acl", "gcs.predefined_acl"), + ("gcs.default-storage-class", "gcs.default_storage_class"), + ("gcs.allow-anonymous", "gcs.google_skip_signature"), + ("gcs.allow-anonymous", "gcs.google-skip-signature"), + ("gcs.allow_anonymous", "gcs.google_skip_signature"), + ("gcs.allow-anonymous", "gcs.allow_anonymous"), + ("gcs.allow-anonymous", "gcs.skip-signature"), + ("gcs.allow-anonymous", "gcs.skip_signature"), + ("gcs.skip-signature", "gcs.google_skip_signature"), + ("gcs.skip_signature", "gcs.google_skip_signature"), + ("gcs.disable-vm-metadata", "gcs.disable_vm_metadata"), + ("gcs.disable-config-load", "gcs.disable_config_load"), +]; + +#[allow(clippy::field_reassign_with_default)] +pub(crate) fn gcs_config_parse(props: HashMap) -> Result { + let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "gcs.", MIRRORED_KEYS); + + let mut cfg = GcsConfig::default(); + cfg.endpoint = normalized.get(GCS_ENDPOINT).cloned(); + cfg.credential = normalized.get(GCS_CREDENTIAL).cloned(); + cfg.credential_path = normalized.get(GCS_CREDENTIAL_PATH).cloned(); + cfg.service_account = normalized.get(GCS_SERVICE_ACCOUNT).cloned(); + cfg.scope = normalized.get("gcs.scope").cloned(); + cfg.predefined_acl = normalized.get("gcs.predefined-acl").cloned(); + cfg.default_storage_class = normalized.get("gcs.default-storage-class").cloned(); + cfg.token = normalized.get("gcs.token").cloned(); + cfg.allow_anonymous = normalized + .get(GCS_ALLOW_ANONYMOUS) + .is_some_and(|v| v.eq_ignore_ascii_case("true")); + cfg.disable_vm_metadata = normalized + .get("gcs.disable-vm-metadata") + .is_some_and(|v| v.eq_ignore_ascii_case("true")); + cfg.disable_config_load = normalized + .get("gcs.disable-config-load") + .is_some_and(|v| v.eq_ignore_ascii_case("true")); + + Ok(cfg) +} + +pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result { + let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { + message: format!("Invalid GCS url: {path}"), + })?; + + let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid GCS url: {path}, missing bucket"), + })?; + + let builder = cfg.clone().into_builder().bucket(bucket); + Ok(Operator::new(builder)?.finish()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_props(pairs: &[(&str, &str)]) -> HashMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn test_gcs_config_parse_keys() { + let props = make_props(&[ + ("fs.gs.endpoint", "https://storage.googleapis.com"), + ("fs.gs.google_application_credentials", "/tmp/gcs.json"), + ("fs.gs.google_service_account_key", "credential-json"), + ( + "fs.gs.google_service_account", + "sa@example.iam.gserviceaccount.com", + ), + ("fs.gs.predefined_acl", "bucketOwnerFullControl"), + ("fs.gs.default_storage_class", "NEARLINE"), + ]); + + let cfg = gcs_config_parse(props).unwrap(); + assert_eq!( + cfg.endpoint.as_deref(), + Some("https://storage.googleapis.com") + ); + assert_eq!(cfg.credential_path.as_deref(), Some("/tmp/gcs.json")); + assert_eq!(cfg.credential.as_deref(), Some("credential-json")); + assert_eq!( + cfg.service_account.as_deref(), + Some("sa@example.iam.gserviceaccount.com") + ); + assert_eq!( + cfg.predefined_acl.as_deref(), + Some("bucketOwnerFullControl") + ); + assert_eq!(cfg.default_storage_class.as_deref(), Some("NEARLINE")); + } + + #[test] + fn test_gcs_config_parse_canonical_aliases() { + let props = make_props(&[ + ("gcs.credential-path", "/tmp/gcs.json"), + ("gcs.allow-anonymous", "true"), + ]); + + let cfg = gcs_config_parse(props).unwrap(); + assert_eq!(cfg.credential_path.as_deref(), Some("/tmp/gcs.json")); + assert!(cfg.allow_anonymous); + } + + #[test] + fn test_gcs_config_parse_opendal_aliases() { + let props = make_props(&[ + ( + "gcs.google_application_credentials", + "/tmp/opendal-gcs.json", + ), + ("gcs.google_service_account_key", "credential-json"), + ( + "gcs.google_service_account", + "opendal-sa@example.iam.gserviceaccount.com", + ), + ("gcs.google_skip_signature", "true"), + ("gcs.disable_vm_metadata", "true"), + ("gcs.disable_config_load", "true"), + ]); + + let cfg = gcs_config_parse(props).unwrap(); + assert_eq!( + cfg.credential_path.as_deref(), + Some("/tmp/opendal-gcs.json") + ); + assert_eq!(cfg.credential.as_deref(), Some("credential-json")); + assert_eq!( + cfg.service_account.as_deref(), + Some("opendal-sa@example.iam.gserviceaccount.com") + ); + assert!(cfg.allow_anonymous); + assert!(cfg.disable_vm_metadata); + assert!(cfg.disable_config_load); + } + + #[test] + fn test_gcs_config_build_extracts_bucket() { + let cfg = GcsConfig::default(); + + let op = gcs_config_build(&cfg, "gs://my-bucket/some/path").unwrap(); + assert_eq!(op.info().name(), "my-bucket"); + } + + #[test] + fn test_gcs_config_build_missing_bucket() { + let cfg = GcsConfig::default(); + let result = gcs_config_build(&cfg, "gs:///path/without/bucket"); + assert!(result.is_err()); + } +} diff --git a/crates/paimon/src/io/storage_obs.rs b/crates/paimon/src/io/storage_obs.rs new file mode 100644 index 00000000..2fde849e --- /dev/null +++ b/crates/paimon/src/io/storage_obs.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use opendal::services::ObsConfig; +use opendal::{Configurator, Operator}; +use url::Url; + +use crate::error::Error; +use crate::Result; + +use super::storage_config::normalize_storage_config; + +const OBS_ENDPOINT: &str = "fs.obs.endpoint"; +const OBS_ACCESS_KEY_ID: &str = "fs.obs.access.key"; +const OBS_SECRET_ACCESS_KEY: &str = "fs.obs.secret.key"; + +const CONFIG_PREFIXES: &[&str] = &["fs.obs.", "obs."]; +const MIRRORED_KEYS: &[(&str, &str)] = &[ + ("fs.obs.access-key-id", "fs.obs.access.key"), + ("fs.obs.access_key_id", "fs.obs.access.key"), + ("fs.obs.secret-access-key", "fs.obs.secret.key"), + ("fs.obs.secret_access_key", "fs.obs.secret.key"), +]; + +#[allow(clippy::field_reassign_with_default)] +pub(crate) fn obs_config_parse(props: HashMap) -> Result { + let normalized = normalize_storage_config(props, CONFIG_PREFIXES, "fs.obs.", MIRRORED_KEYS); + + let mut cfg = ObsConfig::default(); + cfg.endpoint = normalized.get(OBS_ENDPOINT).cloned(); + cfg.access_key_id = normalized.get(OBS_ACCESS_KEY_ID).cloned(); + cfg.secret_access_key = normalized.get(OBS_SECRET_ACCESS_KEY).cloned(); + cfg.enable_versioning = normalized + .get("fs.obs.enable-versioning") + .is_some_and(|v| v.eq_ignore_ascii_case("true")); + + Ok(cfg) +} + +pub(crate) fn obs_config_build(cfg: &ObsConfig, path: &str) -> Result { + let url = Url::parse(path).map_err(|_| Error::ConfigInvalid { + message: format!("Invalid OBS url: {path}"), + })?; + + let bucket = url.host_str().ok_or_else(|| Error::ConfigInvalid { + message: format!("Invalid OBS url: {path}, missing bucket"), + })?; + + let builder = cfg.clone().into_builder().bucket(bucket); + Ok(Operator::new(builder)?.finish()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_props(pairs: &[(&str, &str)]) -> HashMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + #[test] + fn test_obs_config_parse_hadoop_keys() { + let props = make_props(&[ + ( + "fs.obs.endpoint", + "https://obs.cn-north-4.myhuaweicloud.com", + ), + ("fs.obs.access.key", "ak"), + ("fs.obs.secret.key", "sk"), + ]); + + let cfg = obs_config_parse(props).unwrap(); + assert_eq!( + cfg.endpoint.as_deref(), + Some("https://obs.cn-north-4.myhuaweicloud.com") + ); + assert_eq!(cfg.access_key_id.as_deref(), Some("ak")); + assert_eq!(cfg.secret_access_key.as_deref(), Some("sk")); + } + + #[test] + fn test_obs_config_parse_canonical_aliases() { + let props = make_props(&[ + ("obs.endpoint", "https://obs.cn-north-4.myhuaweicloud.com"), + ("obs.access-key-id", "ak"), + ("obs.secret-access-key", "sk"), + ]); + + let cfg = obs_config_parse(props).unwrap(); + assert_eq!( + cfg.endpoint.as_deref(), + Some("https://obs.cn-north-4.myhuaweicloud.com") + ); + assert_eq!(cfg.access_key_id.as_deref(), Some("ak")); + assert_eq!(cfg.secret_access_key.as_deref(), Some("sk")); + } + + #[test] + #[allow(clippy::field_reassign_with_default)] + fn test_obs_config_build_extracts_bucket() { + let mut cfg = ObsConfig::default(); + cfg.endpoint = Some("https://obs.cn-north-4.myhuaweicloud.com".to_string()); + + let op = obs_config_build(&cfg, "obs://my-bucket/some/path").unwrap(); + assert_eq!(op.info().name(), "my-bucket"); + } + + #[test] + fn test_obs_config_build_missing_bucket() { + let cfg = ObsConfig::default(); + let result = obs_config_build(&cfg, "obs:///path/without/bucket"); + assert!(result.is_err()); + } +} diff --git a/crates/paimon/src/io/storage_s3.rs b/crates/paimon/src/io/storage_s3.rs index 57b77c75..6fe20b87 100644 --- a/crates/paimon/src/io/storage_s3.rs +++ b/crates/paimon/src/io/storage_s3.rs @@ -24,6 +24,8 @@ use url::Url; use crate::error::Error; use crate::Result; +use super::storage_config::normalize_storage_config; + /// Configuration key for S3 endpoint. /// /// Compatible with paimon-java's `s3.endpoint` / `fs.s3a.endpoint`. @@ -79,14 +81,18 @@ const MIRRORED_KEYS: &[(&str, &str)] = &[ /// By default, virtual-hosted style addressing is enabled (matching AWS /// and Java Paimon behavior). Set `s3.path-style-access=true` to switch /// to path-style for S3-compatible stores like MinIO. +#[allow(clippy::field_reassign_with_default)] pub(crate) fn s3_config_parse(props: HashMap) -> Result { - let normalized = normalize_config(props); + let normalized = normalize_storage_config(props, JAVA_CONFIG_PREFIXES, "s3.", MIRRORED_KEYS); let mut cfg = S3Config::default(); // Default to virtual-hosted style, matching AWS and Java Paimon. // Only disable when path-style-access is explicitly set to true. - cfg.enable_virtual_host_style = true; + let path_style_access = normalized + .get(S3_PATH_STYLE_ACCESS) + .is_some_and(|v| v.eq_ignore_ascii_case("true")); + cfg.enable_virtual_host_style = !path_style_access; // Core connection settings. cfg.endpoint = normalized.get(S3_ENDPOINT).cloned(); @@ -94,12 +100,6 @@ pub(crate) fn s3_config_parse(props: HashMap) -> Result) -> Result Result { Ok(Operator::new(builder)?.finish()) } -/// Normalize Java-compatible config keys to canonical `s3.*` form. -/// -/// 1. Strips known prefixes (`fs.s3a.`, `s3a.`, `s3.`) and remaps to `s3.*`. -/// 2. Applies mirrored key mappings for cross-compatibility. -/// 3. Earlier prefixes in the list take lower priority (later ones overwrite). -fn normalize_config(props: HashMap) -> HashMap { - let mut result = HashMap::new(); - - // First pass: normalize prefixes. Process in priority order — - // `fs.s3a.` (lowest) → `s3a.` → `s3.` (highest, canonical). - for prefix in JAVA_CONFIG_PREFIXES { - for (key, value) in &props { - if let Some(suffix) = key.strip_prefix(prefix) { - let canonical = format!("s3.{suffix}"); - result.insert(canonical, value.clone()); - } - } - } - - // Second pass: apply mirrored keys bidirectionally (only if target not already set). - let mirrored_additions: Vec<(String, String)> = MIRRORED_KEYS - .iter() - .flat_map(|(a, b)| { - let mut pairs = Vec::new(); - // a → b - if !result.contains_key(*b) { - if let Some(v) = result.get(*a) { - pairs.push((b.to_string(), v.clone())); - } - } - // b → a - if !result.contains_key(*a) { - if let Some(v) = result.get(*b) { - pairs.push((a.to_string(), v.clone())); - } - } - pairs - }) - .collect(); - - for (k, v) in mirrored_additions { - result.insert(k, v); - } - - result -} - #[cfg(test)] mod tests { use super::*; @@ -353,6 +304,7 @@ mod tests { } #[test] + #[allow(clippy::field_reassign_with_default)] fn test_s3_config_build_extracts_bucket() { let mut cfg = S3Config::default(); cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string()); @@ -363,6 +315,7 @@ mod tests { } #[test] + #[allow(clippy::field_reassign_with_default)] fn test_s3_config_build_s3a_scheme() { let mut cfg = S3Config::default(); cfg.endpoint = Some("https://s3.us-east-1.amazonaws.com".to_string()); @@ -390,7 +343,8 @@ mod tests { fn test_mirrored_keys() { // `s3.access.key` (dot form) should be mirrored from `s3.access-key` (dash form) let props = make_props(&[("s3.access-key", "AKID")]); - let normalized = normalize_config(props); + let normalized = + normalize_storage_config(props, JAVA_CONFIG_PREFIXES, "s3.", MIRRORED_KEYS); assert_eq!( normalized.get("s3.access.key").map(|s| s.as_str()), Some("AKID") diff --git a/docs/src/architecture.md b/docs/src/architecture.md index f12950e8..e47fb75b 100644 --- a/docs/src/architecture.md +++ b/docs/src/architecture.md @@ -33,7 +33,7 @@ The core crate implements the Paimon table format, including: - **Table** — Table abstraction for reading Paimon tables - **Snapshot & Manifest** — Reading snapshot and manifest metadata - **Schema** — Table schema management and evolution -- **File IO** — Abstraction layer for storage backends (local filesystem, S3) +- **File IO** — Abstraction layer for storage backends (local filesystem, object stores, HDFS) - **File Format** — Parquet file reading and writing via Apache Arrow ### `crates/integrations/datafusion` — DataFusion Integration diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md index 0b6d88aa..90ec311e 100644 --- a/docs/src/getting-started.md +++ b/docs/src/getting-started.md @@ -44,6 +44,11 @@ Available storage features: | `storage-memory` | In-memory | | `storage-s3` | Amazon S3 | | `storage-oss` | Alibaba Cloud OSS| +| `storage-cos` | Tencent Cloud COS| +| `storage-azdls` | Azure Data Lake Storage Gen2 | +| `storage-obs` | Huawei Cloud OBS | +| `storage-gcs` | Google Cloud Storage | +| `storage-hdfs` | HDFS | | `storage-all` | All of the above | ## Catalog Management @@ -78,6 +83,37 @@ options.set("fs.oss.accessKeySecret", "your-access-key-secret"); options.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com"); let catalog = CatalogFactory::create(options).await?; +// Tencent Cloud COS +let mut options = Options::new(); +options.set(CatalogOptions::WAREHOUSE, "cosn://bucket/warehouse"); +options.set("fs.cosn.userinfo.secretId", "your-secret-id"); +options.set("fs.cosn.userinfo.secretKey", "your-secret-key"); +options.set("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"); +let catalog = CatalogFactory::create(options).await?; + +// Azure Data Lake Storage Gen2 +let mut options = Options::new(); +options.set(CatalogOptions::WAREHOUSE, "abfs://filesystem@account.dfs.core.windows.net/warehouse"); +options.set("azure.account-key", "your-account-key"); +let catalog = CatalogFactory::create(options).await?; + +// If you use the short form "abfs://filesystem/warehouse", set the endpoint explicitly: +// options.set("azure.endpoint", "https://account.dfs.core.windows.net"); + +// Huawei Cloud OBS +let mut options = Options::new(); +options.set(CatalogOptions::WAREHOUSE, "obs://bucket/warehouse"); +options.set("fs.obs.access.key", "your-access-key-id"); +options.set("fs.obs.secret.key", "your-secret-access-key"); +options.set("fs.obs.endpoint", "https://obs.cn-north-4.myhuaweicloud.com"); +let catalog = CatalogFactory::create(options).await?; + +// Google Cloud Storage +let mut options = Options::new(); +options.set(CatalogOptions::WAREHOUSE, "gs://bucket/warehouse"); +options.set("gcs.credential-path", "/path/to/service-account.json"); +let catalog = CatalogFactory::create(options).await?; + // REST catalog let mut options = Options::new(); options.set(CatalogOptions::METASTORE, "rest"); diff --git a/docs/src/index.md b/docs/src/index.md index b9c14723..252c9dd7 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -28,7 +28,7 @@ Apache Paimon Rust provides native Rust libraries for reading and writing Paimon Key features: - Native Rust reader for Paimon table format -- Support for local filesystem, S3, and OSS storage backends +- Support for local filesystem, S3, OSS, COS, Azure, OBS, GCS, and HDFS storage backends - REST Catalog integration - Apache DataFusion integration for SQL queries