Skip to content

Commit 3598bc3

Browse files
committed
Resolve PR comments
1 parent 8f282be commit 3598bc3

5 files changed

Lines changed: 86 additions & 15 deletions

File tree

app/src/main/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardV3Mapper.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,12 @@ object OpenDataContractStandardV3Mapper {
5353
}
5454

5555
private def getBasePropertyMetadata(property: OpenDataContractStandardElementV3, dataType: DataType): Map[String, String] = {
56+
// required=true means NOT nullable; required=false means nullable
57+
val isNullable = !property.required.getOrElse(false)
5658
val baseMetadata = Map(
5759
FIELD_DATA_TYPE -> dataType.toString(),
58-
IS_NULLABLE -> property.required.getOrElse(false).toString,
59-
ENABLED_NULL -> property.required.getOrElse(false).toString,
60+
IS_NULLABLE -> isNullable.toString,
61+
ENABLED_NULL -> isNullable.toString,
6062
IS_PRIMARY_KEY -> property.primaryKey.getOrElse(false).toString,
6163
PRIMARY_KEY_POSITION -> property.primaryKeyPosition.getOrElse("-1").toString,
6264
IS_UNIQUE -> property.unique.getOrElse(false).toString,

app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,28 +127,41 @@ class SinkFactory(
127127
}
128128

129129
// If save was successful and consolidation is needed, consolidate the files
130-
if (trySaveData.isSuccess && shouldConsolidate && targetFilePath.isDefined) {
130+
val tryConsolidation = if (trySaveData.isSuccess && shouldConsolidate && targetFilePath.isDefined) {
131131
val tempPath = actualConnectionConfig(PATH)
132-
Try(consolidatePartFiles(tempPath, targetFilePath.get, format, connectionConfig)) match {
132+
val consolidationResult = Try(consolidatePartFiles(tempPath, targetFilePath.get, format, connectionConfig))
133+
consolidationResult match {
133134
case Failure(exception) =>
134135
LOGGER.error(s"Failed to consolidate part files from $tempPath to ${targetFilePath.get}", exception)
135136
// Clean up temp directory even if consolidation failed
136137
Try(cleanupDirectory(Paths.get(tempPath)))
137138
case Success(_) =>
138139
LOGGER.info(s"Successfully consolidated files to ${targetFilePath.get}")
140+
// Clean up temp directory after successful consolidation
141+
Try(cleanupDirectory(Paths.get(tempPath))) match {
142+
case Failure(cleanupException) =>
143+
LOGGER.warn(s"Failed to clean up temporary directory: $tempPath", cleanupException)
144+
case Success(_) =>
145+
LOGGER.debug(s"Cleaned up temporary directory: $tempPath")
146+
}
139147
}
148+
consolidationResult
149+
} else {
150+
Success(())
140151
}
141152

142-
val optException = trySaveData match {
143-
case Failure(exception) => Some(exception)
144-
case Success(_) => None
153+
val optException = (trySaveData, tryConsolidation) match {
154+
case (Failure(exception), _) => Some(exception)
155+
case (_, Failure(exception)) => Some(exception)
156+
case _ => None
145157
}
146158
if (trySaveData.isFailure && retry < 3) {
147159
LOGGER.info(s"Retrying save to data source, data-source-name=$dataSourceName, retry=$retry")
148160
val connectionConfigWithBatchSize = connectionConfig ++ Map("batchsize" -> "1")
149161
saveBatchData(dataSourceName, df, saveMode, connectionConfigWithBatchSize, count, startTime, retry + 1)
150162
}
151-
mapToSinkResult(dataSourceName, df, saveMode, connectionConfig, count, format, trySaveData.isSuccess, startTime, optException)
163+
val isSuccess = trySaveData.isSuccess && tryConsolidation.isSuccess
164+
mapToSinkResult(dataSourceName, df, saveMode, connectionConfig, count, format, isSuccess, startTime, optException)
152165
}
153166

154167
private def trySaveJsonPossiblyUnwrapped(df: DataFrame, saveMode: SaveMode, connectionConfig: Map[String, String]): Unit = {
app/src/test/resources/sample/metadata/odcs/nullable-test-v3.odcs.yaml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Test file to verify required field mapping to nullable metadata
2+
domain: test
3+
dataProduct: nullable-test
4+
version: 1.0.0
5+
status: current
6+
id: test-nullable-mapping
7+
8+
kind: DataContract
9+
apiVersion: v3.0.0
10+
11+
schema:
12+
- name: test_table
13+
physicalName: test_table
14+
physicalType: table
15+
description: Test table for nullable field mapping
16+
properties:
17+
- name: required_field
18+
logicalType: string
19+
physicalType: varchar(100)
20+
required: true
21+
description: This field is required, so it should NOT be nullable
22+
primaryKey: false
23+
primaryKeyPosition: -1
24+
- name: optional_field
25+
logicalType: string
26+
physicalType: varchar(100)
27+
required: false
28+
description: This field is optional, so it should be nullable
29+
primaryKey: false
30+
primaryKeyPosition: -1
31+

app/src/test/resources/sample/plan/simple_json_plan.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ tasks: []
55
sinkOptions:
66
foreignKeys: []
77
validations: []
8-
runId: "daaa3db2-0d8d-45e8-abcd-c7546137bd18"
8+
runId: "9fd1279d-c5a7-4505-b59d-819daf2df055"

app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,12 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite {
3737
assertResult(true)(resultCols.exists(_.field == "txn_ref_dt"))
3838
val txnDateCol = resultCols.filter(_.field == "txn_ref_dt").head
3939
val txnCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "-1") else Map()
40+
// v2 uses isNullable directly; v3 inverts required field
41+
val isNullableValue = if (isVersion2) "false" else "true"
4042
val expectedTxnDateMetadata = Map(
4143
IS_PRIMARY_KEY -> "false",
42-
IS_NULLABLE -> "false",
43-
ENABLED_NULL -> "false",
44+
IS_NULLABLE -> isNullableValue,
45+
ENABLED_NULL -> isNullableValue,
4446
IS_UNIQUE -> "false",
4547
PRIMARY_KEY_POSITION -> "-1",
4648
FIELD_DATA_TYPE -> "date"
@@ -57,10 +59,11 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite {
5759
assertResult(true)(resultCols.exists(_.field == "rcvr_id"))
5860
val rcvrIdCol = resultCols.filter(_.field == "rcvr_id").head
5961
val rcvrIdCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "1") else Map()
62+
// v2 uses isNullable directly; v3 inverts required field
6063
val expectedRcvrIdMetadata = Map(
6164
IS_PRIMARY_KEY -> "true",
62-
IS_NULLABLE -> "false",
63-
ENABLED_NULL -> "false",
65+
IS_NULLABLE -> isNullableValue,
66+
ENABLED_NULL -> isNullableValue,
6467
IS_UNIQUE -> "false",
6568
PRIMARY_KEY_POSITION -> "1",
6669
FIELD_DATA_TYPE -> "string"
@@ -74,10 +77,11 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite {
7477
assertResult(true)(resultCols.exists(_.field == "rcvr_cntry_code"))
7578
val countryCodeCol = resultCols.filter(_.field == "rcvr_cntry_code").head
7679
val countryCodeCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "-1") else Map()
80+
// v2 uses isNullable directly; v3 inverts required field
7781
val expectedCountryCodeMetadata = Map(
7882
IS_PRIMARY_KEY -> "false",
79-
IS_NULLABLE -> "false",
80-
ENABLED_NULL -> "false",
83+
IS_NULLABLE -> isNullableValue,
84+
ENABLED_NULL -> isNullableValue,
8185
IS_UNIQUE -> "false",
8286
PRIMARY_KEY_POSITION -> "-1",
8387
FIELD_DATA_TYPE -> "string"
@@ -88,4 +92,25 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite {
8892
} else Map()
8993
assertResult(expectedCountryCodeMetadata ++ v3CountryCodeMetadata)(countryCodeCol.metadata)
9094
}
95+
96+
test("Can correctly map required field to nullable metadata") {
97+
// Test that required=true means NOT nullable, and required=false means nullable
98+
val connectionConfig = Map(DATA_CONTRACT_FILE -> "src/test/resources/sample/metadata/odcs/nullable-test-v3.odcs.yaml")
99+
val odcsMetadata = OpenDataContractStandardDataSourceMetadata("odcs", "parquet", connectionConfig)
100+
val result = odcsMetadata.getSubDataSourcesMetadata
101+
102+
assertResult(1)(result.length)
103+
val resultCols = result.head.optFieldMetadata.get.collect()
104+
assertResult(2)(resultCols.length)
105+
106+
// Field with required=true should have IS_NULLABLE=false
107+
val requiredField = resultCols.filter(_.field == "required_field").head
108+
assertResult("false")(requiredField.metadata(IS_NULLABLE))
109+
assertResult("false")(requiredField.metadata(ENABLED_NULL))
110+
111+
// Field with required=false should have IS_NULLABLE=true
112+
val optionalField = resultCols.filter(_.field == "optional_field").head
113+
assertResult("true")(optionalField.metadata(IS_NULLABLE))
114+
assertResult("true")(optionalField.metadata(ENABLED_NULL))
115+
}
91116
}

0 commit comments

Comments
 (0)