Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion crates/paimon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,28 @@ version.workspace = true

[features]
default = ["storage-memory", "storage-fs", "storage-oss"]
storage-all = ["storage-memory", "storage-fs", "storage-oss", "storage-s3", "storage-hdfs"]
storage-all = [
"storage-memory",
"storage-fs",
"storage-oss",
"storage-s3",
"storage-cos",
"storage-azdls",
"storage-obs",
"storage-gcs",
"storage-hdfs",
]
fulltext = ["tantivy", "tempfile"]
vortex = ["dep:vortex", "dep:kanal"]

storage-memory = ["opendal/services-memory"]
storage-fs = ["opendal/services-fs"]
storage-oss = ["opendal/services-oss"]
storage-s3 = ["opendal/services-s3"]
storage-cos = ["opendal/services-cos"]
storage-azdls = ["opendal/services-azdls"]
storage-obs = ["opendal/services-obs"]
storage-gcs = ["opendal/services-gcs"]
storage-hdfs = ["opendal/services-hdfs-native"]

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/paimon/src/catalog/rest/rest_token_file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ use crate::api::rest_api::RESTApi;
use crate::api::rest_util::RESTUtil;
use crate::catalog::Identifier;
use crate::common::{CatalogOptions, Options};
use crate::io::storage_oss::OSS_ENDPOINT;
use crate::io::FileIO;
use crate::Result;

use super::rest_token::RESTToken;

/// Safe time margin (in milliseconds) before token expiration to trigger refresh.
const TOKEN_EXPIRATION_SAFE_TIME_MILLIS: i64 = 3_600_000;
const OSS_ENDPOINT: &str = "fs.oss.endpoint";

/// A FileIO wrapper that supports getting data access tokens from a REST Server.
///
Expand Down
156 changes: 154 additions & 2 deletions crates/paimon/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl FileIO {
statuses.push(FileStatus {
size: meta.content_length(),
is_dir: meta.is_dir(),
path: format!("{base_path}{entry_path}"),
path: status_path(base_path, entry_path),
last_modified: meta
.last_modified()
.map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
Expand Down Expand Up @@ -186,7 +186,7 @@ impl FileIO {
statuses.push(FileStatus {
size: meta.content_length(),
is_dir: false,
path: format!("{base_path}{entry_path}"),
path: status_path(base_path, entry_path),
last_modified: meta
.last_modified()
.map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
Expand Down Expand Up @@ -280,6 +280,14 @@ impl FileIO {
}
}

fn status_path(base_path: &str, entry_path: &str) -> String {
if base_path.ends_with('/') || entry_path.starts_with('/') {
format!("{base_path}{entry_path}")
} else {
format!("{base_path}/{entry_path}")
}
}

fn looks_like_windows_drive_path(path: &str) -> bool {
let bytes = path.as_bytes();
bytes.len() >= 3
Expand Down Expand Up @@ -708,6 +716,150 @@ mod file_action_test {
}
}

#[cfg(all(
test,
any(
feature = "storage-cos",
feature = "storage-obs",
feature = "storage-gcs",
feature = "storage-azdls"
)
))]
mod object_storage_path_test {
use super::*;

fn assert_relative_paths(file_io: &FileIO, path: &str, expected_relative_path: &str) {
let input = file_io.new_input(path).unwrap();
assert_eq!(input.location(), path);
assert_eq!(
&input.path[input.relative_path_pos..],
expected_relative_path
);

let output = file_io.new_output(path).unwrap();
assert_eq!(output.location(), path);
assert_eq!(
&output.path[output.relative_path_pos..],
expected_relative_path
);

let (_op, relative_path) = file_io.storage.create(path).unwrap();
assert_eq!(relative_path, expected_relative_path);

let base_path = &path[..path.len() - relative_path.len()];
assert_eq!(format!("{base_path}{relative_path}"), path);
}

#[cfg(feature = "storage-azdls")]
#[test]
fn test_azdls_root_status_path_without_trailing_slash() {
assert_eq!(
status_path(
"abfs://filesystem@account.dfs.core.windows.net",
"warehouse/"
),
"abfs://filesystem@account.dfs.core.windows.net/warehouse/"
);
assert_eq!(
status_path(
"abfs://filesystem@account.dfs.core.windows.net/",
"warehouse/"
),
"abfs://filesystem@account.dfs.core.windows.net/warehouse/"
);
}

#[cfg(feature = "storage-cos")]
#[test]
fn test_cos_file_io_relative_paths_and_scheme_aliases() {
for scheme in ["cosn", "cos"] {
let path = format!("{scheme}://bucket/warehouse/table/data.parquet");
let dir_path = format!("{scheme}://bucket/warehouse/table/");
let file_io = FileIO::from_path(&path)
.unwrap()
.with_props([
("fs.cosn.endpoint", "https://cos.ap-shanghai.myqcloud.com"),
("fs.cosn.userinfo.secretId", "secret-id"),
("fs.cosn.userinfo.secretKey", "secret-key"),
("fs.cosn.disable-config-load", "true"),
])
.build()
.unwrap();

assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet");
assert_relative_paths(&file_io, &dir_path, "warehouse/table/");
}
}

#[cfg(feature = "storage-obs")]
#[test]
fn test_obs_file_io_relative_paths() {
let file_io = FileIO::from_path("obs://bucket/warehouse")
.unwrap()
.with_props([
(
"fs.obs.endpoint",
"https://obs.cn-north-4.myhuaweicloud.com",
),
("fs.obs.access.key", "access-key"),
("fs.obs.secret.key", "secret-key"),
])
.build()
.unwrap();

assert_relative_paths(
&file_io,
"obs://bucket/warehouse/table/data.parquet",
"warehouse/table/data.parquet",
);
assert_relative_paths(
&file_io,
"obs://bucket/warehouse/table/",
"warehouse/table/",
);
}

#[cfg(feature = "storage-gcs")]
#[test]
fn test_gcs_file_io_relative_paths_and_scheme_aliases() {
for scheme in ["gs", "gcs"] {
let path = format!("{scheme}://bucket/warehouse/table/data.parquet");
let dir_path = format!("{scheme}://bucket/warehouse/table/");
let file_io = FileIO::from_path(&path)
.unwrap()
.with_props([
("gcs.allow-anonymous", "true"),
("gcs.disable-config-load", "true"),
("gcs.disable-vm-metadata", "true"),
])
.build()
.unwrap();

assert_relative_paths(&file_io, &path, "warehouse/table/data.parquet");
assert_relative_paths(&file_io, &dir_path, "warehouse/table/");
}
}

#[cfg(feature = "storage-azdls")]
#[test]
fn test_azdls_file_io_relative_paths_and_scheme_aliases() {
for scheme in ["abfs", "abfss"] {
let path = format!(
"{scheme}://filesystem@account.dfs.core.windows.net/warehouse/data.parquet"
);
let dir_path = format!("{scheme}://filesystem@account.dfs.core.windows.net/warehouse/");
let file_io = FileIO::from_path(&path)
.unwrap()
.with_prop("azure.account-key", "account-key")
.build()
.unwrap();

assert_relative_paths(&file_io, &path, "warehouse/data.parquet");
assert_relative_paths(&file_io, &dir_path, "warehouse/");
}
}
}

#[cfg(test)]
mod input_output_test {
use super::*;
Expand Down
29 changes: 29 additions & 0 deletions crates/paimon/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ pub use file_io::*;
mod storage;
pub use storage::*;

#[cfg(any(
feature = "storage-s3",
feature = "storage-cos",
feature = "storage-azdls",
feature = "storage-obs",
feature = "storage-gcs"
))]
mod storage_config;

#[cfg(feature = "storage-fs")]
mod storage_fs;
#[cfg(feature = "storage-fs")]
Expand All @@ -41,6 +50,26 @@ mod storage_s3;
#[cfg(feature = "storage-s3")]
use storage_s3::*;

#[cfg(feature = "storage-cos")]
mod storage_cos;
#[cfg(feature = "storage-cos")]
use storage_cos::*;

#[cfg(feature = "storage-azdls")]
mod storage_azdls;
#[cfg(feature = "storage-azdls")]
use storage_azdls::*;

#[cfg(feature = "storage-obs")]
mod storage_obs;
#[cfg(feature = "storage-obs")]
use storage_obs::*;

#[cfg(feature = "storage-gcs")]
mod storage_gcs;
#[cfg(feature = "storage-gcs")]
use storage_gcs::*;

#[cfg(feature = "storage-hdfs")]
mod storage_hdfs;
#[cfg(feature = "storage-hdfs")]
Expand Down
Loading
Loading