1818#include < cstring>
1919#include < memory>
2020#include < optional>
21+ #include < sstream>
2122#include < string>
2223#include < string_view>
24+ #include < vector>
2325
2426#include " common/cast_set.h"
2527#include " common/status.h"
3436#include " core/data_type/file_schema_descriptor.h"
3537#include " exprs/function/function.h"
3638#include " exprs/function/simple_function_factory.h"
39+ #include " io/fs/obj_storage_client.h"
3740#include " util/jsonb_writer.h"
41+ #include " util/s3_uri.h"
42+ #include " util/s3_util.h"
3843
3944namespace doris {
4045
@@ -84,8 +89,47 @@ class FunctionToFile : public IFunction {
8489 std::string content_type =
8590 S::extension_to_content_type (S::extract_file_extension (file_name));
8691
92+ // Ensure endpoint has http:// prefix for S3 SDK.
93+ std::string normalized_endpoint = _normalize_endpoint (endpoint);
94+ std::string region = _infer_region (normalized_endpoint);
95+
96+ // Validate the object exists via HEAD request and get actual size.
97+ S3ClientConf s3_conf;
98+ s3_conf.endpoint = normalized_endpoint;
99+ s3_conf.region = region;
100+ s3_conf.ak = ak;
101+ s3_conf.sk = sk;
102+ auto s3_client = S3ClientFactory::instance ().create (s3_conf);
103+ if (!s3_client) {
104+ return Status::InternalError (
105+ " to_file: failed to create S3 client for endpoint '{}'" , endpoint);
106+ }
107+ // Normalize oss:// etc. to s3:// for S3URI parser and storage.
108+ std::string normalized_uri = _normalize_uri_scheme (uri);
109+ S3URI s3_uri (normalized_uri);
110+ RETURN_IF_ERROR (s3_uri.parse ());
111+ auto head_resp = s3_client->head_object (
112+ {.bucket = s3_uri.get_bucket (), .key = s3_uri.get_key ()});
113+ if (head_resp.resp .status .code != 0 ) {
114+ return Status::InvalidArgument (" to_file: object '{}' is not accessible: {}" , uri,
115+ head_resp.resp .status .msg );
116+ }
117+ int64_t file_size = head_resp.file_size ;
118+
87119 writer.reset ();
88- _write_file_jsonb (writer, schema, uri, file_name, content_type, endpoint, ak, sk);
120+ FileMetadata metadata {
121+ .uri = normalized_uri,
122+ .file_name = file_name,
123+ .content_type = content_type,
124+ .size = file_size,
125+ .region = region,
126+ .endpoint = normalized_endpoint,
127+ .ak = ak,
128+ .sk = sk,
129+ .role_arn = {},
130+ .external_id = {},
131+ };
132+ S::write_file_jsonb (writer, metadata);
89133 jsonb_col.insert_data (writer.getOutput ()->getBuffer (), writer.getOutput ()->getSize ());
90134 }
91135 block.replace_by_position (result, std::move (result_col));
@@ -102,40 +146,55 @@ class FunctionToFile : public IFunction {
102146 return &assert_cast<const ColumnString&>(*holder);
103147 }
104148
105- static void _write_file_jsonb (JsonbWriter& writer, const FileSchemaDescriptor& schema,
106- const std::string& uri, const std::string& file_name,
107- const std::string& content_type,
108- const std::string& endpoint,
109- const std::string& ak, const std::string& sk) {
110- using S = FileSchemaDescriptor;
111- auto write_nullable_str = [&](S::Field field, const std::string& s) {
112- S::write_jsonb_key (writer, schema.field_name (field));
113- if (s.empty ()) {
114- writer.writeNull ();
115- } else {
116- S::write_jsonb_string (writer, s);
117- }
118- };
119-
120- writer.writeStartObject ();
121- S::write_jsonb_key (writer, schema.field_name (S::Field::URI));
122- S::write_jsonb_string (writer, uri);
123- S::write_jsonb_key (writer, schema.field_name (S::Field::FILE_NAME));
124- S::write_jsonb_string (writer, file_name);
125- S::write_jsonb_key (writer, schema.field_name (S::Field::CONTENT_TYPE));
126- S::write_jsonb_string (writer, content_type);
127- S::write_jsonb_key (writer, schema.field_name (S::Field::SIZE));
128- writer.writeInt64 (-1 );
129- write_nullable_str (S::Field::REGION, " " );
130- write_nullable_str (S::Field::ENDPOINT, endpoint);
131- write_nullable_str (S::Field::AK, ak);
132- write_nullable_str (S::Field::SK, sk);
133- // role_arn and external_id not used in to_file()
134- S::write_jsonb_key (writer, schema.field_name (S::Field::ROLE_ARN));
135- writer.writeNull ();
136- S::write_jsonb_key (writer, schema.field_name (S::Field::EXTERNAL_ID));
137- writer.writeNull ();
138- writer.writeEndObject ();
// Returns `endpoint` with a URL scheme guaranteed to be present.
//
// - An endpoint that already starts with "http://" or "https://" is returned
//   unchanged. The scheme is compared ASCII case-insensitively, so
//   "HTTPS://host" is no longer turned into "http://HTTPS://host".
// - An empty endpoint is returned unchanged (still empty) instead of being
//   turned into the meaningless "http://".
// - Any other value gets "http://" prepended.
static std::string _normalize_endpoint(const std::string& endpoint) {
    // Allocation-free, locale-independent "starts with, ignoring ASCII case".
    // `scheme` must be given in lowercase.
    const auto has_scheme = [](std::string_view text, std::string_view scheme) {
        if (text.size() < scheme.size()) {
            return false;
        }
        for (size_t i = 0; i < scheme.size(); ++i) {
            const char c = text[i];
            const char lowered =
                    (c >= 'A' && c <= 'Z') ? static_cast<char>(c - 'A' + 'a') : c;
            if (lowered != scheme[i]) {
                return false;
            }
        }
        return true;
    };

    if (endpoint.empty() || has_scheme(endpoint, "http://") ||
        has_scheme(endpoint, "https://")) {
        return endpoint;
    }
    return "http://" + endpoint;
}
156+
// Rewrites the scheme of an S3-compatible object URI to "s3://".
//
// Recognized alias schemes are "oss://", "cos://" and "obs://"; they are
// matched ASCII case-insensitively because URI schemes are case-insensitive.
// Everything after the scheme is copied verbatim. A URI with any other scheme
// (or with no scheme at all) is returned unchanged.
static std::string _normalize_uri_scheme(const std::string& uri) {
    static constexpr std::string_view kS3Scheme = "s3://";
    // Lowercase alias schemes that are rewritten to kS3Scheme.
    static constexpr std::string_view kAliasSchemes[] = {"oss://", "cos://", "obs://"};

    // Allocation-free, locale-independent "starts with, ignoring ASCII case".
    const auto has_scheme = [](std::string_view text, std::string_view scheme) {
        if (text.size() < scheme.size()) {
            return false;
        }
        for (size_t i = 0; i < scheme.size(); ++i) {
            const char c = text[i];
            const char lowered =
                    (c >= 'A' && c <= 'Z') ? static_cast<char>(c - 'A' + 'a') : c;
            if (lowered != scheme[i]) {
                return false;
            }
        }
        return true;
    };

    const std::string_view view = uri;
    for (const std::string_view alias : kAliasSchemes) {
        if (!has_scheme(view, alias)) {
            continue;
        }
        std::string normalized;
        normalized.reserve(kS3Scheme.size() + view.size() - alias.size());
        normalized.append(kS3Scheme);
        normalized.append(view.substr(alias.size()));
        return normalized;
    }
    return uri;
}
170+
// Best-effort inference of the storage region from an endpoint.
//
// The endpoint may carry a scheme ("http://", "https://", ...), a port and a
// path; only the host name is inspected. Examples:
//   oss-cn-shanghai.aliyuncs.com           -> cn-shanghai
//   oss-cn-shanghai-internal.aliyuncs.com  -> cn-shanghai
//   s3.us-east-1.amazonaws.com             -> us-east-1
//   cos.ap-guangzhou.myqcloud.com          -> ap-guangzhou
//   s3.amazonaws.com                       -> us-east-1 (default)
//   127.0.0.1:9000 / localhost:9000        -> us-east-1 (default)
//
// Rules, in order:
//   1. A host without any '.' yields the default region.
//   2. If the first label starts with "oss-", the region is that label
//      without the "oss-" prefix and without a trailing "-internal".
//   3. A dotted IPv4 literal yields the default region.
//   4. Otherwise the region is the second label, except that the label
//      "amazonaws" (host "s3.amazonaws.com") yields the default region.
//   5. An empty result falls back to the default region.
static std::string _infer_region(const std::string& endpoint) {
    static constexpr std::string_view kDefaultRegion = "us-east-1";
    static constexpr std::string_view kSchemeDelim = "://";
    static constexpr std::string_view kOssPrefix = "oss-";
    static constexpr std::string_view kOssInternalSuffix = "-internal";
    static constexpr std::string_view kAwsDomainLabel = "amazonaws";

    std::string_view host = endpoint;

    // Strip the scheme, whatever its spelling or letter case.
    if (const size_t scheme_end = host.find(kSchemeDelim); scheme_end != std::string_view::npos) {
        host.remove_prefix(scheme_end + kSchemeDelim.size());
    }
    // Strip port and path so they can never leak into the region.
    host = host.substr(0, host.find_first_of(":/"));

    const size_t first_dot = host.find('.');
    if (first_dot == std::string_view::npos) {
        return std::string(kDefaultRegion);
    }
    const std::string_view first_label = host.substr(0, first_dot);
    const std::string_view remainder = host.substr(first_dot + 1);
    const std::string_view second_label = remainder.substr(0, remainder.find('.'));

    std::string_view region;
    if (first_label.substr(0, kOssPrefix.size()) == kOssPrefix) {
        // OSS: the region is embedded in the first label.
        region = first_label.substr(kOssPrefix.size());
        // Intranet endpoints append "-internal", which is not part of the region.
        if (region.size() >= kOssInternalSuffix.size() &&
            region.substr(region.size() - kOssInternalSuffix.size()) == kOssInternalSuffix) {
            region.remove_suffix(kOssInternalSuffix.size());
        }
    } else if (host.find_first_not_of("0123456789.") == std::string_view::npos) {
        // IPv4 literal (e.g. a local MinIO): no region is encoded in the host.
        region = kDefaultRegion;
    } else if (second_label == kAwsDomainLabel) {
        // "s3.amazonaws.com" has no region label; "amazonaws" is not a region.
        region = kDefaultRegion;
    } else {
        // "<service>.<region>.<domain>" layout: the region is the second label.
        region = second_label;
    }

    if (region.empty()) {
        region = kDefaultRegion;
    }
    return std::string(region);
}
140199};
141200
0 commit comments