
Commit c9aaa77

Feature/sql array ref (#92)
* Don't omit fields before the sink, so that foreign keys can reference omitted fields
* Allow deeply nested SQL references for arrays and objects; allow unwrapping top-level arrays for JSON output
1 parent: afa9fb4 · commit: c9aaa77

15 files changed: 1335 additions & 134 deletions


api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala

Lines changed: 14 additions & 0 deletions
@@ -966,6 +966,14 @@ case class FieldBuilder(field: Field = Field()) {
   def omit(omit: Boolean): FieldBuilder =
     this.modify(_.field.options).setTo(getGenBuilder.omit(omit).options)

+  /**
+   * Marks this field so that, if it is the sole top-level field and is an array in a JSON task, the sink
+   * outputs a bare JSON array (no enclosing object with the field name).
+   * Has no effect for non-JSON sinks or non-top-level contexts.
+   */
+  def unwrapTopLevelArray(enable: Boolean): FieldBuilder =
+    this.modify(_.field.options).setTo(getGenBuilder.unwrapTopLevel(enable).options)
+
   /**
    * Sets the primary key flag for the current field.
    *
@@ -1536,6 +1544,12 @@ case class GeneratorBuilder(options: Map[String, Any] = Map()) {
   def omit(omit: Boolean): GeneratorBuilder =
     this.modify(_.options)(_ ++ Map(OMIT -> omit.toString))

+  /**
+   * Instructs the JSON sink to unwrap the top-level field if it is a single array field.
+   */
+  def unwrapTopLevel(enable: Boolean): GeneratorBuilder =
+    this.modify(_.options)(_ ++ Map(UNWRAP_TOP_LEVEL -> enable.toString))
+
   /**
    * Field is a primary key of the data source.
    *
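As a usage sketch (not part of the commit): the new builder call only sets a string option, which can be verified directly. FieldBuilder and its field.options map are as shown in the diff above; everything else here is illustrative.

import io.github.datacatering.datacaterer.api.FieldBuilder

// Illustrative only: unwrapTopLevelArray(true) stores the string option
// "unwrapTopLevel" -> "true" on the field via GeneratorBuilder.unwrapTopLevel.
val arrayField = FieldBuilder().unwrapTopLevelArray(true)
assert(arrayField.field.options.get("unwrapTopLevel").contains("true"))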

api/src/main/scala/io/github/datacatering/datacaterer/api/ValidationBuilder.scala

Lines changed: 19 additions & 3 deletions
@@ -996,9 +996,25 @@ case class FieldValidationBuilder(validationBuilder: ValidationBuilder = Validat
   private def removeTicksField: String = field.replaceAll("`", "")

   private def seqToString(seq: Seq[Any]): String = {
-    seq.head match {
-      case _: String => seq.mkString("'", "','", "'")
-      case _ => seq.mkString(",")
+    // If all values are numeric or numeric-looking strings, render as unquoted numbers to form ARRAY<DOUBLE/NUMERIC>
+    val allNumericLike = seq.forall {
+      case s: String => Try(s.trim.toDouble).isSuccess
+      case n: java.lang.Number => true
+      case n: Number => true
+      case _ => false
+    }
+    if (allNumericLike) {
+      seq.map {
+        case s: String => BigDecimal(s.trim).toString()
+        case n: java.lang.Number => n.toString
+        case n: Number => n.toString
+        case other => other.toString
+      }.mkString(",")
+    } else {
+      seq.head match {
+        case _: String => seq.mkString("'", "','", "'")
+        case _ => seq.mkString(",")
+      }
     }
   }
 }
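To make the new rendering concrete, here is a standalone sketch that mirrors the logic above, reimplemented because the real method is private to FieldValidationBuilder (the separate java.lang.Number branches are collapsed into one here, since plain Number resolves to java.lang.Number in Scala):

import scala.util.Try

// Mirrors the private seqToString above, for illustration only.
def seqToString(seq: Seq[Any]): String = {
  val allNumericLike = seq.forall {
    case s: String => Try(s.trim.toDouble).isSuccess
    case _: java.lang.Number => true
    case _ => false
  }
  if (allNumericLike) {
    // Numeric-looking strings are normalised through BigDecimal so the SQL
    // literal list can back an ARRAY<DOUBLE>/ARRAY<NUMERIC>.
    seq.map {
      case s: String => BigDecimal(s.trim).toString()
      case other => other.toString
    }.mkString(",")
  } else seq.head match {
    case _: String => seq.mkString("'", "','", "'")
    case _ => seq.mkString(",")
  }
}

seqToString(Seq("1", "2.50")) // "1,2.50" -> unquoted numeric list
seqToString(Seq(1, 2, 3))     // "1,2,3"
seqToString(Seq("a", "b"))    // "'a','b'" -> quoted string list, as before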

api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala

Lines changed: 3 additions & 0 deletions
@@ -61,6 +61,9 @@ object Constants {
   lazy val DATA_CONTRACT_FILE = "dataContractFile"
   lazy val DATA_CONTRACT_SCHEMA = "dataContractSchema"
   lazy val ROWS_PER_SECOND = "rowsPerSecond"
+  // When applied on a top-level array field in a JSON task, instructs the sink to output a bare JSON array
+  // instead of an object { fieldName: [...] }.
+  lazy val UNWRAP_TOP_LEVEL = "unwrapTopLevel"
   lazy val HUDI_TABLE_NAME = "hoodie.table.name"
   lazy val ICEBERG_CATALOG_TYPE = "catalogType"
   lazy val ICEBERG_CATALOG_URI = "catalogUri"
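Downstream, the sink reads this flag from Spark field metadata (see the SinkFactory change below). A sketch of the schema shape it expects, assuming the generator propagates the field option into metadata; the field name and element type here are made up:

import org.apache.spark.sql.types.{ArrayType, MetadataBuilder, StringType, StructField, StructType}

// A single top-level array field carrying the flag in its metadata; this is
// the shape detectTopLevelArrayToUnwrap in SinkFactory looks for.
val metadata = new MetadataBuilder().putString("unwrapTopLevel", "true").build()
val schema = StructType(Seq(
  StructField("accounts", ArrayType(StringType), nullable = true, metadata)
))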

app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala

Lines changed: 35 additions & 1 deletion
@@ -1,7 +1,7 @@
 package io.github.datacatering.datacaterer.core.sink

 import com.google.common.util.concurrent.RateLimiter
-import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE_SPARK_CONF, DRIVER, FORMAT, ICEBERG, ICEBERG_SPARK_CONF, JDBC, OMIT, PARTITIONS, PARTITION_BY, POSTGRES_DRIVER, RATE, ROWS_PER_SECOND, SAVE_MODE, TABLE}
+import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE_SPARK_CONF, DRIVER, FORMAT, ICEBERG, ICEBERG_SPARK_CONF, JDBC, JSON, OMIT, PARTITIONS, PARTITION_BY, PATH, POSTGRES_DRIVER, RATE, ROWS_PER_SECOND, SAVE_MODE, TABLE, UNWRAP_TOP_LEVEL}
 import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, MetadataConfig, SinkResult, Step}
 import io.github.datacatering.datacaterer.api.util.ConfigUtil
 import io.github.datacatering.datacaterer.core.exception.{FailedSaveDataDataFrameV2Exception, FailedSaveDataException}
@@ -91,6 +91,10 @@ class SinkFactory(
       .foreach(conf => df.sqlContext.setConf(conf._1, conf._2))
     val trySaveData = if (format == ICEBERG) {
       Try(tryPartitionAndSaveDfV2(df, saveMode, connectionConfig))
+    } else if (format == JSON) {
+      // Special-case: allow unwrapping top-level array to emit a bare JSON array file
+      val tryMaybeUnwrap = Try(trySaveJsonPossiblyUnwrapped(df, saveMode, connectionConfig))
+      tryMaybeUnwrap
     } else {
       val partitionedDf = partitionDf(df, connectionConfig)
       Try(partitionedDf
@@ -112,6 +116,36 @@ class SinkFactory(
     mapToSinkResult(dataSourceName, df, saveMode, connectionConfig, count, format, trySaveData.isSuccess, startTime, optException)
   }

+  private def trySaveJsonPossiblyUnwrapped(df: DataFrame, saveMode: SaveMode, connectionConfig: Map[String, String]): Unit = {
+    val shouldUnwrap = detectTopLevelArrayToUnwrap(df)
+    shouldUnwrap match {
+      case Some(arrayFieldName) =>
+        // Write a single file containing the JSON array string using Spark text writer
+        // We keep directory semantics consistent with other sinks
+        val path = connectionConfig.getOrElse(PATH, throw new IllegalArgumentException("Missing path for JSON sink"))
+        val jsonArrayDf = df.selectExpr(s"TO_JSON(`" + arrayFieldName + "`) AS value").coalesce(1)
+        jsonArrayDf.write.mode(saveMode).text(path)
+      case None =>
+        // Default JSON behavior
+        val partitionedDf = partitionDf(df, connectionConfig)
+        partitionedDf
+          .format(JSON)
+          .mode(saveMode)
+          .options(connectionConfig)
+          .save()
+    }
+  }
+
+  private def detectTopLevelArrayToUnwrap(df: DataFrame): Option[String] = {
+    val fields = df.schema.fields
+    if (fields.length == 1) {
+      val f = fields.head
+      val hasFlag = f.metadata.contains(UNWRAP_TOP_LEVEL) && f.metadata.getString(UNWRAP_TOP_LEVEL).equalsIgnoreCase("true")
+      val isArray = f.dataType.typeName == "array"
+      if (hasFlag && isArray) Some(f.name) else None
+    } else None
+  }
+
   private def partitionDf(df: DataFrame, stepOptions: Map[String, String]): DataFrameWriter[Row] = {
     val partitionDf = stepOptions.get(PARTITIONS)
       .map(partitionNum => df.repartition(partitionNum.toInt)).getOrElse(df)