Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,27 @@ class CatalogSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelpe
}
}

test("createTable(tableName, path) uses spark.sql.sources.default") {
  val table = "default_source_table"
  withSQLConf("spark.sql.sources.default" -> "json") {
    withTable(table) {
      withTempPath { location =>
        // A stable identifier is required to import the session's implicits (for toDF).
        val ss = spark
        import ss.implicits._

        // Lay down a single row as JSON files. Had createTable pinned the parquet provider,
        // the table created over this directory could not be read back.
        val rows = Seq((1, "a")).toDF("id", "value")
        rows.write.json(location.getPath)

        // Create the table with only a name and a path, i.e. no explicit source.
        spark.catalog.createTable(table, location.getPath)
        assert(spark.catalog.tableExists(table))

        // The generated DDL must name the configured default provider.
        val loweredDdl =
          spark.catalog.getCreateTableString(table).toLowerCase(java.util.Locale.ROOT)
        assert(loweredDdl.contains("using json"))

        // Reading the row back only works when the json provider was actually used.
        val rowCount = spark.table(table).count()
        assert(rowCount == 1)
      }
    }
  }
}

test("Cache Table APIs") {
val parquetTableName = "parquet_table"
withTable(parquetTableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,15 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
* @since 3.5.0
*/
override def createTable(tableName: String, path: String): DataFrame = {
  // Leave the source unset so the server resolves spark.sql.sources.default, as documented
  // above. Routing through createTable(tableName, path, "parquet") would hardcode the provider
  // and ignore that configuration, so that call must not appear here.
  //
  // The location is passed as the "path" option; the schema and description are left empty
  // because this overload accepts neither.
  createTable(
    tableName = tableName,
    source = None,
    schema = new StructType,
    description = "",
    options = Map("path" -> path))
}

/**
Expand Down Expand Up @@ -478,12 +486,26 @@ class Catalog(sparkSession: SparkSession) extends catalog.Catalog {
schema: StructType,
description: String,
options: Map[String, String]): DataFrame = {
createTable(tableName, Some(source), schema, description, options)
}

/**
* Shared implementation for the public `createTable` overloads. When `source` is `None`, the
* proto's `source` field is left unset so the server resolves `spark.sql.sources.default`;
* otherwise the provided source is pinned via `setSource`.
*/
private def createTable(
tableName: String,
source: Option[String],
schema: StructType,
description: String,
options: Map[String, String]): DataFrame = {
sparkSession.execute { builder =>
val createTableBuilder = builder.getCatalogBuilder.getCreateTableBuilder
.setTableName(tableName)
.setSource(source)
.setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
.setDescription(description)
source.foreach(createTableBuilder.setSource)
options.foreach { case (k, v) =>
createTableBuilder.putOptions(k, v)
}
Expand Down