@@ -11,6 +11,7 @@ import io.github.datacatering.datacaterer.core.parser.PlanParser
1111import io .github .datacatering .datacaterer .core .plan .{PlanProcessor , YamlPlanRun }
1212import io .github .datacatering .datacaterer .core .ui .config .UiConfiguration .INSTALL_DIRECTORY
1313import io .github .datacatering .datacaterer .core .ui .mapper .ConfigurationMapper .configurationMapping
14+ import io .github .datacatering .datacaterer .core .ui .mapper .StepOptionsMapper
1415import io .github .datacatering .datacaterer .core .ui .model .{ConfigurationRequest , Connection , EnhancedPlanRunRequest , PlanRunExecution , PlanRunRequest , PlanRunRequests , SampleResponse , SchemaSampleRequest }
1516import io .github .datacatering .datacaterer .core .ui .plan .PlanResponseHandler .{KO , OK , Response }
1617import io .github .datacatering .datacaterer .core .ui .resource .SparkSessionManager
@@ -195,7 +196,7 @@ object PlanRepository {
195196 .map(t => t.name -> t.dataSourceName)
196197 .toMap
197198
198- val dataSourceConnectionInfo = getConnectionDetails(taskToDataSourceMap)
199+ val dataSourceConnectionInfo = getConnectionDetails(taskToDataSourceMap, baseDirectory )
199200 .map(c => {
200201 val additionalConfig = c.`type` match {
201202 case POSTGRES => Map (FORMAT -> JDBC , DRIVER -> POSTGRES_DRIVER )
@@ -207,8 +208,8 @@ object PlanRepository {
207208 .toMap
208209
209210 // find tasks and validation using data source connection
210- val updatedValidation = validationWithConnectionInfo(parsedRequest, dataSourceConnectionInfo)
211- val updatedTasks = tasksWithConnectionInfo(parsedRequest, taskToDataSourceMap, dataSourceConnectionInfo)
211+ val updatedValidation = validationWithConnectionInfo(parsedRequest, dataSourceConnectionInfo, baseDirectory )
212+ val updatedTasks = tasksWithConnectionInfo(parsedRequest, taskToDataSourceMap, dataSourceConnectionInfo, baseDirectory )
212213 val updatedConfiguration = parsedRequest.configuration
213214 .map(c => configurationMapping(c, baseDirectory))
214215 .getOrElse(DataCatererConfigurationBuilder ())
@@ -218,14 +219,15 @@ object PlanRepository {
218219
219220 private def validationWithConnectionInfo (
220221 parsedRequest : PlanRunRequest ,
221- dataSourceConnectionInfo : Map [String , Map [String , String ]]
222+ dataSourceConnectionInfo : Map [String , Map [String , String ]],
223+ baseDirectory : String
222224 ): List [ValidationConfiguration ] = {
223225 parsedRequest.validation.map(yamlV => {
224226 val updatedDataSources = yamlV.dataSources.map(ds => {
225227 val dataSourceName = ds._1
226228 val connectionInfo = dataSourceConnectionInfo(dataSourceName)
227229 val updatedValidationOptions = ds._2.map(yamlDs => {
228- val metadataOpts = getMetadataSourceInfo(dataSourceConnectionInfo, yamlDs.options)
230+ val metadataOpts = getMetadataSourceInfo(dataSourceConnectionInfo, yamlDs.options, baseDirectory )
229231 val allOpts = yamlDs.options ++ connectionInfo ++ metadataOpts
230232 val listValidationBuilders = yamlDs.validations.map {
231233 case yamlUpstreamDataSourceValidation : YamlUpstreamDataSourceValidation =>
@@ -245,7 +247,8 @@ object PlanRepository {
245247 private def tasksWithConnectionInfo (
246248 parsedRequest : PlanRunRequest ,
247249 taskToDataSourceMap : Map [String , String ],
248- dataSourceConnectionInfo : Map [String , Map [String , String ]]
250+ dataSourceConnectionInfo : Map [String , Map [String , String ]],
251+ baseDirectory : String
249252 ): List [Task ] = {
250253 val updatedTasks = parsedRequest.tasks.map(s => {
251254 val taskName = s.name
@@ -254,23 +257,29 @@ object PlanRepository {
254257 }
255258 val dataSourceName = taskToDataSourceMap(taskName)
256259 val connectionInfo = dataSourceConnectionInfo(dataSourceName)
257- val metadataOpts = getMetadataSourceInfo(dataSourceConnectionInfo, s.options)
258- val updatedStep = s.copy(options = s.options ++ connectionInfo ++ metadataOpts)
260+ val metadataOpts = getMetadataSourceInfo(dataSourceConnectionInfo, s.options, baseDirectory)
261+
262+ // Map UI options to data source format using StepOptionsMapper
263+ val format = connectionInfo.getOrElse(FORMAT , " " )
264+
265+ val baseOptions = s.options ++ connectionInfo ++ metadataOpts
266+ val mappedOptions = StepOptionsMapper .mapStepOptions(baseOptions, format)
267+ val updatedStep = s.copy(options = mappedOptions)
259268 Task (taskName, List (updatedStep))
260269 })
261270 updatedTasks
262271 }
263272
264- private def getMetadataSourceInfo (dataSourceConnectionInfo : Map [String , Map [String , String ]], options : Map [String , String ]): Map [String , String ] = {
273+ private def getMetadataSourceInfo (dataSourceConnectionInfo : Map [String , Map [String , String ]], options : Map [String , String ], baseDirectory : String ): Map [String , String ] = {
265274 if (options.contains(METADATA_SOURCE_NAME )) {
266- ConnectionService .getMetadataSourceInfo(options(METADATA_SOURCE_NAME ), dataSourceConnectionInfo)
275+ ConnectionService .getMetadataSourceInfo(options(METADATA_SOURCE_NAME ), dataSourceConnectionInfo, baseDirectory )
267276 } else {
268277 Map ()
269278 }
270279 }
271280
272- private def getConnectionDetails (taskToDataSourceMap : Map [String , String ]): List [Connection ] = {
273- ConnectionService .getConnections(taskToDataSourceMap.values.toList, masking = false )
281+ private def getConnectionDetails (taskToDataSourceMap : Map [String , String ], baseDirectory : String ): List [Connection ] = {
282+ ConnectionService .getConnections(taskToDataSourceMap.values.toList, masking = false , baseDirectory )
274283 }
275284
276285 private def savePlanRunExecution (
0 commit comments