@@ -12,9 +12,10 @@ import io.github.datacatering.datacaterer.core.model.Constants.METADATA_CONNECTI
1212import io .github .datacatering .datacaterer .core .model .PlanRunResults
1313import io .github .datacatering .datacaterer .core .parser .PlanParser
1414import io .github .datacatering .datacaterer .core .util .SparkProvider
15+ import org .apache .log4j .Logger
1516import org .apache .spark .sql .SparkSession
1617
17- import scala .util .{Success , Try }
18+ import scala .util .{Failure , Success , Try }
1819
1920object PlanProcessor {
2021
@@ -103,7 +104,36 @@ object PlanProcessor {
103104 private def parsePlan (dataCatererConfiguration : DataCatererConfiguration , optPlan : Option [PlanRun ], interface : String )(implicit sparkSession : SparkSession ): (PlanRun , String ) = {
104105 try {
105106 if (optPlan.isDefined) {
106- (optPlan.get, interface)
107+ // Check if we need to load from YAML by looking at plan task summaries vs actual tasks
108+ val existingTaskNames = optPlan.get._tasks.map(_.name).toSet
109+ val planTaskNames = optPlan.get._plan.tasks.map(_.name).toSet
110+ val missingTaskNames = planTaskNames.diff(existingTaskNames)
111+
112+ if (missingTaskNames.nonEmpty) {
113+ // Tasks are missing - this means the plan is from a YAML file, not the UI
114+ // Load the entire plan and all tasks from the YAML file
115+ val yamlConfig = ConfigParser .toDataCatererConfiguration
116+
117+ // Find the correct YAML plan file by name in the configured plan directory
118+ val requestedPlanName = optPlan.get._plan.name
119+ val yamlPlanFilePath = findYamlPlanFile(yamlConfig.foldersConfig.planFilePath, requestedPlanName)
120+
121+ // Load the plan from the specific YAML file if found, otherwise use default
122+ val planConfigForParsing = yamlPlanFilePath.map { planPath =>
123+ yamlConfig.copy(
124+ foldersConfig = yamlConfig.foldersConfig.copy(planFilePath = planPath)
125+ )
126+ }.getOrElse(yamlConfig)
127+
128+ // Load everything from YAML - use only YAML configuration, no UI data
129+ val (parsedPlan, enabledTasks, validations) = PlanParser .getPlanTasksFromYaml(planConfigForParsing, false )
130+
131+ val yamlPlanRun = new YamlPlanRun (parsedPlan, enabledTasks, validations, planConfigForParsing)
132+ (yamlPlanRun, DATA_CATERER_INTERFACE_YAML )
133+ } else {
134+ // All tasks are provided by UI, this is a pure UI plan
135+ (optPlan.get, interface)
136+ }
107137 } else {
108138 val (parsedPlan, enabledTasks, validations) = PlanParser .getPlanTasksFromYaml(dataCatererConfiguration)
109139 (new YamlPlanRun (parsedPlan, enabledTasks, validations, dataCatererConfiguration), DATA_CATERER_INTERFACE_YAML )
@@ -114,6 +144,38 @@ object PlanProcessor {
114144 throw parsePlanException
115145 }
116146 }
147+
/**
 * Locate the YAML plan file whose parsed plan name matches `planName`.
 *
 * The search directory is `configuredPlanPath` itself when it is a directory, otherwise its
 * parent directory. Every regular file ending in `.yaml` in that directory is parsed with
 * `PlanParser.parsePlan` and its plan name is compared case-insensitively with `planName`.
 * Files that fail to parse are logged at WARN level and skipped.
 *
 * @param configuredPlanPath configured plan file path, or a directory containing plan files
 * @param planName           plan name to look for (case-insensitive)
 * @return absolute path of the first matching file (candidates are visited in file name order),
 *         or `None` when the directory cannot be determined or listed, or nothing matches
 */
private def findYamlPlanFile(configuredPlanPath: String, planName: String)(implicit sparkSession: SparkSession): Option[String] = {
  import java.io.File

  val logger = Logger.getLogger(getClass.getName)

  // True when the file parses as a plan carrying the requested name; parse failures are logged, not rethrown.
  def hasRequestedName(candidate: File): Boolean =
    Try(PlanParser.parsePlan(candidate.getAbsolutePath).name.equalsIgnoreCase(planName)) match {
      case Success(matches) => matches
      case Failure(ex) =>
        logger.warn(s"Failed to parse YAML plan file: ${candidate.getAbsolutePath}", ex)
        false
    }

  val planFile = new File(configuredPlanPath)
  // getParentFile returns null for a path without a parent component (e.g. "plan.yaml"),
  // so wrap it in Option instead of passing null to the File constructor.
  val optPlanDir = if (planFile.isDirectory) Some(planFile) else Option(planFile.getParentFile)

  optPlanDir
    .filter(_.isDirectory)
    // listFiles returns null when the directory cannot be read
    .flatMap(dir => Option(dir.listFiles()))
    .getOrElse(Array.empty[File])
    .filter(f => f.isFile && f.getName.endsWith(".yaml"))
    // listFiles gives no ordering guarantee; sort so the chosen file is deterministic
    .sortBy(_.getName)
    // find stops parsing at the first match instead of parsing every file
    .find(hasRequestedName)
    .map(_.getAbsolutePath)
}
117179
118180 private def handleException (
119181 exception : Exception ,
@@ -158,8 +220,8 @@ class YamlPlanRun(
158220 dataCatererConfiguration : DataCatererConfiguration
159221 ) extends PlanRun {
160222 _plan = yamlPlan
161- _tasks = yamlTasks
162223 _validations = validations.getOrElse(List ())
224+
163225 // get any metadata configuration from tasks for data sources and add to configuration
164226 private val tasksWithMetadataOptions = yamlTasks.filter(t => t.steps.nonEmpty)
165227 .map(t => {
@@ -168,5 +230,20 @@ class YamlPlanRun(
168230 }).toMap
169231 private val updatedConnectionConfig = dataCatererConfiguration.connectionConfigByName
170232 .map(c => c._1 -> (tasksWithMetadataOptions.getOrElse(c._1, Map ()) ++ c._2))
233+
234+ // Merge connection configuration into task step options
235+ // This ensures that step.options contains all necessary connection details like 'format'
236+ _tasks = yamlTasks.map(task => {
237+ val dataSourceName = yamlPlan.tasks.find(ts => ts.name.equalsIgnoreCase(task.name)).get.dataSourceName
238+ val connectionConfig = updatedConnectionConfig.getOrElse(dataSourceName, Map ())
239+
240+ // Merge connection config into each step's options (connection config as base, step options override)
241+ val stepsWithConnectionConfig = task.steps.map(step => {
242+ step.copy(options = connectionConfig ++ step.options)
243+ })
244+
245+ task.copy(steps = stepsWithConnectionConfig)
246+ })
247+
171248 _configuration = dataCatererConfiguration.copy(connectionConfigByName = updatedConnectionConfig)
172249}
0 commit comments