Skip to content

Commit 4e3aeb7

Browse files
committed
Enhance YAML support by adding new YamlBuilder and related functionality for task and plan processing. Update constants for YAML integration, improve metadata handling, and introduce new test cases for YAML data sources. Add sample YAML configurations for end-to-end testing.
1 parent 0b5fd84 commit 4e3aeb7

78 files changed

Lines changed: 5422 additions & 784 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ app/src/test/resources/sample/documentation
3232
app/src/test/resources/sample/java
3333
app/src/test/resources/sample/report
3434
app/src/test/resources/sample/plan-gen
35+
app/src/test/resources/sample/e2e
3536

3637
api/out
3738
api/src/test/resources/sample/documentation

api/src/main/scala/io/github/datacatering/datacaterer/api/MetadataSourceBuilder.scala

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package io.github.datacatering.datacaterer.api
22

33
import com.softwaremill.quicklens.ModifyPimp
44
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
5-
import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY_ID, CONFLUENT_SCHEMA_REGISTRY_SUBJECT, CONFLUENT_SCHEMA_REGISTRY_VERSION, DATA_CONTRACT_FILE, DATA_CONTRACT_SCHEMA, GREAT_EXPECTATIONS_FILE, JSON_SCHEMA_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION}
6-
import io.github.datacatering.datacaterer.api.model.{ConfluentSchemaRegistrySource, DataContractCliSource, GreatExpectationsSource, JsonSchemaSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource}
5+
import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY_ID, CONFLUENT_SCHEMA_REGISTRY_SUBJECT, CONFLUENT_SCHEMA_REGISTRY_VERSION, DATA_CONTRACT_FILE, DATA_CONTRACT_SCHEMA, GREAT_EXPECTATIONS_FILE, JSON_SCHEMA_FILE, METADATA_SOURCE_URL, OPEN_LINEAGE_DATASET, OPEN_LINEAGE_NAMESPACE, OPEN_METADATA_API_VERSION, OPEN_METADATA_AUTH_TYPE, OPEN_METADATA_AUTH_TYPE_OPEN_METADATA, OPEN_METADATA_DEFAULT_API_VERSION, OPEN_METADATA_HOST, OPEN_METADATA_JWT_TOKEN, SCHEMA_LOCATION, YAML_PLAN_FILE, YAML_STEP_NAME, YAML_TASK_FILE, YAML_TASK_NAME}
6+
import io.github.datacatering.datacaterer.api.model.{ConfluentSchemaRegistrySource, DataContractCliSource, GreatExpectationsSource, JsonSchemaSource, MarquezMetadataSource, MetadataSource, OpenAPISource, OpenDataContractStandardSource, OpenMetadataSource, YamlPlanSource, YamlTaskSource}
77

88
case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadataSource()) {
99
def this() = this(MarquezMetadataSource())
@@ -138,4 +138,56 @@ case class MetadataSourceBuilder(metadataSource: MetadataSource = MarquezMetadat
138138

139139
def jsonSchemaJava(schemaFile: String, options: java.util.Map[String, String]): MetadataSourceBuilder =
140140
jsonSchema(schemaFile, toScalaMap(options))
141+
142+
/**
 * Use an existing YAML plan file as the metadata source.
 *
 * The current metadata source is replaced with a `YamlPlanSource` whose connection options
 * hold the given path under the `YAML_PLAN_FILE` key.
 *
 * @param planFile Path to the YAML plan file
 * @return MetadataSourceBuilder whose metadata source is a `YamlPlanSource`
 */
def yamlPlan(planFile: String): MetadataSourceBuilder = {
  val planOptions = Map(YAML_PLAN_FILE -> planFile)
  copy(metadataSource = YamlPlanSource(planOptions))
}
152+
153+
/**
 * Use an existing YAML task file as the metadata source.
 *
 * The current metadata source is replaced with a `YamlTaskSource` whose connection options
 * hold the given path under the `YAML_TASK_FILE` key.
 *
 * @param taskFile Path to the YAML task file
 * @return MetadataSourceBuilder whose metadata source is a `YamlTaskSource`
 */
def yamlTask(taskFile: String): MetadataSourceBuilder = {
  val taskOptions = Map(YAML_TASK_FILE -> taskFile)
  copy(metadataSource = YamlTaskSource(taskOptions))
}
163+
164+
/**
 * Use an existing YAML task file as the metadata source, naming one task within it.
 *
 * The resulting `YamlTaskSource` carries the file path under `YAML_TASK_FILE` and the task
 * name under `YAML_TASK_NAME`.
 *
 * @param taskFile Path to the YAML task file
 * @param taskName Name of the specific task to use from the YAML file
 * @return MetadataSourceBuilder whose metadata source is a `YamlTaskSource`
 */
def yamlTask(taskFile: String, taskName: String): MetadataSourceBuilder = {
  val taskOptions = Map(YAML_TASK_FILE -> taskFile, YAML_TASK_NAME -> taskName)
  copy(metadataSource = YamlTaskSource(taskOptions))
}
177+
178+
/**
 * Use an existing YAML task file as the metadata source, naming one task and one step within it.
 *
 * The resulting `YamlTaskSource` carries the file path under `YAML_TASK_FILE`, the task name
 * under `YAML_TASK_NAME` and the step name under `YAML_STEP_NAME`.
 *
 * @param taskFile Path to the YAML task file
 * @param taskName Name of the specific task to use from the YAML file
 * @param stepName Name of the specific step to use from the task
 * @return MetadataSourceBuilder whose metadata source is a `YamlTaskSource`
 */
def yamlTask(taskFile: String, taskName: String, stepName: String): MetadataSourceBuilder = {
  val taskOptions = Map(
    YAML_TASK_FILE -> taskFile,
    YAML_TASK_NAME -> taskName,
    YAML_STEP_NAME -> stepName
  )
  copy(metadataSource = YamlTaskSource(taskOptions))
}
141193
}

api/src/main/scala/io/github/datacatering/datacaterer/api/PlanBuilder.scala

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import com.softwaremill.quicklens.ModifyPimp
44
import io.github.datacatering.datacaterer.api
55
import io.github.datacatering.datacaterer.api.connection.ConnectionTaskBuilder
66
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaList
7-
import io.github.datacatering.datacaterer.api.model.Constants.{ENABLE_REFERENCE_MODE, METADATA_SOURCE_TYPE}
8-
import io.github.datacatering.datacaterer.api.model.{ForeignKeyRelation, Plan}
7+
import io.github.datacatering.datacaterer.api.model.Constants.{ENABLE_REFERENCE_MODE, METADATA_SOURCE_TYPE, YAML_PLAN}
8+
import io.github.datacatering.datacaterer.api.model.{ForeignKeyRelation, Plan, YamlPlanSource}
99

1010
import scala.annotation.varargs
1111

@@ -146,4 +146,20 @@ case class PlanBuilder(plan: Plan = Plan(), tasks: List[TasksBuilder] = List())
146146
}
147147
})
148148
}
149+
150+
/**
 * Mark this plan as referencing a YAML plan file.
 *
 * Only the plan description is changed: the suffix ` [YAML: <planFile>]` is appended to the
 * current description, with `unknown` used when `yamlConfig.planFile` is not set. This method
 * does not read the YAML file and does not attach a metadata source to the builder.
 *
 * @param yamlConfig Configuration specifying which YAML file is referenced
 * @return PlanBuilder whose description carries the YAML plan file marker
 */
def fromYaml(yamlConfig: YamlConfig): PlanBuilder = {
  // NOTE(review): presumably the description marker is what later processing uses to find the
  // YAML plan; nothing in this builder stores a YamlPlanSource - confirm against the consumer.
  val planFileLabel = yamlConfig.planFile.getOrElse("unknown")
  this.modify(_.plan.description).setTo(s"${plan.description} [YAML: $planFileLabel]")
}
149165
}

api/src/main/scala/io/github/datacatering/datacaterer/api/PlanRun.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,14 @@ trait PlanRun {
182182
*/
183183
def metadataSource: MetadataSourceBuilder = MetadataSourceBuilder()
184184

185+
/**
 * Start a YAML builder for referencing existing YAML plan and task files from the
 * programmatic API. The builder starts from an empty `YamlConfig`.
 *
 * @return YamlBuilder
 */
def yaml: YamlBuilder = YamlBuilder(yamlConfig = YamlConfig())
192+
185193
/**
186194
* Create new CSV generation step with configurations
187195
*

api/src/main/scala/io/github/datacatering/datacaterer/api/TaskBuilder.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,20 @@ case class TaskBuilder(task: Task = Task()) {
145145
* @return the updated `TaskBuilder` instance
146146
*/
147147
@varargs def steps(steps: StepBuilder*): TaskBuilder = this.modify(_.task.steps)(_ ++ steps.map(_.step))
148+
149+
/**
 * Mark this task as referencing a YAML task file.
 *
 * Only the task name is changed: `_yaml_<hash>` is appended to the current name, where the hash
 * is the `hashCode` of `yamlConfig.taskFile` (or of `unknown` when no task file is set). The
 * task name and step name filters of `yamlConfig` do not affect the generated name, and the
 * YAML file is not read here.
 *
 * @param yamlConfig Configuration specifying which YAML file is referenced
 * @return TaskBuilder whose task name carries the YAML marker
 */
def fromYaml(yamlConfig: YamlConfig): TaskBuilder = {
  val taskFileHash = yamlConfig.taskFile.getOrElse("unknown").hashCode
  val markedName = s"${task.name}_yaml_$taskFileHash"
  copy(task = task.copy(name = markedName))
}
148162
}
149163

150164
case class StepBuilder(step: Step = Step(), optValidation: Option[DataSourceValidationBuilder] = None) {
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package io.github.datacatering.datacaterer.api
2+
3+
import com.softwaremill.quicklens.ModifyPimp
4+
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
5+
import io.github.datacatering.datacaterer.api.model.Constants.{YAML_PLAN_FILE, YAML_STEP_NAME, YAML_TASK_FILE, YAML_TASK_NAME}
6+
import io.github.datacatering.datacaterer.api.model.{Plan, Task}
7+
8+
/**
 * Entry point for referencing existing YAML plan or task files from the programmatic API.
 * Each method records the requested file (and optional task/step names) in a copy of the
 * [[YamlConfig]] and passes it to `fromYaml` on a freshly created `PlanBuilder` or `TaskBuilder`.
 *
 * @param yamlConfig Base configuration that every method copies and extends
 */
case class YamlBuilder(yamlConfig: YamlConfig = YamlConfig()) {
  def this() = this(YamlConfig())

  /**
   * Reference a YAML plan file.
   *
   * @param planFile Path to the YAML plan file
   * @return PlanBuilder produced by `PlanBuilder().fromYaml` for that plan file
   */
  def plan(planFile: String): PlanBuilder = {
    val planConfig = yamlConfig.copy(planFile = Some(planFile))
    PlanBuilder().fromYaml(planConfig)
  }

  /**
   * Reference a YAML task file.
   *
   * @param taskFile Path to the YAML task file
   * @return TaskBuilder produced by `TaskBuilder().fromYaml` for that task file
   */
  def task(taskFile: String): TaskBuilder = {
    val taskConfig = yamlConfig.copy(taskFile = Some(taskFile))
    TaskBuilder().fromYaml(taskConfig)
  }

  /**
   * Reference a YAML task file and name one task within it.
   *
   * @param taskFile Path to the YAML task file
   * @param taskName Name of the specific task to use from the YAML file
   * @return TaskBuilder produced by `TaskBuilder().fromYaml` for that task file and task name
   */
  def task(taskFile: String, taskName: String): TaskBuilder = {
    val taskConfig = yamlConfig.copy(taskFile = Some(taskFile), taskName = Some(taskName))
    TaskBuilder().fromYaml(taskConfig)
  }

  /**
   * Reference a YAML task file and name one task and one step within it.
   *
   * @param taskFile Path to the YAML task file
   * @param taskName Name of the specific task to use from the YAML file
   * @param stepName Name of the specific step to use from the task
   * @return TaskBuilder produced by `TaskBuilder().fromYaml` for that file, task and step
   */
  def task(taskFile: String, taskName: String, stepName: String): TaskBuilder = {
    val taskConfig = yamlConfig.copy(
      taskFile = Some(taskFile),
      taskName = Some(taskName),
      stepName = Some(stepName)
    )
    TaskBuilder().fromYaml(taskConfig)
  }

  /**
   * Java API - same as `task(taskFile, taskName)`.
   */
  def taskJava(taskFile: String, taskName: String): TaskBuilder =
    task(taskFile, taskName)

  /**
   * Java API - same as `task(taskFile, taskName, stepName)`.
   */
  def taskJava(taskFile: String, taskName: String, stepName: String): TaskBuilder =
    task(taskFile, taskName, stepName)
}
81+
82+
/**
 * Describes which YAML file, and optionally which task and step inside it, is being referenced.
 *
 * @param planFile Optional path to YAML plan file
 * @param taskFile Optional path to YAML task file
 * @param taskName Optional task name filter
 * @param stepName Optional step name filter
 */
case class YamlConfig(
                       planFile: Option[String] = None,
                       taskFile: Option[String] = None,
                       taskName: Option[String] = None,
                       stepName: Option[String] = None
                     ) {
  def this() = this(None, None, None, None)

  /**
   * Render the defined fields as an options map for metadata source usage.
   * Fields that are `None` contribute no entry, so an empty config yields an empty map.
   *
   * @return map keyed by `YAML_PLAN_FILE`, `YAML_TASK_FILE`, `YAML_TASK_NAME` and `YAML_STEP_NAME`
   */
  def toOptionsMap: Map[String, String] = {
    val candidates = List(
      YAML_PLAN_FILE -> planFile,
      YAML_TASK_FILE -> taskFile,
      YAML_TASK_NAME -> taskName,
      YAML_STEP_NAME -> stepName
    )
    // Keep only the entries whose value is present.
    candidates.collect { case (key, Some(value)) => key -> value }.toMap
  }
}

api/src/main/scala/io/github/datacatering/datacaterer/api/model/Constants.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ object Constants {
6060
lazy val GREAT_EXPECTATIONS_FILE = "expectationsFile"
6161
lazy val DATA_CONTRACT_FILE = "dataContractFile"
6262
lazy val DATA_CONTRACT_SCHEMA = "dataContractSchema"
63+
lazy val YAML_PLAN_FILE = "yamlPlanFile"
64+
lazy val YAML_TASK_FILE = "yamlTaskFile"
65+
lazy val YAML_TASK_NAME = "yamlTaskName"
66+
lazy val YAML_STEP_NAME = "yamlStepName"
6367
lazy val ROWS_PER_SECOND = "rowsPerSecond"
6468
lazy val UNWRAP_TOP_LEVEL_ARRAY = "unwrapTopLevelArray"
6569
lazy val HUDI_TABLE_NAME = "hoodie.table.name"
@@ -393,6 +397,8 @@ object Constants {
393397
lazy val DATAHUB = "datahub"
394398
lazy val CONFLUENT_SCHEMA_REGISTRY = "confluentSchemaRegistry"
395399
lazy val JSON_SCHEMA = "jsonSchema"
400+
lazy val YAML_PLAN = "yamlPlan"
401+
lazy val YAML_TASK = "yamlTask"
396402
lazy val DEFAULT_METADATA_SOURCE_NAME = "defaultMetadataSource"
397403

398404
//alert source

api/src/main/scala/io/github/datacatering/datacaterer/api/model/MetadataSourceModels.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.github.datacatering.datacaterer.api.model
22

3-
import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY, DATA_CONTRACT_CLI, GREAT_EXPECTATIONS, JSON_SCHEMA, MARQUEZ, METADATA_SOURCE_HAS_OPEN_LINEAGE_SUPPORT, METADATA_SOURCE_TYPE, OPEN_API, OPEN_DATA_CONTRACT_STANDARD, OPEN_METADATA}
3+
import io.github.datacatering.datacaterer.api.model.Constants.{CONFLUENT_SCHEMA_REGISTRY, DATA_CONTRACT_CLI, GREAT_EXPECTATIONS, JSON_SCHEMA, MARQUEZ, METADATA_SOURCE_HAS_OPEN_LINEAGE_SUPPORT, METADATA_SOURCE_TYPE, OPEN_API, OPEN_DATA_CONTRACT_STANDARD, OPEN_METADATA, YAML_PLAN, YAML_TASK}
44

55
trait MetadataSource {
66

@@ -61,3 +61,15 @@ case class JsonSchemaSource(override val connectionOptions: Map[String, String]
6161
override val `type`: String = JSON_SCHEMA
6262

6363
}
64+
65+
/**
 * Metadata source that references a YAML plan file; its `type` is `YAML_PLAN`.
 *
 * @param connectionOptions Options describing the YAML plan reference
 */
case class YamlPlanSource(override val connectionOptions: Map[String, String] = Map.empty) extends MetadataSource {
  override val `type`: String = YAML_PLAN
}
70+
71+
/**
 * Metadata source that references a YAML task file; its `type` is `YAML_TASK`.
 *
 * @param connectionOptions Options describing the YAML task reference
 */
case class YamlTaskSource(override val connectionOptions: Map[String, String] = Map.empty) extends MetadataSource {
  override val `type`: String = YAML_TASK
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package io.github.datacatering.datacaterer.api
2+
3+
import io.github.datacatering.datacaterer.api.model.Constants.{YAML_PLAN_FILE, YAML_STEP_NAME, YAML_TASK_FILE, YAML_TASK_NAME}
4+
import org.scalatest.funsuite.AnyFunSuite
5+
6+
/**
 * Covers the YAML entry points: the default `YamlBuilder`, the markers that `plan` and `task`
 * leave on the returned builders, and the `YamlConfig` to options-map conversion.
 */
class YamlBuilderTest extends AnyFunSuite {

  private val planPath = "/path/to/plan.yaml"
  private val taskPath = "/path/to/task.yaml"

  test("Can create YamlBuilder") {
    val config = YamlBuilder().yamlConfig

    // A fresh builder must not reference any file, task or step.
    assert(config.planFile.isEmpty)
    assert(config.taskFile.isEmpty)
    assert(config.taskName.isEmpty)
    assert(config.stepName.isEmpty)
  }

  test("Can create plan builder from YAML file") {
    val description = YamlBuilder().plan(planPath).plan.description

    assert(description.contains("YAML: /path/to/plan.yaml"))
  }

  test("Can create task builder from YAML file") {
    val generatedName = YamlBuilder().task(taskPath).task.name

    assert(generatedName.contains("yaml_"))
  }

  test("Can create task builder from YAML file with task name filter") {
    val generatedName = YamlBuilder().task(taskPath, "myTask").task.name

    assert(generatedName.contains("yaml_"))
  }

  test("Can create task builder from YAML file with task and step name filters") {
    val generatedName = YamlBuilder().task(taskPath, "myTask", "myStep").task.name

    assert(generatedName.contains("yaml_"))
  }

  test("YamlConfig can convert to options map") {
    val options = YamlConfig(
      planFile = Some(planPath),
      taskFile = Some(taskPath),
      taskName = Some("myTask"),
      stepName = Some("myStep")
    ).toOptionsMap

    val expectedEntries = List(
      YAML_PLAN_FILE -> "/path/to/plan.yaml",
      YAML_TASK_FILE -> "/path/to/task.yaml",
      YAML_TASK_NAME -> "myTask",
      YAML_STEP_NAME -> "myStep"
    )

    expectedEntries.foreach { case (key, expected) =>
      assert(options(key) == expected)
    }
  }

  test("YamlConfig handles empty values correctly") {
    assert(YamlConfig().toOptionsMap.isEmpty)
  }
}

0 commit comments

Comments
 (0)