2020#include < optional>
2121#include < string>
2222#include < string_view>
23+ #include < vector>
2324
2425#include " common/cast_set.h"
2526#include " common/status.h"
3435#include " core/data_type/file_schema_descriptor.h"
3536#include " exprs/function/function.h"
3637#include " exprs/function/simple_function_factory.h"
38+ #include " io/fs/obj_storage_client.h"
3739#include " util/jsonb_writer.h"
40+ #include " util/s3_uri.h"
41+ #include " util/s3_util.h"
3842
3943namespace doris {
4044
@@ -48,25 +52,27 @@ class FunctionToFile : public IFunction {
4852
4953 bool is_variadic () const override { return false ; }
5054
51- size_t get_number_of_arguments () const override { return 4 ; }
55+ size_t get_number_of_arguments () const override { return 5 ; }
5256
5357 DataTypePtr get_return_type_impl (const DataTypes& arguments) const override {
5458 return std::make_shared<DataTypeFile>();
5559 }
5660
5761 Status execute_impl (FunctionContext* context, Block& block, const ColumnNumbers& arguments,
5862 uint32_t result, size_t input_rows_count) const override {
59- DCHECK_EQ (arguments.size (), 4 );
63+ DCHECK_EQ (arguments.size (), 5 );
6064
61- ColumnPtr uri_holder, endpoint_holder, ak_holder, sk_holder;
65+ ColumnPtr uri_holder, region_holder, endpoint_holder, ak_holder, sk_holder;
6266 const ColumnString* uri_col =
6367 _unwrap_string_column (block.get_by_position (arguments[0 ]), uri_holder);
68+ const ColumnString* region_col =
69+ _unwrap_string_column (block.get_by_position (arguments[1 ]), region_holder);
6470 const ColumnString* endpoint_col =
65- _unwrap_string_column (block.get_by_position (arguments[1 ]), endpoint_holder);
71+ _unwrap_string_column (block.get_by_position (arguments[2 ]), endpoint_holder);
6672 const ColumnString* ak_col =
67- _unwrap_string_column (block.get_by_position (arguments[2 ]), ak_holder);
73+ _unwrap_string_column (block.get_by_position (arguments[3 ]), ak_holder);
6874 const ColumnString* sk_col =
69- _unwrap_string_column (block.get_by_position (arguments[3 ]), sk_holder);
75+ _unwrap_string_column (block.get_by_position (arguments[4 ]), sk_holder);
7076
7177 using S = FileSchemaDescriptor;
7278 const auto & schema = S::instance ();
@@ -77,15 +83,54 @@ class FunctionToFile : public IFunction {
7783
7884 for (size_t row = 0 ; row < input_rows_count; ++row) {
7985 std::string uri = uri_col->get_data_at (row).to_string ();
86+ std::string region = region_col->get_data_at (row).to_string ();
8087 std::string endpoint = endpoint_col->get_data_at (row).to_string ();
8188 std::string ak = ak_col->get_data_at (row).to_string ();
8289 std::string sk = sk_col->get_data_at (row).to_string ();
8390 std::string file_name = S::extract_file_name (uri);
8491 std::string content_type =
8592 S::extension_to_content_type (S::extract_file_extension (file_name));
8693
94+ // Ensure endpoint has http:// prefix for S3 SDK.
95+ std::string normalized_endpoint = _normalize_endpoint (endpoint);
96+
97+ // Validate the object exists via HEAD request and get actual size.
98+ S3ClientConf s3_conf;
99+ s3_conf.endpoint = normalized_endpoint;
100+ s3_conf.region = region;
101+ s3_conf.ak = ak;
102+ s3_conf.sk = sk;
103+ auto s3_client = S3ClientFactory::instance ().create (s3_conf);
104+ if (!s3_client) {
105+ return Status::InternalError (
106+ " to_file: failed to create S3 client for endpoint '{}'" , endpoint);
107+ }
108+ // Normalize oss:// etc. to s3:// for S3URI parser and storage.
109+ std::string normalized_uri = _normalize_uri_scheme (uri);
110+ S3URI s3_uri (normalized_uri);
111+ RETURN_IF_ERROR (s3_uri.parse ());
112+ auto head_resp = s3_client->head_object (
113+ {.bucket = s3_uri.get_bucket (), .key = s3_uri.get_key ()});
114+ if (head_resp.resp .status .code != 0 ) {
115+ return Status::InvalidArgument (" to_file: object '{}' is not accessible: {}" , uri,
116+ head_resp.resp .status .msg );
117+ }
118+ int64_t file_size = head_resp.file_size ;
119+
87120 writer.reset ();
88- _write_file_jsonb (writer, schema, uri, file_name, content_type, endpoint, ak, sk);
121+ FileMetadata metadata {
122+ .uri = normalized_uri,
123+ .file_name = file_name,
124+ .content_type = content_type,
125+ .size = file_size,
126+ .region = region,
127+ .endpoint = normalized_endpoint,
128+ .ak = ak,
129+ .sk = sk,
130+ .role_arn = {},
131+ .external_id = {},
132+ };
133+ S::write_file_jsonb (writer, metadata);
89134 jsonb_col.insert_data (writer.getOutput ()->getBuffer (), writer.getOutput ()->getSize ());
90135 }
91136 block.replace_by_position (result, std::move (result_col));
@@ -102,40 +147,26 @@ class FunctionToFile : public IFunction {
102147 return &assert_cast<const ColumnString&>(*holder);
103148 }
104149
105- static void _write_file_jsonb (JsonbWriter& writer, const FileSchemaDescriptor& schema,
106- const std::string& uri, const std::string& file_name,
107- const std::string& content_type,
108- const std::string& endpoint,
109- const std::string& ak, const std::string& sk) {
110- using S = FileSchemaDescriptor;
111- auto write_nullable_str = [&](S::Field field, const std::string& s) {
112- S::write_jsonb_key (writer, schema.field_name (field));
113- if (s.empty ()) {
114- writer.writeNull ();
115- } else {
116- S::write_jsonb_string (writer, s);
117- }
118- };
119-
120- writer.writeStartObject ();
121- S::write_jsonb_key (writer, schema.field_name (S::Field::URI));
122- S::write_jsonb_string (writer, uri);
123- S::write_jsonb_key (writer, schema.field_name (S::Field::FILE_NAME));
124- S::write_jsonb_string (writer, file_name);
125- S::write_jsonb_key (writer, schema.field_name (S::Field::CONTENT_TYPE));
126- S::write_jsonb_string (writer, content_type);
127- S::write_jsonb_key (writer, schema.field_name (S::Field::SIZE));
128- writer.writeInt64 (-1 );
129- write_nullable_str (S::Field::REGION, " " );
130- write_nullable_str (S::Field::ENDPOINT, endpoint);
131- write_nullable_str (S::Field::AK, ak);
132- write_nullable_str (S::Field::SK, sk);
133- // role_arn and external_id not used in to_file()
134- S::write_jsonb_key (writer, schema.field_name (S::Field::ROLE_ARN));
135- writer.writeNull ();
136- S::write_jsonb_key (writer, schema.field_name (S::Field::EXTERNAL_ID));
137- writer.writeNull ();
138- writer.writeEndObject ();
150+ // Ensure endpoint has http:// scheme prefix.
151+ static std::string _normalize_endpoint (const std::string& endpoint) {
152+ if (endpoint.substr (0 , 7 ) == " http://" || endpoint.substr (0 , 8 ) == " https://" ) {
153+ return endpoint;
154+ }
155+ return " http://" + endpoint;
156+ }
157+
158+ // Normalize oss:// etc. to s3:// for S3URI parser and storage.
159+ static std::string _normalize_uri_scheme (const std::string& uri) {
160+ if (uri.substr (0 , 6 ) == " oss://" ) {
161+ return " s3://" + uri.substr (6 );
162+ }
163+ if (uri.substr (0 , 6 ) == " cos://" ) {
164+ return " s3://" + uri.substr (6 );
165+ }
166+ if (uri.substr (0 , 6 ) == " obs://" ) {
167+ return " s3://" + uri.substr (6 );
168+ }
169+ return uri;
139170 }
140171};
141172
0 commit comments