|
1 | 1 | package io.github.datacatering.datacaterer.core.plan |
2 | 2 |
|
3 | 3 | import io.github.datacatering.datacaterer.api.PlanRun |
4 | | -import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CATERER_INTERFACE_JAVA, DATA_CATERER_INTERFACE_SCALA, DATA_CATERER_INTERFACE_YAML, PLAN_CLASS, PLAN_STAGE_EXTRACT_METADATA, PLAN_STAGE_PARSE_PLAN} |
| 4 | +import io.github.datacatering.datacaterer.api.model.Constants.{DATA_CATERER_INTERFACE_JAVA, DATA_CATERER_INTERFACE_SCALA, DATA_CATERER_INTERFACE_YAML, DEFAULT_STEP_TYPE, FORMAT, PLAN_CLASS, PLAN_STAGE_EXTRACT_METADATA, PLAN_STAGE_PARSE_PLAN} |
5 | 5 | import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Plan, Task, ValidationConfiguration} |
6 | 6 | import io.github.datacatering.datacaterer.core.activity.{PlanRunPostPlanProcessor, PlanRunPrePlanProcessor} |
7 | 7 | import io.github.datacatering.datacaterer.core.config.ConfigParser |
@@ -118,7 +118,7 @@ object PlanProcessor { |
118 | 118 | basePlan, baseTasks, baseValidations, dataCatererConfiguration, resolvedInterface |
119 | 119 | ) |
120 | 120 |
|
121 | | - LOGGER.info(s"After pre-processors: num-tasks=${finalTasks.size}") |
| 121 | + LOGGER.info(s"After pre-processors: num-tasks=${finalTasks.size}, task-names=${finalTasks.map(_.name).mkString(", ")}") |
122 | 122 |
|
123 | 123 | // Step 4: Generate data with the final modified plan/tasks |
124 | 124 | val dataGeneratorProcessor = new DataGeneratorProcessor(dataCatererConfiguration) |
@@ -358,7 +358,13 @@ class YamlPlanRun( |
358 | 358 |
|
359 | 359 | // Merge connection config into each step's options (connection config as base, step options override) |
360 | 360 | val stepsWithConnectionConfig = task.steps.map(step => { |
361 | | - step.copy(options = connectionConfig ++ step.options) |
| 361 | + val mergedOptions = connectionConfig ++ step.options |
| 362 | + val optionsWithFormat = if (!mergedOptions.contains(FORMAT) && step.`type` != DEFAULT_STEP_TYPE) { |
| 363 | + mergedOptions + (FORMAT -> step.`type`) |
| 364 | + } else { |
| 365 | + mergedOptions |
| 366 | + } |
| 367 | + step.copy(options = optionsWithFormat) |
362 | 368 | }) |
363 | 369 |
|
364 | 370 | task.copy(steps = stepsWithConnectionConfig) |
@@ -386,7 +392,13 @@ class UnifiedPlanRun( |
386 | 392 | val connectionConfig = dataCatererConfig.connectionConfigByName.getOrElse(dataSourceName, Map()) |
387 | 393 |
|
388 | 394 | val stepsWithConnectionConfig = task.steps.map(step => { |
389 | | - step.copy(options = connectionConfig ++ step.options) |
| 395 | + val mergedOptions = connectionConfig ++ step.options |
| 396 | + val optionsWithFormat = if (!mergedOptions.contains(FORMAT) && step.`type` != DEFAULT_STEP_TYPE) { |
| 397 | + mergedOptions + (FORMAT -> step.`type`) |
| 398 | + } else { |
| 399 | + mergedOptions |
| 400 | + } |
| 401 | + step.copy(options = optionsWithFormat) |
390 | 402 | }) |
391 | 403 |
|
392 | 404 | task.copy(steps = stepsWithConnectionConfig) |
|
0 commit comments