Commit b16ea54

Add in sample data generator API, add in API doc, add in gradle task for running UI and job
1 parent 8bbcd4b commit b16ea54

19 files changed: 2630 additions & 33 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ app/src/test/resources/sample/plan-gen
 
 api/out
 api/src/test/resources/sample/documentation
+api_validation_report.txt
 
 # UI
 ui/node_modules

app/build.gradle.kts

Lines changed: 55 additions & 0 deletions
@@ -260,6 +260,61 @@ application {
     mainClass.set("io.github.datacatering.datacaterer.App")
 }
 
+// Custom run tasks for different modes
+tasks.register<JavaExec>("runUI") {
+    group = "application"
+    description = "Run Data Caterer UI (standalone mode)"
+    classpath = sourceSets.main.get().runtimeClasspath
+    mainClass.set("io.github.datacatering.datacaterer.core.ui.DataCatererUI")
+
+    // Add JVM arguments similar to docker script
+    jvmArgs(
+        "-Djava.security.manager=allow",
+        "-Djdk.module.illegalAccess=deny",
+        "--add-opens=java.base/java.lang=ALL-UNNAMED",
+        "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
+        "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
+        "--add-opens=java.base/java.io=ALL-UNNAMED",
+        "--add-opens=java.base/java.net=ALL-UNNAMED",
+        "--add-opens=java.base/java.nio=ALL-UNNAMED",
+        "--add-opens=java.base/java.util=ALL-UNNAMED",
+        "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
+        "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED",
+        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+        "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
+        "--add-opens=java.base/sun.security.action=ALL-UNNAMED",
+        "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
+        "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
+    )
+}
+
+tasks.register<JavaExec>("runSpark") {
+    group = "application"
+    description = "Run Data Caterer as Spark job"
+    classpath = sourceSets.main.get().runtimeClasspath
+    mainClass.set("io.github.datacatering.datacaterer.App")
+
+    // Add JVM arguments similar to docker script
+    jvmArgs(
+        "-Djava.security.manager=allow",
+        "-Djdk.module.illegalAccess=deny",
+        "--add-opens=java.base/java.lang=ALL-UNNAMED",
+        "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
+        "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
+        "--add-opens=java.base/java.io=ALL-UNNAMED",
+        "--add-opens=java.base/java.net=ALL-UNNAMED",
+        "--add-opens=java.base/java.nio=ALL-UNNAMED",
+        "--add-opens=java.base/java.util=ALL-UNNAMED",
+        "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
+        "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED",
+        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+        "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
+        "--add-opens=java.base/sun.security.action=ALL-UNNAMED",
+        "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
+        "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
+    )
+}
+
 sourceSets {
     test {
         resources {
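
With these tasks registered, the UI and the Spark job can be started straight from Gradle instead of through the Docker entrypoint. Assuming the module is named app, per this build file's path (the invocation itself is not shown in the commit):

    ./gradlew :app:runUI
    ./gradlew :app:runSpark

Both tasks pass the same --add-opens flags as the Docker script, which Spark needs for reflective access to JDK internals on newer Java versions where the module system is enforced.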

app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala

Lines changed: 12 additions & 3 deletions
@@ -193,9 +193,18 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
     val dataSourceResults = (1 to numBatches).flatMap(batch => {
       val startTime = LocalDateTime.now()
       LOGGER.info(s"Starting batch, batch=$batch, num-batches=$numBatches")
-      val generatedDataForeachTask = executableTasks.flatMap(task =>
-        task._2.steps.filter(_.enabled).map(s => generateDataForStep(batch, task, s))
-      )
+      val generatedDataForeachTask = executableTasks.flatMap(task => {
+        task._2.steps.filter(_.enabled).map(s => {
+          LOGGER.debug(s"Generating data for step, task-name=${task._1.name}, step-name=${s.name}, data-source-name=${task._1.dataSourceName}")
+          try {
+            generateDataForStep(batch, task, s)
+          } catch {
+            case ex: Exception =>
+              LOGGER.error(s"Failed to generate data for step, task-name=${task._1.name}, step-name=${s.name}, data-source-name=${task._1.dataSourceName}")
+              throw ex
+          }
+        })
+      })
 
       val sinkDf = plan.sinkOptions
         .map(_ => ForeignKeyUtil.getDataFramesWithForeignKeys(plan, generatedDataForeachTask))
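
This wraps each step's generation in a try/catch so a failure is logged with its task name, step name, and data source before the exception is rethrown, making it possible to tell which step of which task broke a batch.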

app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala

Lines changed: 1 addition & 4 deletions
@@ -37,7 +37,7 @@ class SinkFactory(
   def pushToSink(df: DataFrame, dataSourceName: String, step: Step, startTime: LocalDateTime): SinkResult = {
     val dfWithoutOmitFields = removeOmitFields(df)
     val saveMode = step.options.get(SAVE_MODE).map(_.toLowerCase.capitalize).map(SaveMode.valueOf).getOrElse(SaveMode.Append)
-    val format = step.options(FORMAT)
+    val format = step.options.getOrElse(FORMAT, throw new IllegalArgumentException(s"No format specified for data source: $dataSourceName, step: ${step.name}. Available options: ${step.options.keys.mkString(", ")}"))
     val enrichedConnectionConfig = additionalConnectionConfig(format, step.options)
 
     val count = if (flagsConfig.enableCount) {
@@ -89,12 +89,10 @@ class SinkFactory(
     // if format is iceberg, need to use dataframev2 api for partition and writing
     connectionConfig.filter(_._1.startsWith("spark.sql"))
       .foreach(conf => df.sqlContext.setConf(conf._1, conf._2))
-    LOGGER.info(s"[DEBUG unwrap] Format is: '$format', JSON constant is: '$JSON'")
     val trySaveData = if (format == ICEBERG) {
       Try(tryPartitionAndSaveDfV2(df, saveMode, connectionConfig))
     } else if (format == JSON) {
       // Special-case: allow unwrapping top-level array to emit a bare JSON array file
-      LOGGER.info(s"[DEBUG unwrap] Format is JSON, calling trySaveJsonPossiblyUnwrapped")
       val tryMaybeUnwrap = Try(trySaveJsonPossiblyUnwrapped(df, saveMode, connectionConfig))
       tryMaybeUnwrap
     } else {
@@ -119,7 +117,6 @@ class SinkFactory(
   }
 
   private def trySaveJsonPossiblyUnwrapped(df: DataFrame, saveMode: SaveMode, connectionConfig: Map[String, String]): Unit = {
-    LOGGER.info("[DEBUG unwrap] trySaveJsonPossiblyUnwrapped called")
     val shouldUnwrap = detectTopLevelArrayToUnwrap(df)
     shouldUnwrap match {
       case Some(arrayFieldName) =>
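
Two effects here: a step whose options omit FORMAT now fails fast with an IllegalArgumentException naming the data source, the step, and the options that were supplied, rather than a bare NoSuchElementException from step.options(FORMAT); and the leftover [DEBUG unwrap] log lines from developing the JSON array-unwrapping path are removed.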
Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
+package io.github.datacatering.datacaterer.core.ui.model
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import io.github.datacatering.datacaterer.api.model.Field
+import org.apache.spark.sql.types.{DataType, StructType}
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class TaskFileSampleRequest(
+  taskYamlPath: String,
+  stepName: Option[String] = None,
+  sampleSize: Int = 10,
+  fastMode: Boolean = true
+)
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class SchemaSampleRequest(
+  fields: List[Field],
+  sampleSize: Int = 10,
+  fastMode: Boolean = true
+)
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class TaskYamlSampleRequest(
+  taskYamlContent: String,
+  stepName: Option[String] = None,
+  sampleSize: Int = 10,
+  fastMode: Boolean = true
+)
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class SampleResponse(
+  success: Boolean,
+  executionId: String,
+  schema: Option[SchemaInfo] = None,
+  sampleData: Option[List[Map[String, Any]]] = None,
+  metadata: Option[SampleMetadata] = None,
+  error: Option[SampleError] = None
+)
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class SampleError(
+  code: String,
+  message: String,
+  details: Option[String] = None
+)
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class SampleMetadata(
+  sampleSize: Int,
+  actualRecords: Int,
+  generatedInMs: Long,
+  fastModeEnabled: Boolean
+)
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class SchemaInfo(
+  fields: List[SchemaField]
+)
+
+object SchemaInfo {
+  def fromSparkSchema(schema: StructType): SchemaInfo = {
+    SchemaInfo(schema.fields.map(SchemaField.fromSparkField).toList)
+  }
+}
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+case class SchemaField(
+  name: String,
+  `type`: String,
+  nullable: Boolean,
+  fields: Option[List[SchemaField]] = None
+)
+
+object SchemaField {
+  def fromSparkField(field: org.apache.spark.sql.types.StructField): SchemaField = {
+    val fieldType = field.dataType match {
+      case st: StructType =>
+        SchemaField(
+          name = field.name,
+          `type` = "struct",
+          nullable = field.nullable,
+          fields = Some(st.fields.map(fromSparkField).toList)
+        )
+      case other =>
+        SchemaField(
+          name = field.name,
+          `type` = other.typeName,
+          nullable = field.nullable
+        )
+    }
+    fieldType
+  }
+}
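
These models make up the sample-generation API surface: three request shapes (task file path, inline task YAML, inline schema) and one SampleResponse carrying either the inferred schema plus rows or a structured SampleError. A minimal sketch of building a schema-based request in code (the field names and options below are illustrative, not taken from this commit):

    import io.github.datacatering.datacaterer.api.model.Field
    import io.github.datacatering.datacaterer.core.ui.model.SchemaSampleRequest

    // Hypothetical request: 5 sample rows for a two-field schema.
    val request = SchemaSampleRequest(
      fields = List(
        Field(name = "account_id", `type` = Some("string")),
        Field(name = "balance", `type` = Some("double"), options = Map("min" -> 0.0, "max" -> 1000.0))
      ),
      sampleSize = 5
    )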
Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
+package io.github.datacatering.datacaterer.core.ui.model
+
+import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
+import org.apache.pekko.http.scaladsl.model.{ContentTypes, HttpEntity, MediaTypes}
+import org.apache.pekko.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.util.{Failure, Success, Try}
+
+object TaskYamlUnmarshaller {
+
+  /**
+   * Custom unmarshaller that can handle both JSON and raw YAML content for TaskYamlSampleRequest.
+   *
+   * - For application/json: Deserializes the JSON directly to TaskYamlSampleRequest
+   * - For text/plain or application/yaml: Treats content as raw YAML and creates TaskYamlSampleRequest
+   *   with the raw content, relying on query parameters for other fields
+   */
+  implicit val taskYamlSampleRequestUnmarshaller: FromEntityUnmarshaller[TaskYamlSampleRequest] = {
+    Unmarshaller.withMaterializer { implicit ec: ExecutionContext => implicit mat =>
+      entity: HttpEntity =>
+        entity.toStrict(3.seconds) flatMap { strictEntity => // Use 3 second timeout
+          val contentType = entity.contentType
+          val data = strictEntity.data.utf8String
+
+          Future {
+            val mediaType = contentType.mediaType
+            if (mediaType == MediaTypes.`application/json`) {
+              // Use Jackson to deserialize JSON to TaskYamlSampleRequest
+              Try {
+                ObjectMapperUtil.jsonObjectMapper.readValue(data, classOf[TaskYamlSampleRequest])
+              }
+            } else if (mediaType == MediaTypes.`text/plain` || mediaType.toString.contains("yaml")) {
+              // Treat as raw YAML content - create TaskYamlSampleRequest with raw content
+              // Query parameters will be handled separately in the route
+              Try {
+                TaskYamlSampleRequest(
+                  taskYamlContent = data,
+                  stepName = None, // Will be overridden by query parameters
+                  sampleSize = 10, // Will be overridden by query parameters
+                  fastMode = true // Will be overridden by query parameters
+                )
+              }
+            } else {
+              Failure(new IllegalArgumentException(
+                s"Unsupported content type: $mediaType. Supported: application/json, text/plain, application/yaml"))
+            }
+          } flatMap {
+            case Success(result) => Future.successful(result)
+            case Failure(ex) => Future.failed(ex)
+          }
+        }
+    }
+  }
+}
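
With this implicit in scope, a Pekko HTTP route can accept the same endpoint body as either JSON or raw YAML. A minimal sketch, assuming a hypothetical route path (the actual route wiring is not shown in this commit):

    import org.apache.pekko.http.scaladsl.server.Directives._
    import io.github.datacatering.datacaterer.core.ui.model.TaskYamlUnmarshaller.taskYamlSampleRequestUnmarshaller

    // Hypothetical endpoint: accepts application/json, text/plain or application/yaml bodies.
    val sampleRoute =
      path("sample" / "task-yaml") {
        post {
          entity(as[TaskYamlSampleRequest]) { request =>
            complete(s"step=${request.stepName}, sampleSize=${request.sampleSize}")
          }
        }
      }

A YAML body posted as text/plain lands in taskYamlContent with the default stepName/sampleSize/fastMode, which the route is expected to override from query parameters, per the scaladoc above.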

app/src/main/scala/io/github/datacatering/datacaterer/core/ui/plan/PlanRepository.scala

Lines changed: 84 additions & 1 deletion
@@ -10,7 +10,9 @@ import io.github.datacatering.datacaterer.core.parser.PlanParser
 import io.github.datacatering.datacaterer.core.plan.{PlanProcessor, YamlPlanRun}
 import io.github.datacatering.datacaterer.core.ui.config.UiConfiguration.INSTALL_DIRECTORY
 import io.github.datacatering.datacaterer.core.ui.mapper.ConfigurationMapper.configurationMapping
-import io.github.datacatering.datacaterer.core.ui.model.{Connection, PlanRunExecution, PlanRunRequest, PlanRunRequests}
+import io.github.datacatering.datacaterer.core.ui.model.{Connection, PlanRunExecution, PlanRunRequest, PlanRunRequests, SchemaSampleRequest, TaskFileSampleRequest, TaskYamlSampleRequest, SampleResponse}
+import io.github.datacatering.datacaterer.core.ui.sample.FastSampleGenerator
+import io.github.datacatering.datacaterer.api.model.Field
 import io.github.datacatering.datacaterer.core.ui.plan.PlanResponseHandler.{KO, OK, Response}
 import io.github.datacatering.datacaterer.core.util.{ObjectMapperUtil, SparkProvider}
 import org.apache.log4j.Logger
@@ -67,6 +69,12 @@
 
   final case class StartupSpark() extends PlanCommand
 
+  final case class GenerateFromTaskFile(request: TaskFileSampleRequest, replyTo: ActorRef[SampleResponse]) extends PlanCommand
+
+  final case class GenerateFromTaskYaml(request: TaskYamlSampleRequest, replyTo: ActorRef[SampleResponse]) extends PlanCommand
+
+  final case class GenerateFromSchema(request: SchemaSampleRequest, replyTo: ActorRef[SampleResponse]) extends PlanCommand
+
   private val executionSaveFolder = s"$INSTALL_DIRECTORY/execution"
   private val planSaveFolder = s"$INSTALL_DIRECTORY/plan"
   implicit val ec: ExecutionContextExecutor = ExecutionContext.global
@@ -104,6 +112,15 @@
         case StartupSpark() =>
           startupSpark()
           Behaviors.same
+        case GenerateFromTaskFile(request, replyTo) =>
+          replyTo ! generateFromTaskFile(request)
+          Behaviors.same
+        case GenerateFromTaskYaml(request, replyTo) =>
+          replyTo ! generateFromTaskYaml(request)
+          Behaviors.same
+        case GenerateFromSchema(request, replyTo) =>
+          replyTo ! generateFromSchema(request)
+          Behaviors.same
       }
     }.onFailure(SupervisorStrategy.restart)
   }
@@ -393,13 +410,79 @@
     }
   }
 
+  private def generateFromTaskFile(request: TaskFileSampleRequest): SampleResponse = {
+    LOGGER.debug(s"Generating sample from task file: ${request.taskYamlPath}, step: ${request.stepName}")
+    try {
+      FastSampleGenerator.generateFromTaskFile(request)
+    } catch {
+      case ex: Throwable =>
+        LOGGER.error(s"Error generating sample from task file", ex)
+        SampleResponse(
+          success = false,
+          executionId = java.util.UUID.randomUUID().toString.split("-").head,
+          error = Some(io.github.datacatering.datacaterer.core.ui.model.SampleError("INTERNAL_ERROR", ex.getMessage))
+        )
+    }
+  }
+
+  private def generateFromTaskYaml(request: TaskYamlSampleRequest): SampleResponse = {
+    LOGGER.debug(s"Generating sample from task YAML content, step: ${request.stepName}")
+    try {
+      FastSampleGenerator.generateFromTaskYaml(request)
+    } catch {
+      case ex: Throwable =>
+        LOGGER.error(s"Error generating sample from task YAML", ex)
+        SampleResponse(
+          success = false,
+          executionId = java.util.UUID.randomUUID().toString.split("-").head,
+          error = Some(io.github.datacatering.datacaterer.core.ui.model.SampleError("INTERNAL_ERROR", ex.getMessage))
+        )
    }
+  }
+
+  private def generateFromSchema(request: SchemaSampleRequest): SampleResponse = {
+    LOGGER.debug(s"Generating sample from inline fields: ${request.fields.size} fields")
+    try {
+      FastSampleGenerator.generateFromSchema(request)
+    } catch {
+      case ex: Throwable =>
+        LOGGER.error(s"Error generating sample from schema", ex)
+        SampleResponse(
+          success = false,
+          executionId = java.util.UUID.randomUUID().toString.split("-").head,
+          error = Some(io.github.datacatering.datacaterer.core.ui.model.SampleError("INTERNAL_ERROR", ex.getMessage))
+        )
+    }
+  }
+
   private def startupSpark(): Response = {
     LOGGER.debug("Starting up Spark")
     setUiRunning
     try {
       implicit val sparkSession: SparkSession = new SparkProvider(DEFAULT_MASTER, DEFAULT_RUNTIME_CONFIG).getSparkSession
       //run some dummy query
       sparkSession.sql("SELECT 1").collect()
+
+      //warm up data generation pipeline with a simple sample request
+      LOGGER.debug("Warming up data generation pipeline")
+      val warmupRequest = SchemaSampleRequest(
+        fields = List(
+          Field(
+            name = "warmup_id",
+            `type` = Some("long"),
+            options = Map("min" -> 1L, "max" -> 10L)
+          )
+        ),
+        sampleSize = 1,
+        fastMode = true
+      )
+      val warmupResult = FastSampleGenerator.generateFromSchema(warmupRequest)
+      if (warmupResult.success) {
+        LOGGER.debug("Data generation pipeline warmed up successfully")
+      } else {
+        LOGGER.warn(s"Warmup failed: ${warmupResult.error}")
+      }
+
       OK
     } catch {
       case ex: Throwable => KO("Failed to start up Spark", ex)
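
The three new commands slot into the existing PlanCommand protocol, so callers request samples with the typed ask pattern and receive the SampleResponse produced by FastSampleGenerator. A minimal sketch under stated assumptions: PlanRepository() is taken to return the Behavior[PlanCommand] shown above, and the request mirrors the warmup code (all other names are illustrative):

    import org.apache.pekko.actor.typed.ActorSystem
    import org.apache.pekko.actor.typed.scaladsl.AskPattern._
    import org.apache.pekko.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import io.github.datacatering.datacaterer.api.model.Field
    import io.github.datacatering.datacaterer.core.ui.model.{SampleResponse, SchemaSampleRequest}
    import io.github.datacatering.datacaterer.core.ui.plan.PlanRepository

    // Hypothetical wiring: run the repository behavior as its own typed actor system.
    implicit val system: ActorSystem[PlanRepository.PlanCommand] =
      ActorSystem(PlanRepository(), "plan-repository") // assumes an apply() factory for the behavior

    implicit val timeout: Timeout = Timeout(10.seconds)

    val request = SchemaSampleRequest(
      fields = List(Field(name = "id", `type` = Some("long"))),
      sampleSize = 5
    )

    // The actor replies on the ActorRef[SampleResponse] that ask injects as replyTo.
    val response: Future[SampleResponse] =
      system.ask(replyTo => PlanRepository.GenerateFromSchema(request, replyTo))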
