diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 09347d6d7dc2c..1b40d93e750b4 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -421,14 +421,18 @@ async fn create_plan( if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { // To support custom formats, treat error as None let format = config_file_type_from_str(&cmd.file_type); - register_object_store_and_config_extensions( - ctx, - &cmd.location, - &cmd.options, - format, - resolve_region, - ) - .await?; + // A table may reference more than one location; register the object + // store for each of them. + for location in &cmd.locations { + register_object_store_and_config_extensions( + ctx, + location, + &cmd.options, + format.clone(), + resolve_region, + ) + .await?; + } } if let LogicalPlan::Copy(copy_to) = &mut plan { @@ -531,14 +535,16 @@ mod tests { if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan { let format = config_file_type_from_str(&cmd.file_type); - register_object_store_and_config_extensions( - &ctx, - &cmd.location, - &cmd.options, - format, - false, - ) - .await?; + for location in &cmd.locations { + register_object_store_and_config_extensions( + &ctx, + location, + &cmd.options, + format.clone(), + false, + ) + .await?; + } } else { return plan_err!("LogicalPlan is not a CreateExternalTable"); } diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs index 8501ea65902e2..c8060456dd2a7 100644 --- a/datafusion/catalog/src/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -53,7 +53,15 @@ impl TableProviderFactory for StreamTableFactory { cmd: &CreateExternalTable, ) -> Result> { let schema: SchemaRef = Arc::clone(cmd.schema.inner()); - let location = cmd.location.clone(); + let location = match cmd.locations.as_slice() { + [single] => single.clone(), + _ => { + return config_err!( + "Stream tables support exactly one location; \ + use a listing table to read multiple files" + ); + } + }; let encoding = cmd.file_type.parse()?; let header = if let Ok(opt) = cmd .options diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index a1eb7ffb64b7d..03cd22c689a2b 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -27,9 +27,11 @@ use crate::datasource::listing::{ }; use crate::execution::context::SessionState; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, SchemaRef}; use datafusion_common::{Result, config_datafusion_err}; -use datafusion_common::{ToDFSchema, arrow_datafusion_err, plan_err}; +use datafusion_common::{ + ToDFSchema, arrow_datafusion_err, internal_datafusion_err, plan_err, +}; use datafusion_expr::CreateExternalTable; use async_trait::async_trait; @@ -71,20 +73,58 @@ impl TableProviderFactory for ListingTableFactory { ))? .create(session_state, &cmd.options)?; - let mut table_path = - ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone()); - let file_extension = match table_path.is_collection() { - // Setting the extension to be empty instead of allowing the default extension seems - // odd, but was done to ensure existing behavior isn't modified. It seems like this - // could be refactored to either use the default extension or set the fully expected - // extension when compression is included (e.g. ".csv.gz") - true => "", - false => &get_extension(cmd.location.as_str()), + let table_paths = cmd + .locations + .iter() + .map(|location| { + Ok(ListingTableUrl::parse(location)?.with_table_ref(cmd.name.clone())) + }) + .collect::>>()?; + let Some(first_path) = table_paths.first() else { + return plan_err!("CREATE EXTERNAL TABLE requires at least one location"); + }; + + // `ListingTable` resolves a single object store (from the first location) + // and scans every location with it, so locations spanning different + // object stores would silently read the wrong data. Reading across + // object stores is intentionally not supported (see + // https://github.com/apache/datafusion/issues/16303); reject it here with + // a clear error rather than producing incorrect results at scan time. + let object_store_url = first_path.object_store(); + if let Some(other) = table_paths + .iter() + .find(|path| path.object_store() != object_store_url) + { + return plan_err!( + "All locations of a CREATE EXTERNAL TABLE must be on the same \ + object store, but found '{}' and '{}'", + object_store_url.as_str(), + other.object_store().as_str() + ); + } + + // With a single location the historical extension handling is kept. With + // more than one location the files may have different extensions, so the + // extension filter is left empty and the explicit paths/globs are used + // as provided. + let file_extension = if table_paths.len() == 1 { + match first_path.is_collection() { + // Setting the extension to be empty instead of allowing the default extension seems + // odd, but was done to ensure existing behavior isn't modified. It seems like this + // could be refactored to either use the default extension or set the fully expected + // extension when compression is included (e.g. ".csv.gz") + true => String::new(), + false => get_extension(&cmd.locations[0]), + } + } else { + String::new() }; let mut options = ListingOptions::new(file_format) .with_session_config_options(session_state.config()) .with_file_extension(file_extension); + // Partition columns are derived from the first location; all locations + // are expected to share the same partitioning. let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() { let infer_parts = session_state .config_options() @@ -92,7 +132,7 @@ impl TableProviderFactory for ListingTableFactory { .listing_table_factory_infer_partitions; let part_cols = if cmd.table_partition_cols.is_empty() && infer_parts { options - .infer_partitions(session_state, &table_path) + .infer_partitions(session_state, first_path) .await? .into_iter() } else { @@ -142,36 +182,75 @@ impl TableProviderFactory for ListingTableFactory { options = options.with_table_partition_cols(table_partition_cols); - options - .validate_partitions(session_state, &table_path) - .await?; + // Validate partitions against every location before any glob rewriting. + for table_path in &table_paths { + options + .validate_partitions(session_state, table_path) + .await?; + } - let resolved_schema = match provided_schema { + let (resolved_table_paths, resolved_schema) = match provided_schema { // We will need to check the table columns against the schema // this is done so that we can do an ORDER BY for external table creation // specifically for parquet file format. // See: https://github.com/apache/datafusion/issues/7317 None => { - // if the folder then rewrite a file path as 'path/*.parquet' - // to only read the files the reader can understand - if table_path.is_folder() && table_path.get_glob().is_none() { - // Since there are no files yet to infer an actual extension, - // derive the pattern based on compression type. - // So for gzipped CSV the pattern is `*.csv.gz` - let glob = match options.format.compression_type() { - Some(compression) => { - match options.format.get_ext_with_compression(&compression) { - // Use glob based on `FileFormat` extension - Ok(ext) => format!("*.{ext}"), - // Fallback to `file_type`, if not supported by `FileFormat` - Err(_) => format!("*.{}", cmd.file_type.to_lowercase()), + let mut resolved_paths = Vec::with_capacity(table_paths.len()); + let mut inferred_schema: Option<(String, SchemaRef)> = None; + for mut table_path in table_paths { + // if the folder then rewrite a file path as 'path/*.parquet' + // to only read the files the reader can understand + if table_path.is_folder() && table_path.get_glob().is_none() { + // Since there are no files yet to infer an actual extension, + // derive the pattern based on compression type. + // So for gzipped CSV the pattern is `*.csv.gz` + let glob = match options.format.compression_type() { + Some(compression) => { + match options + .format + .get_ext_with_compression(&compression) + { + // Use glob based on `FileFormat` extension + Ok(ext) => format!("*.{ext}"), + // Fallback to `file_type`, if not supported by `FileFormat` + Err(_) => { + format!("*.{}", cmd.file_type.to_lowercase()) + } + } } + None => format!("*.{}", cmd.file_type.to_lowercase()), + }; + table_path = table_path.with_glob(glob.as_ref())?; + } + let schema = options.infer_schema(session_state, &table_path).await?; + // All locations must resolve to the same fields. Schema + // and field metadata may differ between files without + // changing the fields read by the table. + let location = table_path.to_string(); + match &inferred_schema { + None => inferred_schema = Some((location, schema)), + Some((existing_location, existing)) + if !schemas_have_same_fields(existing, &schema) => + { + return plan_err!( + "All locations of a CREATE EXTERNAL TABLE must have the \ + same schema, but schema inferred from '{}' differs from \ + schema inferred from '{}'", + location, + existing_location + ); } - None => format!("*.{}", cmd.file_type.to_lowercase()), - }; - table_path = table_path.with_glob(glob.as_ref())?; + Some(_) => {} + } + resolved_paths.push(table_path); } - let schema = options.infer_schema(session_state, &table_path).await?; + // `table_paths` was guaranteed non-empty above, so the loop ran + // at least once and `inferred_schema` is always `Some` here. + let (_, schema) = inferred_schema.ok_or_else(|| { + internal_datafusion_err!( + "no schema could be inferred from the provided locations" + ) + })?; let df_schema = Arc::clone(&schema).to_dfschema()?; let column_refs: HashSet<_> = cmd .order_exprs @@ -186,11 +265,11 @@ impl TableProviderFactory for ListingTableFactory { } } - schema + (resolved_paths, schema) } - Some(s) => s, + Some(s) => (table_paths, s), }; - let config = ListingTableConfig::new(table_path) + let config = ListingTableConfig::new_with_multi_paths(resolved_table_paths) .with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone())) .with_schema(resolved_schema); let provider = ListingTable::try_new(config)? @@ -222,6 +301,19 @@ fn get_extension(path: &str) -> String { } } +fn schemas_have_same_fields(left: &SchemaRef, right: &SchemaRef) -> bool { + left.fields().len() == right.fields().len() + && left + .fields() + .iter() + .zip(right.fields()) + .all(|(left, right)| { + left.name() == right.name() + && left.data_type() == right.data_type() + && left.is_nullable() == right.is_nullable() + }) +} + #[cfg(test)] mod tests { use super::*; @@ -229,6 +321,7 @@ mod tests { datasource::file_format::csv::CsvFormat, execution::context::SessionContext, test_util::parquet_test_data, }; + use arrow::datatypes::{Field, Schema}; use datafusion_execution::cache::CacheAccessor; use datafusion_execution::cache::cache_manager::CacheManagerConfig; use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; @@ -238,12 +331,48 @@ mod tests { use std::collections::HashMap; use std::fs; use std::fs::File; - use std::path::PathBuf; + use std::path::{Path, PathBuf}; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{DFSchema, TableReference}; use datafusion_expr::registry::ExtensionTypeRegistryRef; + fn factory_and_state() -> (ListingTableFactory, SessionState) { + let factory = ListingTableFactory::new(); + let context = SessionContext::new(); + let state = context.state(); + (factory, state) + } + + fn write_csv(path: &Path, contents: &str) { + fs::write(path, contents).unwrap(); + } + + fn csv_cmd_with_locations(paths: &[&Path]) -> CreateExternalTable { + let locations = paths + .iter() + .map(|path| path.to_str().unwrap().to_string()) + .collect::>(); + + CreateExternalTable::builder( + TableReference::bare("foo"), + locations[0].clone(), + "csv", + Arc::new(DFSchema::empty()), + ) + .with_locations(locations) + .with_options(HashMap::from([("format.has_header".into(), "true".into())])) + .build() + } + + fn assert_error_contains(error: impl std::fmt::Display, expected: &str) { + let error = error.to_string(); + assert!( + error.contains(expected), + "expected error to contain '{expected}', got: {error}" + ); + } + #[tokio::test] async fn test_create_using_non_std_file_ext() { let csv_file = tempfile::Builder::new() @@ -474,6 +603,115 @@ mod tests { assert!(listing_options.table_partition_cols.is_empty()); } + #[tokio::test] + async fn test_create_with_multiple_locations() { + let dir = tempfile::tempdir().unwrap(); + let file_a = dir.path().join("file_a.csv"); + let file_b = dir.path().join("file_b.csv"); + write_csv(&file_a, "c1,c2\n1,a\n2,b\n"); + write_csv(&file_b, "c1,c2\n3,c\n"); + + let (factory, state) = factory_and_state(); + let cmd = csv_cmd_with_locations(&[&file_a, &file_b]); + + let table_provider = factory.create(&state, &cmd).await.unwrap(); + let listing_table = table_provider.downcast_ref::().unwrap(); + + // Both locations are registered as table paths + assert_eq!(2, listing_table.table_paths().len()); + + // Schema is inferred from the files and shared across both locations + let field_names: Vec<_> = listing_table + .schema() + .fields() + .iter() + .map(|f| f.name().clone()) + .collect(); + assert_eq!(field_names, vec!["c1".to_string(), "c2".to_string()]); + } + + #[tokio::test] + async fn test_create_with_multiple_locations_mismatched_schema_errors() { + let dir = tempfile::tempdir().unwrap(); + let file_a = dir.path().join("file_a.csv"); + let file_b = dir.path().join("file_b.csv"); + write_csv(&file_a, "c1,c2\n1,a\n"); + // Different column names -> different inferred schema + write_csv(&file_b, "x1,x2\n1,a\n"); + + let (factory, state) = factory_and_state(); + let cmd = csv_cmd_with_locations(&[&file_a, &file_b]); + let err = factory.create(&state, &cmd).await.unwrap_err(); + assert_error_contains(err, "same schema"); + } + + #[test] + fn test_schema_comparison_ignores_schema_metadata() { + let fields = + vec![ + Field::new("c1", DataType::Int32, true).with_metadata(HashMap::from([( + "field_source".to_string(), + "a".to_string(), + )])), + ]; + let schema_a = Arc::new(Schema::new_with_metadata( + fields.clone(), + HashMap::from([("source".to_string(), "a".to_string())]), + )); + let schema_b = + Arc::new(Schema::new_with_metadata( + vec![Field::new("c1", DataType::Int32, true).with_metadata( + HashMap::from([("field_source".to_string(), "b".to_string())]), + )], + HashMap::from([("source".to_string(), "b".to_string())]), + )); + let schema_c = + Arc::new(Schema::new(vec![Field::new("c2", DataType::Int32, true)])); + + assert_ne!(schema_a, schema_b); + assert!(schemas_have_same_fields(&schema_a, &schema_b)); + assert!(!schemas_have_same_fields(&schema_a, &schema_c)); + } + + #[tokio::test] + async fn test_create_with_no_locations_errors() { + let (factory, state) = factory_and_state(); + + let cmd = CreateExternalTable::builder( + TableReference::bare("foo"), + "unused", + "csv", + Arc::new(DFSchema::empty()), + ) + .with_locations(vec![]) + .build(); + + let err = factory.create(&state, &cmd).await.unwrap_err(); + assert_error_contains(err, "at least one location"); + } + + #[tokio::test] + async fn test_create_with_locations_on_different_stores_errors() { + let (factory, state) = factory_and_state(); + + // Two locations on different object stores (different buckets) are not + // supported: ListingTable would scan both against the first store. + let cmd = CreateExternalTable::builder( + TableReference::bare("foo"), + "s3://bucket_a/file.parquet", + "parquet", + Arc::new(DFSchema::empty()), + ) + .with_locations(vec![ + "s3://bucket_a/file.parquet".to_string(), + "s3://bucket_b/file.parquet".to_string(), + ]) + .build(); + + let err = factory.create(&state, &cmd).await.unwrap_err(); + assert_error_contains(err, "same object store"); + } + #[tokio::test] async fn test_statistics_cache_prewarming() { let factory = ListingTableFactory::new(); diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index aad659eacbe55..d70c0d186d007 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -45,7 +45,7 @@ use crate::execution::{SendableRecordBatchStream, SessionState, SessionStateBuil use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_catalog::Session; -use datafusion_common::{DFSchemaRef, TableReference}; +use datafusion_common::{DFSchemaRef, TableReference, plan_err}; use datafusion_expr::{ CreateExternalTable, Expr, LogicalPlan, SortExpr, TableType, UserDefinedLogicalNodeCore, @@ -187,8 +187,12 @@ impl TableProviderFactory for TestTableFactory { _: &dyn Session, cmd: &CreateExternalTable, ) -> Result> { + let Some(location) = cmd.locations.first() else { + return plan_err!("TestTableFactory requires at least one location"); + }; + Ok(Arc::new(TestTableProvider { - url: cmd.location.to_string(), + url: location.clone(), schema: Arc::clone(cmd.schema.inner()), })) } diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 5779fb0c4ea5b..bd2933c81acf6 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -211,8 +211,12 @@ pub struct CreateExternalTable { pub schema: DFSchemaRef, /// The table name pub name: TableReference, - /// The physical location - pub location: String, + /// The physical locations of the table files. + /// + /// More than one location may be supplied (for example + /// `CREATE EXTERNAL TABLE ... LOCATION ('a.parquet', 'b.parquet')`), in which + /// case the files are read together as a single table. + pub locations: Vec, /// The file type of physical file pub file_type: String, /// Partition Columns @@ -266,7 +270,7 @@ impl CreateExternalTable { ) -> CreateExternalTableBuilder { CreateExternalTableBuilder { name: name.into(), - location: location.into(), + locations: vec![location.into()], file_type: file_type.into(), schema, table_partition_cols: vec![], @@ -289,7 +293,7 @@ impl CreateExternalTable { #[derive(Debug, Clone)] pub struct CreateExternalTableBuilder { name: TableReference, - location: String, + locations: Vec, file_type: String, schema: DFSchemaRef, table_partition_cols: Vec, @@ -311,6 +315,16 @@ impl CreateExternalTableBuilder { self } + /// Set the physical locations of the table files, replacing the single + /// location supplied to [`CreateExternalTable::builder`]. + /// + /// When more than one location is provided the files are read together as + /// a single table. + pub fn with_locations(mut self, locations: Vec) -> Self { + self.locations = locations; + self + } + /// Set the if_not_exists flag pub fn with_if_not_exists(mut self, if_not_exists: bool) -> Self { self.if_not_exists = if_not_exists; @@ -373,7 +387,7 @@ impl CreateExternalTableBuilder { CreateExternalTable { schema: self.schema, name: self.name, - location: self.location, + locations: self.locations, file_type: self.file_type, table_partition_cols: self.table_partition_cols, if_not_exists: self.if_not_exists, @@ -394,7 +408,7 @@ impl Hash for CreateExternalTable { fn hash(&self, state: &mut H) { self.schema.hash(state); self.name.hash(state); - self.location.hash(state); + self.locations.hash(state); self.file_type.hash(state); self.table_partition_cols.hash(state); self.if_not_exists.hash(state); @@ -413,8 +427,8 @@ impl PartialOrd for CreateExternalTable { struct ComparableCreateExternalTable<'a> { /// The table name pub name: &'a TableReference, - /// The physical location - pub location: &'a String, + /// The physical locations + pub locations: &'a Vec, /// The file type of physical file pub file_type: &'a String, /// Partition Columns @@ -432,7 +446,7 @@ impl PartialOrd for CreateExternalTable { } let comparable_self = ComparableCreateExternalTable { name: &self.name, - location: &self.location, + locations: &self.locations, file_type: &self.file_type, table_partition_cols: &self.table_partition_cols, if_not_exists: &self.if_not_exists, @@ -443,7 +457,7 @@ impl PartialOrd for CreateExternalTable { }; let comparable_other = ComparableCreateExternalTable { name: &other.name, - location: &other.location, + locations: &other.locations, file_type: &other.file_type, table_partition_cols: &other.table_partition_cols, if_not_exists: &other.if_not_exists, diff --git a/datafusion/ffi/src/table_provider_factory.rs b/datafusion/ffi/src/table_provider_factory.rs index 3ce8841614bc0..74db74891c9f5 100644 --- a/datafusion/ffi/src/table_provider_factory.rs +++ b/datafusion/ffi/src/table_provider_factory.rs @@ -368,7 +368,7 @@ mod tests { let cmd = CreateExternalTable { schema: Schema::empty().to_dfschema_ref()?, name: TableReference::bare("test_table"), - location: "test".to_string(), + locations: vec!["test".to_string()], file_type: "test".to_string(), table_partition_cols: vec![], if_not_exists: false, @@ -406,7 +406,7 @@ mod tests { let cmd = CreateExternalTable { schema: Schema::empty().to_dfschema_ref()?, name: TableReference::bare("cloned_test"), - location: "test".to_string(), + locations: vec!["test".to_string()], file_type: "test".to_string(), table_partition_cols: vec![], if_not_exists: false, diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs index 6a6b6b3100cdb..f1edc447309a6 100644 --- a/datafusion/ffi/tests/ffi_integration.rs +++ b/datafusion/ffi/tests/ffi_integration.rs @@ -100,7 +100,7 @@ mod tests { let cmd = CreateExternalTable { schema: Schema::empty().to_dfschema_ref()?, name: TableReference::bare("cloned_test"), - location: "test".to_string(), + locations: vec!["test".to_string()], file_type: "test".to_string(), table_partition_cols: vec![], if_not_exists: false, diff --git a/datafusion/proto-models/proto/datafusion.proto b/datafusion/proto-models/proto/datafusion.proto index ea6d078366625..ac85861d2ae00 100644 --- a/datafusion/proto-models/proto/datafusion.proto +++ b/datafusion/proto-models/proto/datafusion.proto @@ -163,7 +163,8 @@ message EmptyRelationNode { message CreateExternalTableNode { reserved 1; // was string name TableReference name = 9; - string location = 2; + string location = 2; // deprecated; use repeated locations + repeated string locations = 16; string file_type = 3; datafusion_common.DfSchema schema = 4; repeated string table_partition_cols = 5; diff --git a/datafusion/proto-models/src/generated/pbjson.rs b/datafusion/proto-models/src/generated/pbjson.rs index 8e6997757f110..079cbe59b6629 100644 --- a/datafusion/proto-models/src/generated/pbjson.rs +++ b/datafusion/proto-models/src/generated/pbjson.rs @@ -3561,6 +3561,9 @@ impl serde::Serialize for CreateExternalTableNode { if !self.location.is_empty() { len += 1; } + if !self.locations.is_empty() { + len += 1; + } if !self.file_type.is_empty() { len += 1; } @@ -3604,6 +3607,9 @@ impl serde::Serialize for CreateExternalTableNode { if !self.location.is_empty() { struct_ser.serialize_field("location", &self.location)?; } + if !self.locations.is_empty() { + struct_ser.serialize_field("locations", &self.locations)?; + } if !self.file_type.is_empty() { struct_ser.serialize_field("fileType", &self.file_type)?; } @@ -3652,6 +3658,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { const FIELDS: &[&str] = &[ "name", "location", + "locations", "file_type", "fileType", "schema", @@ -3676,6 +3683,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { enum GeneratedField { Name, Location, + Locations, FileType, Schema, TablePartitionCols, @@ -3711,6 +3719,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { match value { "name" => Ok(GeneratedField::Name), "location" => Ok(GeneratedField::Location), + "locations" => Ok(GeneratedField::Locations), "fileType" | "file_type" => Ok(GeneratedField::FileType), "schema" => Ok(GeneratedField::Schema), "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols), @@ -3744,6 +3753,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { { let mut name__ = None; let mut location__ = None; + let mut locations__ = None; let mut file_type__ = None; let mut schema__ = None; let mut table_partition_cols__ = None; @@ -3770,6 +3780,12 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { } location__ = Some(map_.next_value()?); } + GeneratedField::Locations => { + if locations__.is_some() { + return Err(serde::de::Error::duplicate_field("locations")); + } + locations__ = Some(map_.next_value()?); + } GeneratedField::FileType => { if file_type__.is_some() { return Err(serde::de::Error::duplicate_field("fileType")); @@ -3851,6 +3867,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { Ok(CreateExternalTableNode { name: name__, location: location__.unwrap_or_default(), + locations: locations__.unwrap_or_default(), file_type: file_type__.unwrap_or_default(), schema: schema__, table_partition_cols: table_partition_cols__.unwrap_or_default(), diff --git a/datafusion/proto-models/src/generated/prost.rs b/datafusion/proto-models/src/generated/prost.rs index d8187e65a501e..d30033362fb66 100644 --- a/datafusion/proto-models/src/generated/prost.rs +++ b/datafusion/proto-models/src/generated/prost.rs @@ -243,8 +243,11 @@ pub struct EmptyRelationNode { pub struct CreateExternalTableNode { #[prost(message, optional, tag = "9")] pub name: ::core::option::Option, + /// deprecated; use repeated locations #[prost(string, tag = "2")] pub location: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "16")] + pub locations: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(string, tag = "3")] pub file_type: ::prost::alloc::string::String, #[prost(message, optional, tag = "4")] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 542cae890d693..469bf3eeb6979 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -719,16 +719,28 @@ impl AsLogicalPlan for LogicalPlanNode { column_defaults.insert(col_name.clone(), expr); } + let locations = if !create_extern_table.locations.is_empty() { + create_extern_table.locations.clone() + } else if !create_extern_table.location.is_empty() { + vec![create_extern_table.location.clone()] + } else { + return Err(proto_error( + "CreateExternalTableNode requires at least one location", + )); + }; + let location = locations[0].clone(); + Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( CreateExternalTable::builder( from_table_reference( create_extern_table.name.as_ref(), "CreateExternalTable", )?, - create_extern_table.location.clone(), + location, create_extern_table.file_type.clone(), pb_schema.try_into()?, ) + .with_locations(locations) .with_partition_cols(create_extern_table.table_partition_cols.clone()) .with_order_exprs(order_exprs) .with_if_not_exists(create_extern_table.if_not_exists) @@ -1674,7 +1686,7 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::Ddl(DdlStatement::CreateExternalTable( CreateExternalTable { name, - location, + locations, file_type, schema: df_schema, table_partition_cols, @@ -1710,7 +1722,8 @@ impl AsLogicalPlan for LogicalPlanNode { name: Some(protobuf::TableReference::from_proto( name.clone(), )), - location: location.clone(), + location: locations.first().cloned().unwrap_or_default(), + locations: locations.clone(), file_type: file_type.clone(), schema: Some(df_schema.try_into()?), table_partition_cols: table_partition_cols.clone(), diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 1bcc6eeb67f12..c14610563074a 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -71,8 +71,8 @@ use datafusion_common::config::TableOptions; use datafusion_common::format::ExplainFormat; use datafusion_common::scalar::ScalarStructBuilder; use datafusion_common::{ - DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference, - internal_datafusion_err, internal_err, not_impl_err, plan_err, + Constraints, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, + TableReference, internal_datafusion_err, internal_err, not_impl_err, plan_err, }; use datafusion_execution::TaskContext; use datafusion_expr::dml::CopyTo; @@ -107,7 +107,7 @@ use datafusion_proto::logical_plan::to_proto::serialize_expr; use datafusion_proto::logical_plan::{ DefaultLogicalExtensionCodec, LogicalExtensionCodec, from_proto, }; -use datafusion_proto::protobuf; +use datafusion_proto::{FromProto, protobuf}; use crate::cases::{MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf, MyRegexUdfNode}; @@ -328,6 +328,93 @@ async fn roundtrip_custom_listing_tables() -> Result<()> { Ok(()) } +#[tokio::test] +async fn roundtrip_create_external_table_multiple_locations() -> Result<()> { + let ctx = SessionContext::new(); + + // Planning a CREATE EXTERNAL TABLE does not read the referenced files, so + // the paths need not exist. Multiple locations must survive the round-trip + // through the `repeated locations` proto field. + let query = "CREATE EXTERNAL TABLE t (a INTEGER, b INTEGER) + STORED AS CSV + LOCATION ('file_a.csv', 'file_b.csv') + OPTIONS ('format.has_header' 'true')"; + + let plan = ctx.state().create_logical_plan(query).await?; + let bytes = logical_plan_to_bytes(&plan)?; + let protobuf_plan = protobuf::LogicalPlanNode::decode(bytes.as_ref()) + .expect("failed to decode CreateExternalTable proto"); + let Some(protobuf::logical_plan_node::LogicalPlanType::CreateExternalTable( + create_external_table, + )) = protobuf_plan.logical_plan_type + else { + panic!("expected a CreateExternalTable proto"); + }; + assert_eq!(create_external_table.location, "file_a.csv"); + assert_eq!( + create_external_table.locations, + vec!["file_a.csv".to_string(), "file_b.csv".to_string()] + ); + + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?; + assert_eq!(plan, logical_round_trip); + + let LogicalPlan::Ddl(datafusion_expr::DdlStatement::CreateExternalTable(rt)) = + logical_round_trip + else { + panic!("expected a CreateExternalTable plan"); + }; + assert_eq!( + rt.locations, + vec!["file_a.csv".to_string(), "file_b.csv".to_string()] + ); + + Ok(()) +} + +#[tokio::test] +async fn roundtrip_create_external_table_legacy_location() -> Result<()> { + let ctx = SessionContext::new(); + let schema = DFSchema::empty(); + let create_external_table = protobuf::CreateExternalTableNode { + name: Some(protobuf::TableReference::from_proto(TableReference::bare( + "t", + ))), + location: "legacy.csv".to_string(), + locations: vec![], + file_type: "CSV".to_string(), + schema: Some((&schema).try_into()?), + table_partition_cols: vec![], + if_not_exists: false, + or_replace: false, + temporary: false, + definition: String::new(), + order_exprs: vec![], + unbounded: false, + options: HashMap::new(), + constraints: Some(Constraints::default().into()), + column_defaults: HashMap::new(), + }; + let protobuf_plan = protobuf::LogicalPlanNode { + logical_plan_type: Some( + protobuf::logical_plan_node::LogicalPlanType::CreateExternalTable( + create_external_table, + ), + ), + }; + let bytes = protobuf_plan.encode_to_vec(); + + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?; + let LogicalPlan::Ddl(datafusion_expr::DdlStatement::CreateExternalTable(rt)) = + logical_round_trip + else { + panic!("expected a CreateExternalTable plan"); + }; + assert_eq!(rt.locations, vec!["legacy.csv".to_string()]); + + Ok(()) +} + #[tokio::test] async fn roundtrip_logical_plan_aggregation_with_pk() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index ba37a2d7026a3..3e2b776533d20 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -197,7 +197,7 @@ pub(crate) type LexOrdering = Vec; /// [ PARTITIONED BY ( | ) ] /// [ WITH ORDER () /// [ OPTIONS () ] -/// LOCATION +/// LOCATION | LOCATION ([, ...]) /// /// := ( , ...) /// @@ -215,8 +215,10 @@ pub struct CreateExternalTable { pub columns: Vec, /// File type (Parquet, NDJSON, CSV, etc) pub file_type: String, - /// Path to file + /// First path to file, retained for backwards compatibility. pub location: String, + /// Paths to files + pub locations: Vec, /// Partition Columns pub table_partition_cols: Vec, /// Ordered expressions @@ -255,7 +257,22 @@ impl fmt::Display for CreateExternalTable { } write!(f, ") ")?; } - write!(f, "LOCATION {}", self.location) + if self.locations.len() > 1 { + write!(f, "LOCATION (")?; + for (idx, location) in self.locations.iter().enumerate() { + if idx > 0 { + write!(f, ", ")?; + } + write!(f, "{}", Value::SingleQuotedString(location.clone()))?; + } + write!(f, ")") + } else { + write!( + f, + "LOCATION {}", + Value::SingleQuotedString(self.location.clone()) + ) + } } } @@ -1020,7 +1037,7 @@ impl<'a> DFParser<'a> { #[derive(Default)] struct Builder { file_type: Option, - location: Option, + locations: Option>, table_partition_cols: Option>, order_exprs: Vec, options: Option>, @@ -1044,8 +1061,8 @@ impl<'a> DFParser<'a> { builder.file_type = Some(self.parse_file_format()?); } Keyword::LOCATION => { - ensure_not_set(&builder.location, "LOCATION")?; - builder.location = Some(self.parser.parse_literal_string()?); + ensure_not_set(&builder.locations, "LOCATION")?; + builder.locations = Some(self.parse_locations()?); } Keyword::WITH => { if self.parser.parse_keyword(Keyword::ORDER) { @@ -1121,17 +1138,23 @@ impl<'a> DFParser<'a> { "Missing STORED AS clause in CREATE EXTERNAL TABLE statement".into(), )); } - if builder.location.is_none() { + if builder.locations.is_none() { return sql_err!(ParserError::ParserError( "Missing LOCATION clause in CREATE EXTERNAL TABLE statement".into(), )); } + let locations = builder.locations.unwrap(); + let Some(location) = locations.first().cloned() else { + return parser_err!("LOCATION requires at least one path"); + }; + let create = CreateExternalTable { name: table_name, columns, file_type: builder.file_type.unwrap(), - location: builder.location.unwrap(), + location, + locations, table_partition_cols: builder.table_partition_cols.unwrap_or(vec![]), order_exprs: builder.order_exprs, if_not_exists, @@ -1144,6 +1167,29 @@ impl<'a> DFParser<'a> { Ok(Statement::CreateExternalTable(create)) } + /// Parses one or more external table locations. + fn parse_locations(&mut self) -> Result, DataFusionError> { + if !self.parser.consume_token(&Token::LParen) { + return Ok(vec![self.parser.parse_literal_string()?]); + } + + let mut locations = vec![]; + loop { + locations.push(self.parser.parse_literal_string()?); + let comma = self.parser.consume_token(&Token::Comma); + if self.parser.consume_token(&Token::RParen) { + // Allow a trailing comma, even though it's not in standard + break; + } else if !comma { + return self.expected( + "',' or ')' after location definition", + &self.parser.peek_token(), + ); + } + } + Ok(locations) + } + /// Parses the set of valid formats fn parse_file_format(&mut self) -> Result { let token = self.parser.next_token(); @@ -1232,17 +1278,28 @@ mod tests { } } - #[test] - fn create_external_table() -> Result<(), DataFusionError> { - // positive case - let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'"; - let display = None; - let name = ObjectName::from(vec![Ident::from("t")]); - let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), - columns: vec![make_column_def("c1", DataType::Int(display))], + fn make_create_external_table(location: &str) -> CreateExternalTable { + make_create_external_table_with_locations(&[location]) + } + + fn make_create_external_table_with_locations( + locations: &[&str], + ) -> CreateExternalTable { + let locations = locations + .iter() + .map(|location| location.to_string()) + .collect::>(); + let location = locations + .first() + .cloned() + .expect("test CREATE EXTERNAL TABLE requires a location"); + + CreateExternalTable { + name: ObjectName::from(vec![Ident::from("t")]), + columns: vec![], file_type: "CSV".to_string(), - location: "foo.csv".into(), + location, + locations, table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, @@ -1251,24 +1308,59 @@ mod tests { unbounded: false, options: vec![], constraints: vec![], + } + } + + #[test] + fn create_external_table() -> Result<(), DataFusionError> { + // positive case + let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'"; + let display = None; + let expected = Statement::CreateExternalTable(CreateExternalTable { + columns: vec![make_column_def("c1", DataType::Int(display))], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; + // positive case: literal comma remains part of a single path + let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo,bar.csv'"; + let expected = Statement::CreateExternalTable(CreateExternalTable { + columns: vec![make_column_def("c1", DataType::Int(display))], + ..make_create_external_table("foo,bar.csv") + }); + expect_parse_ok(sql, expected)?; + + // positive case: multiple locations use an explicit list + let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION ('foo.csv', 'bar.csv')"; + let expected = Statement::CreateExternalTable(CreateExternalTable { + columns: vec![make_column_def("c1", DataType::Int(display))], + ..make_create_external_table_with_locations(&["foo.csv", "bar.csv"]) + }); + expect_parse_ok(sql, expected)?; + + assert_eq!( + Statement::CreateExternalTable(make_create_external_table("foo.csv")) + .to_string(), + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv'" + ); + assert_eq!( + Statement::CreateExternalTable(make_create_external_table_with_locations(&[ + "foo.csv", "bar.csv" + ])) + .to_string(), + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION ('foo.csv', 'bar.csv')" + ); + assert_eq!( + Statement::CreateExternalTable(make_create_external_table("foo'bar.csv")) + .to_string(), + "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo''bar.csv'" + ); + // positive case: leading space let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' "; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![make_column_def("c1", DataType::Int(None))], - file_type: "CSV".to_string(), - location: "foo.csv".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; @@ -1276,18 +1368,8 @@ mod tests { let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' ;"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![make_column_def("c1", DataType::Int(None))], - file_type: "CSV".to_string(), - location: "foo.csv".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; @@ -1295,21 +1377,12 @@ mod tests { let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS (format.delimiter '|')"; let display = None; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![make_column_def("c1", DataType::Int(display))], - file_type: "CSV".to_string(), - location: "foo.csv".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, options: vec![( "format.delimiter".into(), Value::SingleQuotedString("|".into()), )], - constraints: vec![], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; @@ -1317,18 +1390,9 @@ mod tests { let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1, p2) LOCATION 'foo.csv'"; let display = None; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![make_column_def("c1", DataType::Int(display))], - file_type: "CSV".to_string(), - location: "foo.csv".into(), table_partition_cols: vec!["p1".to_string(), "p2".to_string()], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; @@ -1343,24 +1407,15 @@ mod tests { ('format.compression' 'XZ')", "XZ"), ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS ('format.compression' 'ZSTD')", "ZSTD"), - ]; + ]; for (sql, compression) in sqls { let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![make_column_def("c1", DataType::Int(display))], - file_type: "CSV".to_string(), - location: "foo.csv".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, options: vec![( "format.compression".into(), Value::SingleQuotedString(compression.into()), )], - constraints: vec![], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; } @@ -1368,72 +1423,33 @@ mod tests { // positive case: it is ok for parquet files not to have columns specified let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet'"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), - columns: vec![], file_type: "PARQUET".to_string(), - location: "foo.parquet".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.parquet") }); expect_parse_ok(sql, expected)?; // positive case: it is ok for parquet files to be other than upper case let sql = "CREATE EXTERNAL TABLE t STORED AS parqueT LOCATION 'foo.parquet'"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), - columns: vec![], file_type: "PARQUET".to_string(), - location: "foo.parquet".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.parquet") }); expect_parse_ok(sql, expected)?; // positive case: it is ok for avro files not to have columns specified let sql = "CREATE EXTERNAL TABLE t STORED AS AVRO LOCATION 'foo.avro'"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), - columns: vec![], file_type: "AVRO".to_string(), - location: "foo.avro".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.avro") }); expect_parse_ok(sql, expected)?; // positive case: it is ok for avro files not to have columns specified let sql = "CREATE EXTERNAL TABLE IF NOT EXISTS t STORED AS PARQUET LOCATION 'foo.parquet'"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), - columns: vec![], file_type: "PARQUET".to_string(), - location: "foo.parquet".into(), - table_partition_cols: vec![], - order_exprs: vec![], if_not_exists: true, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.parquet") }); expect_parse_ok(sql, expected)?; @@ -1441,39 +1457,21 @@ mod tests { let sql = "CREATE OR REPLACE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet'"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), - columns: vec![], file_type: "PARQUET".to_string(), - location: "foo.parquet".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, or_replace: true, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.parquet") }); expect_parse_ok(sql, expected)?; // positive case: column definition allowed in 'partition by' clause let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![ make_column_def("c1", DataType::Int(None)), make_column_def("p1", DataType::Int(None)), ], - file_type: "CSV".to_string(), - location: "foo.csv".into(), table_partition_cols: vec!["p1".to_string()], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; @@ -1495,39 +1493,21 @@ mod tests { let sql = "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1') LOCATION 'blahblah'"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), - columns: vec![], file_type: "X".to_string(), - location: "blahblah".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, options: vec![("k1".into(), Value::SingleQuotedString("v1".into()))], - constraints: vec![], + ..make_create_external_table("blahblah") }); expect_parse_ok(sql, expected)?; // positive case: additional options (multiple entries) can be specified let sql = "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1', k2 v2) LOCATION 'blahblah'"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), - columns: vec![], file_type: "X".to_string(), - location: "blahblah".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, options: vec![ ("k1".into(), Value::SingleQuotedString("v1".into())), ("k2".into(), Value::SingleQuotedString("v2".into())), ], - constraints: vec![], + ..make_create_external_table("blahblah") }); expect_parse_ok(sql, expected)?; @@ -1556,11 +1536,7 @@ mod tests { ]; for (sql, (asc, nulls_first)) in sqls.iter().zip(expected) { let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![make_column_def("c1", DataType::Int(None))], - file_type: "CSV".to_string(), - location: "foo.csv".into(), - table_partition_cols: vec![], order_exprs: vec![vec![OrderByExpr { expr: Identifier(Ident { value: "c1".to_owned(), @@ -1570,12 +1546,7 @@ mod tests { options: OrderByOptions { asc, nulls_first }, with_fill: None, }]], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; } @@ -1584,14 +1555,10 @@ mod tests { let sql = "CREATE EXTERNAL TABLE t(c1 int, c2 int) STORED AS CSV WITH ORDER (c1 ASC, c2 DESC NULLS FIRST) LOCATION 'foo.csv'"; let display = None; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![ make_column_def("c1", DataType::Int(display)), make_column_def("c2", DataType::Int(display)), ], - file_type: "CSV".to_string(), - location: "foo.csv".into(), - table_partition_cols: vec![], order_exprs: vec![vec![ OrderByExpr { expr: Identifier(Ident { @@ -1618,12 +1585,7 @@ mod tests { with_fill: None, }, ]], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; @@ -1631,14 +1593,10 @@ mod tests { let sql = "CREATE EXTERNAL TABLE t(c1 int, c2 int) STORED AS CSV WITH ORDER (c1 - c2 ASC) LOCATION 'foo.csv'"; let display = None; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![ make_column_def("c1", DataType::Int(display)), make_column_def("c2", DataType::Int(display)), ], - file_type: "CSV".to_string(), - location: "foo.csv".into(), - table_partition_cols: vec![], order_exprs: vec![vec![OrderByExpr { expr: Expr::BinaryOp { left: Box::new(Identifier(Ident { @@ -1659,12 +1617,7 @@ mod tests { }, with_fill: None, }]], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.csv") }); expect_parse_ok(sql, expected)?; @@ -1681,13 +1634,11 @@ mod tests { 'TRUNCATE' 'NO', 'format.has_header' 'true')"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![ make_column_def("c1", DataType::Int(None)), make_column_def("c2", DataType::Float(ExactNumberInfo::None)), ], file_type: "PARQUET".to_string(), - location: "foo.parquet".into(), table_partition_cols: vec!["c1".into()], order_exprs: vec![vec![OrderByExpr { expr: Expr::BinaryOp { @@ -1710,8 +1661,6 @@ mod tests { with_fill: None, }]], if_not_exists: true, - or_replace: false, - temporary: false, unbounded: true, options: vec![ ( @@ -1732,7 +1681,7 @@ mod tests { Value::SingleQuotedString("true".into()), ), ], - constraints: vec![], + ..make_create_external_table("foo.parquet") }); expect_parse_ok(sql, expected)?; @@ -1749,13 +1698,11 @@ mod tests { 'TRUNCATE' 'NO', 'format.has_header' 'true')"; let expected = Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![ make_column_def("c1", DataType::Int(None)), make_column_def("c2", DataType::Float(ExactNumberInfo::None)), ], file_type: "PARQUET".to_string(), - location: "foo.parquet".into(), table_partition_cols: vec!["c1".into()], order_exprs: vec![vec![OrderByExpr { expr: Expr::BinaryOp { @@ -1777,9 +1724,7 @@ mod tests { }, with_fill: None, }]], - if_not_exists: false, or_replace: true, - temporary: false, unbounded: true, options: vec![ ( @@ -1800,7 +1745,7 @@ mod tests { Value::SingleQuotedString("true".into()), ), ], - constraints: vec![], + ..make_create_external_table("foo.parquet") }); expect_parse_ok(sql, expected)?; @@ -2070,21 +2015,10 @@ mod tests { options: vec![], }), { - let name = ObjectName::from(vec![Ident::from("t")]); let display = None; Statement::CreateExternalTable(CreateExternalTable { - name: name.clone(), columns: vec![make_column_def("c1", DataType::Int(display))], - file_type: "CSV".to_string(), - location: "foo.csv".into(), - table_partition_cols: vec![], - order_exprs: vec![], - if_not_exists: false, - or_replace: false, - temporary: false, - unbounded: false, - options: vec![], - constraints: vec![], + ..make_create_external_table("foo.csv") }) }, { diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 8c94610f7764c..2fdff41c5febf 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1797,7 +1797,8 @@ impl SqlToRel<'_, S> { name, columns, file_type, - location, + location: _, + locations, table_partition_cols, if_not_exists, temporary, @@ -1846,8 +1847,14 @@ impl SqlToRel<'_, S> { let name = self.object_name_to_table_reference(name)?; let constraints = self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?; + + let Some(location) = locations.first().cloned() else { + return plan_err!("CREATE EXTERNAL TABLE requires at least one location"); + }; + Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable( PlanCreateExternalTable::builder(name, location, file_type, df_schema) + .with_locations(locations) .with_partition_cols(table_partition_cols) .with_if_not_exists(if_not_exists) .with_or_replace(or_replace) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index ed164a1c63ff3..b46074d92928e 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2267,6 +2267,29 @@ fn create_external_table_csv() { ); } +#[test] +fn create_external_table_multiple_locations() { + let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION ('foo.csv', 'bar.csv')"; + let plan = logical_plan(sql).unwrap(); + let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = plan else { + panic!("expected a CreateExternalTable plan"); + }; + assert_eq!( + cmd.locations, + vec!["foo.csv".to_string(), "bar.csv".to_string()] + ); +} + +#[test] +fn create_external_table_location_with_literal_comma() { + let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo,bar.csv'"; + let plan = logical_plan(sql).unwrap(); + let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = plan else { + panic!("expected a CreateExternalTable plan"); + }; + assert_eq!(cmd.locations, vec!["foo,bar.csv".to_string()]); +} + #[test] fn create_external_table_with_pk() { let sql = "CREATE EXTERNAL TABLE t(c1 int, primary key(c1)) STORED AS CSV LOCATION 'foo.csv'"; diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index f56cff2a2a2f0..a495cc518faad 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -303,3 +303,35 @@ statement error DataFusion error: SQL error: ParserError\("'IF NOT EXISTS' canno CREATE OR REPLACE EXTERNAL TABLE IF NOT EXISTS t_conflict(c1 int) STORED AS CSV LOCATION 'foo.csv'; + +# Multiple listed locations are read together as a single table. +# Each partition-N.csv has 11 rows, so listing exactly two of them (rather than +# the whole directory) yields 22 rows. +statement ok +CREATE EXTERNAL TABLE multi_loc (c1 int, c2 bigint, c3 boolean) +STORED AS CSV +LOCATION ('../core/tests/data/partitioned_csv/partition-0.csv', '../core/tests/data/partitioned_csv/partition-1.csv') +OPTIONS ('format.has_header' 'false'); + +query I +SELECT count(*) FROM multi_loc; +---- +22 + +statement ok +DROP TABLE multi_loc; + +# Whitespace around the list separators is ignored +statement ok +CREATE EXTERNAL TABLE multi_loc_ws (c1 int, c2 bigint, c3 boolean) +STORED AS CSV +LOCATION ( '../core/tests/data/partitioned_csv/partition-0.csv' , '../core/tests/data/partitioned_csv/partition-1.csv' ) +OPTIONS ('format.has_header' 'false'); + +query I +SELECT count(*) FROM multi_loc_ws; +---- +22 + +statement ok +DROP TABLE multi_loc_ws; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b0c7e3f8fe643..bd02f15f53ec6 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -763,7 +763,7 @@ OPTIONS ('format.has_header' 'true'); query TTTT SHOW CREATE TABLE abc; ---- -datafusion public abc CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv +datafusion public abc CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION '../../testing/data/csv/aggregate_test_100.csv' # show_external_create_table_with_order statement ok @@ -776,7 +776,7 @@ OPTIONS ('format.has_header' 'true'); query TTTT SHOW CREATE TABLE abc_ordered; ---- -datafusion public abc_ordered CREATE EXTERNAL TABLE abc_ordered STORED AS CSV WITH ORDER (c1) LOCATION ../../testing/data/csv/aggregate_test_100.csv +datafusion public abc_ordered CREATE EXTERNAL TABLE abc_ordered STORED AS CSV WITH ORDER (c1) LOCATION '../../testing/data/csv/aggregate_test_100.csv' statement ok DROP TABLE abc_ordered; @@ -792,7 +792,7 @@ OPTIONS ('format.has_header' 'true'); query TTTT SHOW CREATE TABLE abc_multi_order; ---- -datafusion public abc_multi_order CREATE EXTERNAL TABLE abc_multi_order STORED AS CSV WITH ORDER (c1, c2 DESC) LOCATION ../../testing/data/csv/aggregate_test_100.csv +datafusion public abc_multi_order CREATE EXTERNAL TABLE abc_multi_order STORED AS CSV WITH ORDER (c1, c2 DESC) LOCATION '../../testing/data/csv/aggregate_test_100.csv' statement ok DROP TABLE abc_multi_order; @@ -808,7 +808,7 @@ OPTIONS ('format.has_header' 'true'); query TTTT SHOW CREATE TABLE abc_order_nulls; ---- -datafusion public abc_order_nulls CREATE EXTERNAL TABLE abc_order_nulls STORED AS CSV WITH ORDER (c1 NULLS LAST, c2 DESC NULLS FIRST) LOCATION ../../testing/data/csv/aggregate_test_100.csv +datafusion public abc_order_nulls CREATE EXTERNAL TABLE abc_order_nulls STORED AS CSV WITH ORDER (c1 NULLS LAST, c2 DESC NULLS FIRST) LOCATION '../../testing/data/csv/aggregate_test_100.csv' statement ok DROP TABLE abc_order_nulls; diff --git a/docs/source/library-user-guide/upgrading/55.0.0.md b/docs/source/library-user-guide/upgrading/55.0.0.md index ad96e9b878f40..d8a7920bd03e3 100644 --- a/docs/source/library-user-guide/upgrading/55.0.0.md +++ b/docs/source/library-user-guide/upgrading/55.0.0.md @@ -25,6 +25,39 @@ in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version. +### `CreateExternalTable` supports multiple locations + +`CREATE EXTERNAL TABLE` now accepts multiple paths in a single `LOCATION` +clause, which are read together as one table: + +```sql +CREATE EXTERNAL TABLE hits +STORED AS PARQUET +LOCATION ('file_1.parquet', 'file_2.parquet'); +``` + +To support this, the `location` field of `datafusion_expr::CreateExternalTable` +changed from a `String` to a `Vec` named `locations`: + +```rust +// Before (54.0.0) +let location: String = create_external_table.location; + +// After (55.0.0) +let locations: Vec = create_external_table.locations; +``` + +The `CreateExternalTable::builder(name, location, file_type, schema)` +constructor is unchanged and still takes a single location; use the new +`CreateExternalTableBuilder::with_locations(Vec)` to set more than one. +All listed locations must resolve to the same schema and reside on the same +object store. A plain string literal remains a single location, so paths that +contain commas continue to work, for example `LOCATION 'path/with,comma.csv'`. + +The SQL parser AST also exposes `locations` for `CREATE EXTERNAL TABLE`. The +existing `location` field remains as the first location for callers that only +need the single-location value. + ### Decimal scalar formatting uses human-readable values Decimal scalar literals in `EXPLAIN` output, expression display strings, and diff --git a/docs/source/user-guide/sql/ddl.md b/docs/source/user-guide/sql/ddl.md index 3a5c934ae8156..75651aac7d731 100644 --- a/docs/source/user-guide/sql/ddl.md +++ b/docs/source/user-guide/sql/ddl.md @@ -82,6 +82,22 @@ For a comprehensive list of format-specific options that can be specified in the a path to a file or directory of partitioned files locally or on an object store. +Multiple locations can be supplied as a parenthesized list of string literals, +in which case the files are read together as one table: + +```sql +CREATE EXTERNAL TABLE hits +STORED AS PARQUET +LOCATION ( + 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_1.parquet', + 's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_2.parquet' +); +``` + +All listed locations must reside on the same object store and resolve to data +with the same schema. Paths that themselves contain a literal comma can still +be used as a single string literal, for example `LOCATION 'path/with,comma.csv'`. + ### Example: Parquet Parquet data sources can be registered by executing a `CREATE EXTERNAL TABLE` SQL statement such as the following. It is not necessary to