Skip to content

Commit b286068

Browse files
authored
Merge pull request #104 from data-catering/feature/sample-api
Add in sample data generator API, add in API doc, add in gradle task …
2 parents 1e6b25f + 40fa866 commit b286068

19 files changed

Lines changed: 2634 additions & 31 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ app/src/test/resources/sample/plan-gen
3535

3636
api/out
3737
api/src/test/resources/sample/documentation
38+
api_validation_report.txt
3839

3940
# UI
4041
ui/node_modules

app/build.gradle.kts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,61 @@ application {
260260
mainClass.set("io.github.datacatering.datacaterer.App")
261261
}
262262

263+
// Custom run tasks for different modes.
// Both tasks require the same JVM flags (mirroring the docker launch script) so that
// Spark internals can reflectively access JDK modules on newer JVMs; the list is
// shared here instead of being duplicated per task.
val dataCatererJvmArgs = listOf(
    "-Djava.security.manager=allow",
    "-Djdk.module.illegalAccess=deny",
    "--add-opens=java.base/java.lang=ALL-UNNAMED",
    "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
    "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
    "--add-opens=java.base/java.io=ALL-UNNAMED",
    "--add-opens=java.base/java.net=ALL-UNNAMED",
    "--add-opens=java.base/java.nio=ALL-UNNAMED",
    "--add-opens=java.base/java.util=ALL-UNNAMED",
    "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
    "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED",
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
    "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED",
    "--add-opens=java.base/sun.security.action=ALL-UNNAMED",
    "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
    "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
)

// Runs the standalone UI entry point.
tasks.register<JavaExec>("runUI") {
    group = "application"
    description = "Run Data Caterer UI (standalone mode)"
    classpath = sourceSets.main.get().runtimeClasspath
    mainClass.set("io.github.datacatering.datacaterer.core.ui.DataCatererUI")
    jvmArgs(dataCatererJvmArgs)
}

// Runs the main application as a Spark job.
tasks.register<JavaExec>("runSpark") {
    group = "application"
    description = "Run Data Caterer as Spark job"
    classpath = sourceSets.main.get().runtimeClasspath
    mainClass.set("io.github.datacatering.datacaterer.App")
    jvmArgs(dataCatererJvmArgs)
}
317+
263318
sourceSets {
264319
test {
265320
resources {

app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,18 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
193193
val dataSourceResults = (1 to numBatches).flatMap(batch => {
194194
val startTime = LocalDateTime.now()
195195
LOGGER.info(s"Starting batch, batch=$batch, num-batches=$numBatches")
196-
val generatedDataForeachTask = executableTasks.flatMap(task =>
197-
task._2.steps.filter(_.enabled).map(s => generateDataForStep(batch, task, s))
198-
)
196+
val generatedDataForeachTask = executableTasks.flatMap(task => {
197+
task._2.steps.filter(_.enabled).map(s => {
198+
LOGGER.debug(s"Generating data for step, task-name=${task._1.name}, step-name=${s.name}, data-source-name=${task._1.dataSourceName}")
199+
try {
200+
generateDataForStep(batch, task, s)
201+
} catch {
202+
case ex: Exception =>
203+
LOGGER.error(s"Failed to generate data for step, task-name=${task._1.name}, step-name=${s.name}, data-source-name=${task._1.dataSourceName}")
204+
throw ex
205+
}
206+
})
207+
})
199208

200209
val sinkDf = plan.sinkOptions
201210
.map(_ => ForeignKeyUtil.getDataFramesWithForeignKeys(plan, generatedDataForeachTask))

app/src/main/scala/io/github/datacatering/datacaterer/core/sink/SinkFactory.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class SinkFactory(
3737
def pushToSink(df: DataFrame, dataSourceName: String, step: Step, startTime: LocalDateTime): SinkResult = {
3838
val dfWithoutOmitFields = removeOmitFields(df)
3939
val saveMode = step.options.get(SAVE_MODE).map(_.toLowerCase.capitalize).map(SaveMode.valueOf).getOrElse(SaveMode.Append)
40-
val format = step.options(FORMAT)
40+
val format = step.options.getOrElse(FORMAT, throw new IllegalArgumentException(s"No format specified for data source: $dataSourceName, step: ${step.name}. Available options: ${step.options.keys.mkString(", ")}"))
4141
val enrichedConnectionConfig = additionalConnectionConfig(format, step.options)
4242

4343
val count = if (flagsConfig.enableCount) {
@@ -89,12 +89,10 @@ class SinkFactory(
8989
// if format is iceberg, need to use dataframev2 api for partition and writing
9090
connectionConfig.filter(_._1.startsWith("spark.sql"))
9191
.foreach(conf => df.sqlContext.setConf(conf._1, conf._2))
92-
LOGGER.info(s"[DEBUG unwrap] Format is: '$format', JSON constant is: '$JSON'")
9392
val trySaveData = if (format == ICEBERG) {
9493
Try(tryPartitionAndSaveDfV2(df, saveMode, connectionConfig))
9594
} else if (format == JSON) {
9695
// Special-case: allow unwrapping top-level array to emit a bare JSON array file
97-
LOGGER.info(s"[DEBUG unwrap] Format is JSON, calling trySaveJsonPossiblyUnwrapped")
9896
val tryMaybeUnwrap = Try(trySaveJsonPossiblyUnwrapped(df, saveMode, connectionConfig))
9997
tryMaybeUnwrap
10098
} else {
@@ -119,7 +117,6 @@ class SinkFactory(
119117
}
120118

121119
private def trySaveJsonPossiblyUnwrapped(df: DataFrame, saveMode: SaveMode, connectionConfig: Map[String, String]): Unit = {
122-
LOGGER.info("[DEBUG unwrap] trySaveJsonPossiblyUnwrapped called")
123120
val shouldUnwrap = detectTopLevelArrayToUnwrap(df)
124121
shouldUnwrap match {
125122
case Some(arrayFieldName) =>
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.github.datacatering.datacaterer.core.ui.model
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
4+
import io.github.datacatering.datacaterer.api.model.Field
5+
import org.apache.spark.sql.types.{DataType, StructType}
6+
7+
/**
 * Request payload for generating sample data from a task YAML file on disk.
 *
 * @param taskYamlPath path to the task YAML file to load
 * @param stepName     optional name of a single step to sample; presumably all steps
 *                     are sampled when empty — TODO confirm against the route handler
 * @param sampleSize   number of sample records to generate (defaults to 10)
 * @param fastMode     NOTE(review): semantics not visible in this file — presumably
 *                     trades fidelity for speed; confirm where the flag is consumed
 */
@JsonIgnoreProperties(ignoreUnknown = true)
case class TaskFileSampleRequest(
  taskYamlPath: String,
  stepName: Option[String] = None,
  sampleSize: Int = 10,
  fastMode: Boolean = true
)
14+
15+
/**
 * Request payload for generating sample data directly from a list of field definitions,
 * without any task YAML.
 *
 * @param fields     field definitions describing the schema to generate data for
 * @param sampleSize number of sample records to generate (defaults to 10)
 * @param fastMode   NOTE(review): semantics not visible in this file — presumably
 *                   trades fidelity for speed; confirm where the flag is consumed
 */
@JsonIgnoreProperties(ignoreUnknown = true)
case class SchemaSampleRequest(
  fields: List[Field],
  sampleSize: Int = 10,
  fastMode: Boolean = true
)
21+
22+
/**
 * Request payload for generating sample data from inline task YAML content
 * (as opposed to a file path — see [[TaskFileSampleRequest]]).
 *
 * @param taskYamlContent raw task YAML content
 * @param stepName        optional name of a single step to sample; presumably all
 *                        steps are sampled when empty — TODO confirm in route handler
 * @param sampleSize      number of sample records to generate (defaults to 10)
 * @param fastMode        NOTE(review): semantics not visible in this file — confirm
 *                        where the flag is consumed
 */
@JsonIgnoreProperties(ignoreUnknown = true)
case class TaskYamlSampleRequest(
  taskYamlContent: String,
  stepName: Option[String] = None,
  sampleSize: Int = 10,
  fastMode: Boolean = true
)
29+
30+
/**
 * Response returned by the sample-data API.
 *
 * On success, `schema`, `sampleData` and `metadata` are expected to be populated;
 * on failure, `error` carries the failure detail — TODO confirm which combinations
 * the route actually produces.
 *
 * @param executionId identifier for this sample-generation execution
 * @param schema      description of the generated data's schema
 * @param sampleData  generated records as field-name -> value maps
 * @param metadata    statistics about the generation run
 * @param error       failure detail when `success` is false
 */
@JsonIgnoreProperties(ignoreUnknown = true)
case class SampleResponse(
  success: Boolean,
  executionId: String,
  schema: Option[SchemaInfo] = None,
  sampleData: Option[List[Map[String, Any]]] = None,
  metadata: Option[SampleMetadata] = None,
  error: Option[SampleError] = None
)
39+
40+
/**
 * Machine-readable error detail carried in a failed [[SampleResponse]].
 *
 * @param code    short error code identifying the failure category
 * @param message human-readable error message
 * @param details optional extra context (e.g. underlying exception text) — TODO
 *                confirm what producers put here
 */
@JsonIgnoreProperties(ignoreUnknown = true)
case class SampleError(
  code: String,
  message: String,
  details: Option[String] = None
)
46+
47+
/**
 * Statistics about a sample-generation run, carried in [[SampleResponse]].
 *
 * @param sampleSize      number of records that were requested
 * @param actualRecords   number of records actually generated (may differ from
 *                        the requested size — TODO confirm under what conditions)
 * @param generatedInMs   wall-clock generation time in milliseconds
 * @param fastModeEnabled whether fast mode was in effect for this run
 */
@JsonIgnoreProperties(ignoreUnknown = true)
case class SampleMetadata(
  sampleSize: Int,
  actualRecords: Int,
  generatedInMs: Long,
  fastModeEnabled: Boolean
)
54+
55+
/**
 * JSON-friendly view of a schema: the list of top-level fields.
 *
 * @param fields top-level fields of the schema
 */
@JsonIgnoreProperties(ignoreUnknown = true)
case class SchemaInfo(
  fields: List[SchemaField]
)

object SchemaInfo {
  /** Builds a [[SchemaInfo]] from a Spark [[StructType]] by converting every field. */
  def fromSparkSchema(schema: StructType): SchemaInfo = {
    val convertedFields = schema.fields.toList.map(SchemaField.fromSparkField)
    SchemaInfo(convertedFields)
  }
}
65+
66+
/**
 * JSON-friendly description of a single schema field; nests recursively for
 * struct-typed fields.
 *
 * @param name     field name
 * @param type     Spark type name, or the literal "struct" for nested structs
 * @param nullable whether the field may be null
 * @param fields   child fields, present only when `type` is "struct"
 */
@JsonIgnoreProperties(ignoreUnknown = true)
case class SchemaField(
  name: String,
  `type`: String,
  nullable: Boolean,
  fields: Option[List[SchemaField]] = None
)

object SchemaField {
  /** Recursively converts a Spark [[org.apache.spark.sql.types.StructField]] into a [[SchemaField]]. */
  def fromSparkField(field: org.apache.spark.sql.types.StructField): SchemaField =
    field.dataType match {
      case st: StructType =>
        // Nested struct: recurse into the children and tag the type as "struct".
        SchemaField(
          name = field.name,
          `type` = "struct",
          nullable = field.nullable,
          fields = Some(st.fields.map(fromSparkField).toList)
        )
      case other =>
        // Leaf field: report Spark's canonical type name (e.g. "string", "integer").
        SchemaField(
          name = field.name,
          `type` = other.typeName,
          nullable = field.nullable
        )
    }
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.github.datacatering.datacaterer.core.ui.model
2+
3+
import io.github.datacatering.datacaterer.core.util.ObjectMapperUtil
4+
import org.apache.pekko.http.scaladsl.model.{ContentTypes, HttpEntity, MediaTypes}
5+
import org.apache.pekko.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
6+
7+
import scala.concurrent.{ExecutionContext, Future}
8+
import scala.concurrent.duration._
9+
import scala.util.{Failure, Success, Try}
10+
11+
object TaskYamlUnmarshaller {

  /**
   * Custom unmarshaller turning an HTTP entity into a [[TaskYamlSampleRequest]],
   * accepting either JSON or raw YAML payloads:
   *
   *  - `application/json`: the body is deserialized directly into [[TaskYamlSampleRequest]].
   *  - `text/plain`, or any media type whose name contains "yaml": the body is treated
   *    as raw task YAML and wrapped in a [[TaskYamlSampleRequest]] with default fields;
   *    the route is expected to override `stepName`/`sampleSize`/`fastMode` from query
   *    parameters.
   *
   * Any other media type fails unmarshalling with an [[IllegalArgumentException]].
   */
  implicit val taskYamlSampleRequestUnmarshaller: FromEntityUnmarshaller[TaskYamlSampleRequest] = {
    Unmarshaller.withMaterializer { implicit ec: ExecutionContext => implicit mat =>
      entity: HttpEntity =>
        // Buffer the whole entity in memory; payloads are expected to be small task
        // definitions, so a short strict-entity timeout is sufficient.
        entity.toStrict(3.seconds).flatMap { strictEntity =>
          val mediaType = entity.contentType.mediaType
          val data = strictEntity.data.utf8String

          // Future.fromTry/successful/failed replaces the previous
          // Future { Try { ... } }.flatMap(Success/Failure) indirection while keeping
          // identical failure semantics: a Jackson parse error still fails the future.
          if (mediaType == MediaTypes.`application/json`) {
            Future.fromTry(Try(
              ObjectMapperUtil.jsonObjectMapper.readValue(data, classOf[TaskYamlSampleRequest])
            ))
          } else if (mediaType == MediaTypes.`text/plain` || mediaType.toString.contains("yaml")) {
            // Raw YAML: carry the content through untouched; remaining fields keep their
            // defaults and are overridden by query parameters in the route.
            Future.successful(TaskYamlSampleRequest(taskYamlContent = data))
          } else {
            Future.failed(new IllegalArgumentException(
              s"Unsupported content type: $mediaType. Supported: application/json, text/plain, application/yaml"))
          }
        }
    }
  }
}

0 commit comments

Comments (0)