Skip to content

Commit d79a0e0

Browse files
committed
Update version to 0.16.10, enhance YAML plan execution via UI/API, and improve connection configuration merging. Add comprehensive documentation and integration tests for YAML plans.
1 parent 6c31dd6 commit d79a0e0

27 files changed

Lines changed: 803 additions & 43 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ and deep dive into issues [from the generated report](https://data.catering/late
3838

3939
1. Docker
4040
```shell
41-
docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer:0.16.9
41+
docker run -d -i -p 9898:9898 -e DEPLOY_MODE=standalone --name datacaterer datacatering/data-caterer:10
4242
```
4343
[Open localhost:9898](http://localhost:9898).
4444
1. [Run Scala/Java examples](#run-scalajava-examples)

api/build.gradle.kts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,10 @@ repositories {
3838
}
3939

4040
val basicImpl: Configuration by configurations.creating
41-
val advancedImpl: Configuration by configurations.creating
4241

4342
configurations {
4443
implementation {
4544
extendsFrom(basicImpl)
46-
extendsFrom(advancedImpl)
4745
}
4846
}
4947

@@ -77,7 +75,7 @@ testing {
7775
tasks.test {
7876
jvmArgs("-Djava.security.manager=allow", "-Djdk.module.illegalAccess=deny", "--add-opens=java.base/java.lang=ALL-UNNAMED", "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", "--add-opens=java.base/java.io=ALL-UNNAMED", "--add-opens=java.base/java.net=ALL-UNNAMED", "--add-opens=java.base/java.nio=ALL-UNNAMED", "--add-opens=java.base/java.util=ALL-UNNAMED", "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", "--add-opens=java.base/sun.security.action=ALL-UNNAMED", "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED")
7977

80-
// Only finalize with Scoverage report when configuration cache is disabled
78+
// Only finalize with Scoverage report when configuration cache is disable
8179
if (!isConfigCacheEnabled) {
8280
finalizedBy(tasks.named("reportScoverage"))
8381
}

app/build.gradle.kts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,15 @@ tasks.register<JavaExec>("runUI") {
262262
classpath = sourceSets.main.get().runtimeClasspath
263263
mainClass.set("io.github.datacatering.datacaterer.core.ui.DataCatererUI")
264264

265+
// Set environment variables for YAML plan and task paths (absolute paths)
266+
val projectDir = project.projectDir.absolutePath
267+
val rootProjectDir = project.rootProject.projectDir.absolutePath
268+
environment("PLAN_FILE_PATH", "$rootProjectDir/app/src/test/resources/sample/plan/customer-create-plan.yaml")
269+
environment("TASK_FOLDER_PATH", "$rootProjectDir/app/src/test/resources/sample/task")
270+
271+
// Set working directory to project root to help with relative path resolution
272+
workingDir(rootProjectDir)
273+
265274
// Add JVM arguments similar to docker script
266275
jvmArgs(
267276
"-Djava.security.manager=allow",

app/src/main/scala/io/github/datacatering/datacaterer/core/parser/PlanParser.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ object PlanParser {
1717
private val LOGGER = Logger.getLogger(getClass.getName)
1818
private val OBJECT_MAPPER = ObjectMapperUtil.yamlObjectMapper
1919

20-
def getPlanTasksFromYaml(dataCatererConfiguration: DataCatererConfiguration)
20+
def getPlanTasksFromYaml(dataCatererConfiguration: DataCatererConfiguration, enabledOnly: Boolean = true)
2121
(implicit sparkSession: SparkSession): (Plan, List[Task], Option[List[ValidationConfiguration]]) = {
2222
val parsedPlan = PlanParser.parsePlan(dataCatererConfiguration.foldersConfig.planFilePath)
23-
val enabledPlannedTasks = parsedPlan.tasks.filter(_.enabled)
23+
val enabledPlannedTasks = if (enabledOnly) parsedPlan.tasks.filter(_.enabled) else parsedPlan.tasks
2424
val enabledTaskMap = enabledPlannedTasks.map(t => (t.name, t)).toMap
2525
val planWithEnabledTasks = parsedPlan.copy(tasks = enabledPlannedTasks)
2626

@@ -40,7 +40,7 @@ object PlanParser {
4040
val planFile = FileUtil.getFile(planFilePath)
4141
OBJECT_MAPPER.readValue(planFile, classOf[Plan])
4242
}
43-
LOGGER.info(s"Found plan file and parsed successfully, plan-file-path=$planFilePath, plan-name=${parsedPlan.name}, plan-description=${parsedPlan.description}")
43+
LOGGER.debug(s"Found plan file and parsed successfully, plan-file-path=$planFilePath, plan-name=${parsedPlan.name}, plan-description=${parsedPlan.description}")
4444
parsedPlan
4545
}
4646

app/src/main/scala/io/github/datacatering/datacaterer/core/plan/PlanProcessor.scala

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,36 @@ object PlanProcessor {
103103
private def parsePlan(dataCatererConfiguration: DataCatererConfiguration, optPlan: Option[PlanRun], interface: String)(implicit sparkSession: SparkSession): (PlanRun, String) = {
104104
try {
105105
if (optPlan.isDefined) {
106-
(optPlan.get, interface)
106+
// Check if we need to load from YAML by looking at plan task summaries vs actual tasks
107+
val existingTaskNames = optPlan.get._tasks.map(_.name).toSet
108+
val planTaskNames = optPlan.get._plan.tasks.map(_.name).toSet
109+
val missingTaskNames = planTaskNames.diff(existingTaskNames)
110+
111+
if (missingTaskNames.nonEmpty) {
112+
// Tasks are missing - this means the plan is from a YAML file, not the UI
113+
// Load the entire plan and all tasks from the YAML file
114+
val yamlConfig = ConfigParser.toDataCatererConfiguration
115+
116+
// Find the correct YAML plan file by name in the configured plan directory
117+
val requestedPlanName = optPlan.get._plan.name
118+
val yamlPlanFilePath = findYamlPlanFile(yamlConfig.foldersConfig.planFilePath, requestedPlanName)
119+
120+
// Load the plan from the specific YAML file if found, otherwise use default
121+
val planConfigForParsing = yamlPlanFilePath.map { planPath =>
122+
yamlConfig.copy(
123+
foldersConfig = yamlConfig.foldersConfig.copy(planFilePath = planPath)
124+
)
125+
}.getOrElse(yamlConfig)
126+
127+
// Load everything from YAML - use only YAML configuration, no UI data
128+
val (parsedPlan, enabledTasks, validations) = PlanParser.getPlanTasksFromYaml(planConfigForParsing, false)
129+
130+
val yamlPlanRun = new YamlPlanRun(parsedPlan, enabledTasks, validations, planConfigForParsing)
131+
(yamlPlanRun, DATA_CATERER_INTERFACE_YAML)
132+
} else {
133+
// All tasks are provided by UI, this is a pure UI plan
134+
(optPlan.get, interface)
135+
}
107136
} else {
108137
val (parsedPlan, enabledTasks, validations) = PlanParser.getPlanTasksFromYaml(dataCatererConfiguration)
109138
(new YamlPlanRun(parsedPlan, enabledTasks, validations, dataCatererConfiguration), DATA_CATERER_INTERFACE_YAML)
@@ -114,6 +143,31 @@ object PlanProcessor {
114143
throw parsePlanException
115144
}
116145
}
146+
147+
private def findYamlPlanFile(configuredPlanPath: String, planName: String)(implicit sparkSession: SparkSession): Option[String] = {
148+
import java.io.File
149+
150+
// Get the parent directory from the configured plan file path
151+
val planFile = new File(configuredPlanPath)
152+
val planDir = if (planFile.isDirectory) planFile else new File(planFile.getParent)
153+
154+
if (planDir.exists() && planDir.isDirectory) {
155+
// Look for a YAML file matching the plan name
156+
val matchingFiles = planDir.listFiles()
157+
.filter(f => f.isFile && f.getName.endsWith(".yaml"))
158+
.filter(f => {
159+
// Try to parse the plan and match by name
160+
Try {
161+
val parsed = PlanParser.parsePlan(f.getAbsolutePath)
162+
parsed.name.equalsIgnoreCase(planName)
163+
}.getOrElse(false)
164+
})
165+
166+
matchingFiles.headOption.map(_.getAbsolutePath)
167+
} else {
168+
None
169+
}
170+
}
117171

118172
private def handleException(
119173
exception: Exception,
@@ -158,8 +212,8 @@ class YamlPlanRun(
158212
dataCatererConfiguration: DataCatererConfiguration
159213
) extends PlanRun {
160214
_plan = yamlPlan
161-
_tasks = yamlTasks
162215
_validations = validations.getOrElse(List())
216+
163217
//get any metadata configuration from tasks for data sources and add to configuration
164218
private val tasksWithMetadataOptions = yamlTasks.filter(t => t.steps.nonEmpty)
165219
.map(t => {
@@ -168,5 +222,20 @@ class YamlPlanRun(
168222
}).toMap
169223
private val updatedConnectionConfig = dataCatererConfiguration.connectionConfigByName
170224
.map(c => c._1 -> (tasksWithMetadataOptions.getOrElse(c._1, Map()) ++ c._2))
225+
226+
// Merge connection configuration into task step options
227+
// This ensures that step.options contains all necessary connection details like 'format'
228+
_tasks = yamlTasks.map(task => {
229+
val dataSourceName = yamlPlan.tasks.find(ts => ts.name.equalsIgnoreCase(task.name)).get.dataSourceName
230+
val connectionConfig = updatedConnectionConfig.getOrElse(dataSourceName, Map())
231+
232+
// Merge connection config into each step's options (connection config as base, step options override)
233+
val stepsWithConnectionConfig = task.steps.map(step => {
234+
step.copy(options = connectionConfig ++ step.options)
235+
})
236+
237+
task.copy(steps = stepsWithConnectionConfig)
238+
})
239+
171240
_configuration = dataCatererConfiguration.copy(connectionConfigByName = updatedConnectionConfig)
172241
}

app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/ConnectionRepository.scala

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.github.datacatering.datacaterer.core.ui.plan
22

3+
import io.github.datacatering.datacaterer.api.model.Constants.FORMAT
4+
import io.github.datacatering.datacaterer.core.config.ConfigParser
35
import io.github.datacatering.datacaterer.core.ui.config.UiConfiguration.INSTALL_DIRECTORY
46
import io.github.datacatering.datacaterer.core.ui.model.{Connection, GetConnectionsResponse, SaveConnectionsRequest}
57
import org.apache.log4j.Logger
@@ -60,19 +62,58 @@ object ConnectionRepository {
6062

6163
def getConnection(name: String, masking: Boolean = true): Connection = {
6264
LOGGER.debug(s"Getting connection details, connection-name=$name")
65+
66+
// First try to get from saved connections file
6367
val connectionFile = Path.of(s"$connectionSaveFolder/$name.csv")
64-
val tryConnection = Try(Connection.fromString(Files.readString(connectionFile), name, masking))
65-
tryConnection match {
68+
val tryConnectionFromFile = if (connectionFile.toFile.exists()) {
69+
Try(Connection.fromString(Files.readString(connectionFile), name, masking))
70+
} else {
71+
Failure(new IllegalArgumentException(s"Connection file not found: $name"))
72+
}
73+
74+
tryConnectionFromFile match {
6675
case Success(connection) => connection.copy(options = connection.options)
67-
case Failure(exception) => throw exception
76+
case Failure(fileException) =>
77+
// If not found in file, try to get from application.conf
78+
LOGGER.debug(s"Connection not found in file, checking application.conf, connection-name=$name")
79+
Try(getConnectionFromConfig(name, masking)) match {
80+
case Success(conn) => conn
81+
case Failure(configException) =>
82+
// If not found in either location, throw the original exception
83+
throw new IllegalArgumentException(s"Connection not found: $name", fileException)
84+
}
85+
}
86+
}
87+
88+
/**
89+
* Get connection details from application.conf
90+
*/
91+
private def getConnectionFromConfig(name: String, masking: Boolean = true): Connection = {
92+
val connectionConfigs = ConfigParser.connectionConfigsByName
93+
connectionConfigs.get(name) match {
94+
case Some(config) =>
95+
val format = config.getOrElse(FORMAT, "unknown")
96+
val options = if (masking) {
97+
config.map {
98+
case (key, value) if key.contains("password") || key.contains("token") => (key, "***")
99+
case (key, value) => (key, value)
100+
}
101+
} else {
102+
config
103+
}
104+
Connection(name, format, Some("data-source"), options - FORMAT)
105+
case None =>
106+
throw new IllegalArgumentException(s"Connection not found in application.conf: $name")
68107
}
69108
}
70109

71110
private def getAllConnections(optConnectionGroupType: Option[String], masking: Boolean = true): GetConnectionsResponse = {
72111
LOGGER.debug(s"Getting all connection details, connection-group=${optConnectionGroupType.getOrElse("")}")
112+
113+
// Get connections from files
73114
val connectionPath = Path.of(connectionSaveFolder)
74115
if (!connectionPath.toFile.exists()) connectionPath.toFile.mkdirs()
75-
val connections = Files.list(connectionPath)
116+
val fileConnections = Files.list(connectionPath)
76117
.iterator()
77118
.asScala
78119
.map(file => {
@@ -85,9 +126,38 @@ object ConnectionRepository {
85126
.filter(_.isSuccess)
86127
.map(_.get)
87128
.toList
129+
130+
// Get connections from application.conf
131+
val configConnections = getConnectionsFromConfig(masking)
132+
133+
// Merge connections, with file connections taking priority (deduplicating by name)
134+
val allConnections = (fileConnections ++ configConnections)
135+
.groupBy(_.name)
136+
.map(_._2.head) // Take first occurrence (file connection if exists, otherwise config)
137+
.toList
88138
.filter(conn => optConnectionGroupType.forall(conn.groupType.contains))
89139
.sortBy(_.name)
90-
GetConnectionsResponse(connections)
140+
141+
GetConnectionsResponse(allConnections)
142+
}
143+
144+
/**
145+
* Get all connections from application.conf
146+
*/
147+
private def getConnectionsFromConfig(masking: Boolean = true): List[Connection] = {
148+
val connectionConfigs = ConfigParser.connectionConfigsByName
149+
connectionConfigs.map { case (name, config) =>
150+
val format = config.getOrElse(FORMAT, "unknown")
151+
val options = if (masking) {
152+
config.map {
153+
case (key, value) if key.contains("password") || key.contains("token") => (key, "***")
154+
case (key, value) => (key, value)
155+
}
156+
} else {
157+
config
158+
}
159+
Connection(name, format, Some("data-source"), options - FORMAT)
160+
}.toList
91161
}
92162

93163
private def removeConnection(connectionName: String): Boolean = {

0 commit comments

Comments
 (0)