Skip to content

Commit eec01af

Browse files
committed
Resolve PR comments
1 parent 3598bc3 commit eec01af

2 files changed

Lines changed: 31 additions & 14 deletions

File tree

app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -530,22 +530,36 @@ class SinkFactory(
530530
private def consolidateCsvWithHeaders(partFiles: Array[java.nio.file.Path], targetPath: java.nio.file.Path): Unit = {
531531
LOGGER.debug(s"Consolidating CSV files with header handling")
532532
val targetFile = Files.newOutputStream(targetPath)
533+
val lineSeparator = System.lineSeparator().getBytes("UTF-8")
534+
533535
try {
534-
partFiles.zipWithIndex.foreach { case (partFile, index) =>
536+
var headerWritten = false
537+
538+
partFiles.foreach { partFile =>
535539
val lines = Files.readAllLines(partFile)
536-
if (index == 0) {
537-
// First file: write all lines including header
538-
lines.forEach(line => {
539-
targetFile.write(line.getBytes("UTF-8"))
540-
targetFile.write('\n')
541-
})
540+
541+
if (lines.isEmpty) {
542+
LOGGER.debug(s"Skipping empty part file: ${partFile.getFileName}")
543+
} else if (!headerWritten) {
544+
// First non-empty file: write all lines including header
545+
for (i <- 0 until lines.size()) {
546+
targetFile.write(lines.get(i).getBytes("UTF-8"))
547+
if (i < lines.size() - 1 || partFiles.length > 1) {
548+
// Write a line separator after every line, except the final line when this is the only part file
549+
targetFile.write(lineSeparator)
550+
}
551+
}
552+
headerWritten = true
542553
} else {
543554
// Subsequent files: skip first line (header) and write the rest
544555
if (lines.size() > 1) {
545-
lines.subList(1, lines.size()).forEach(line => {
546-
targetFile.write(line.getBytes("UTF-8"))
547-
targetFile.write('\n')
548-
})
556+
for (i <- 1 until lines.size()) {
557+
targetFile.write(lines.get(i).getBytes("UTF-8"))
558+
if (i < lines.size() - 1 || partFiles.indexOf(partFile) < partFiles.length - 1) {
559+
// Write line separator except for the last line of the last file
560+
targetFile.write(lineSeparator)
561+
}
562+
}
549563
}
550564
}
551565
}

app/src/test/scala/io/github/datacatering/datacaterer/core/generator/metadata/datasource/opendatacontractstandard/OpenDataContractStandardDataSourceMetadataTest.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite {
3737
assertResult(true)(resultCols.exists(_.field == "txn_ref_dt"))
3838
val txnDateCol = resultCols.filter(_.field == "txn_ref_dt").head
3939
val txnCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "-1") else Map()
40-
// v2 uses isNullable directly; v3 inverts required field
40+
// v2: isNullable=false in test data → IS_NULLABLE="false"
41+
// v3: required=false in test data → IS_NULLABLE="true" (mapper inverts: !false = true)
4142
val isNullableValue = if (isVersion2) "false" else "true"
4243
val expectedTxnDateMetadata = Map(
4344
IS_PRIMARY_KEY -> "false",
@@ -59,7 +60,8 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite {
5960
assertResult(true)(resultCols.exists(_.field == "rcvr_id"))
6061
val rcvrIdCol = resultCols.filter(_.field == "rcvr_id").head
6162
val rcvrIdCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "1") else Map()
62-
// v2 uses isNullable directly; v3 inverts required field
63+
// v2: isNullable=false in test data → IS_NULLABLE="false"
64+
// v3: required=false in test data → IS_NULLABLE="true" (mapper inverts: !false = true)
6365
val expectedRcvrIdMetadata = Map(
6466
IS_PRIMARY_KEY -> "true",
6567
IS_NULLABLE -> isNullableValue,
@@ -77,7 +79,8 @@ class OpenDataContractStandardDataSourceMetadataTest extends SparkSuite {
7779
assertResult(true)(resultCols.exists(_.field == "rcvr_cntry_code"))
7880
val countryCodeCol = resultCols.filter(_.field == "rcvr_cntry_code").head
7981
val countryCodeCluster = if (isVersion2) Map(CLUSTERING_POSITION -> "-1") else Map()
80-
// v2 uses isNullable directly; v3 inverts required field
82+
// v2: isNullable=false in test data → IS_NULLABLE="false"
83+
// v3: required=false in test data → IS_NULLABLE="true" (mapper inverts: !false = true)
8184
val expectedCountryCodeMetadata = Map(
8285
IS_PRIMARY_KEY -> "false",
8386
IS_NULLABLE -> isNullableValue,

0 commit comments

Comments
 (0)