 package io.github.datacatering.datacaterer.core.sink
 
 import com.google.common.util.concurrent.RateLimiter
-import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE_SPARK_CONF, DRIVER, FORMAT, ICEBERG, ICEBERG_SPARK_CONF, JDBC, OMIT, PARTITIONS, PARTITION_BY, POSTGRES_DRIVER, RATE, ROWS_PER_SECOND, SAVE_MODE, TABLE}
+import io.github.datacatering.datacaterer.api.model.Constants.{DELTA, DELTA_LAKE_SPARK_CONF, DRIVER, FORMAT, ICEBERG, ICEBERG_SPARK_CONF, JDBC, JSON, OMIT, PARTITIONS, PARTITION_BY, PATH, POSTGRES_DRIVER, RATE, ROWS_PER_SECOND, SAVE_MODE, TABLE, UNWRAP_TOP_LEVEL}
 import io.github.datacatering.datacaterer.api.model.{FlagsConfig, FoldersConfig, MetadataConfig, SinkResult, Step}
 import io.github.datacatering.datacaterer.api.util.ConfigUtil
 import io.github.datacatering.datacaterer.core.exception.{FailedSaveDataDataFrameV2Exception, FailedSaveDataException}
@@ -91,6 +91,9 @@ class SinkFactory(
       .foreach(conf => df.sqlContext.setConf(conf._1, conf._2))
     val trySaveData = if (format == ICEBERG) {
       Try(tryPartitionAndSaveDfV2(df, saveMode, connectionConfig))
+    } else if (format == JSON) {
+      // Special case: allow unwrapping a single top-level array field to emit a bare JSON array file
+      Try(trySaveJsonPossiblyUnwrapped(df, saveMode, connectionConfig))
     } else {
       val partitionedDf = partitionDf(df, connectionConfig)
       Try(partitionedDf
@@ -112,6 +115,48 @@
     mapToSinkResult(dataSourceName, df, saveMode, connectionConfig, count, format, trySaveData.isSuccess, startTime, optException)
   }
 
+  private def trySaveJsonPossiblyUnwrapped(df: DataFrame, saveMode: SaveMode, connectionConfig: Map[String, String]): Unit = {
+    detectTopLevelArrayToUnwrap(df) match {
+      case Some(arrayFieldName) =>
+        // Write a single file containing the JSON array string using the Spark text writer,
+        // keeping directory semantics consistent with the other file-based sinks
+        // (coalesce(1) produces a single part file inside the output directory)
+        val path = connectionConfig.getOrElse(PATH, throw new IllegalArgumentException("Missing path for JSON sink"))
+        val jsonArrayDf = df.selectExpr(s"TO_JSON(`$arrayFieldName`) AS value").coalesce(1)
+        jsonArrayDf.write.mode(saveMode).text(path)
+      case None =>
+        // Default JSON behavior: partition and save like any other file format
+        partitionDf(df, connectionConfig)
+          .format(JSON)
+          .mode(saveMode)
+          .options(connectionConfig)
+          .save()
+    }
+  }
+
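+  // Illustrative sketch (the column name "records" and values are hypothetical): for a
+  // single-row DataFrame whose only field is a flagged array of structs, the unwrapped
+  // sink writes one text line containing the bare array, e.g.
+  //   [{"id":"ACC1","amount":10.5},{"id":"ACC2","amount":20.0}]
+  // while the default JSON writer would emit one object per row, e.g.
+  //   {"records":[{"id":"ACC1","amount":10.5},{"id":"ACC2","amount":20.0}]}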
+  // A DataFrame qualifies for unwrapping when its schema has exactly one field, that field
+  // is an array type, and its metadata carries the UNWRAP_TOP_LEVEL flag set to "true"
+  private def detectTopLevelArrayToUnwrap(df: DataFrame): Option[String] = {
+    val fields = df.schema.fields
+    if (fields.length == 1) {
+      val field = fields.head
+      val hasFlag = field.metadata.contains(UNWRAP_TOP_LEVEL) && field.metadata.getString(UNWRAP_TOP_LEVEL).equalsIgnoreCase("true")
+      val isArray = field.dataType.typeName == "array"
+      if (hasFlag && isArray) Some(field.name) else None
+    } else None
+  }
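+  // Hedged usage sketch, assuming the flag is attached through Spark's MetadataBuilder
+  // (the field name "records" is hypothetical):
+  //   val meta = new MetadataBuilder().putString(UNWRAP_TOP_LEVEL, "true").build()
+  //   val schema = StructType(Seq(StructField("records", ArrayType(StringType), nullable = false, meta)))
+  // detectTopLevelArrayToUnwrap on a DataFrame with that schema returns Some("records").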
+
   private def partitionDf(df: DataFrame, stepOptions: Map[String, String]): DataFrameWriter[Row] = {
     val partitionDf = stepOptions.get(PARTITIONS)
       .map(partitionNum => df.repartition(partitionNum.toInt)).getOrElse(df)