diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java index 50cc411..09b225e 100644 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java +++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java @@ -90,7 +90,7 @@ public CliRunner( } public static void main(String[] args) throws Exception { - log.info("Executing flink-sql-runner: {}", Arrays.toString(args)); + log.info("Starting Flink SQL runner: {}", Arrays.toString(args)); var cmd = new CommandLine(new SqlRunner()); cmd.setUnmatchedArgumentsAllowed(true); @@ -111,9 +111,19 @@ public static void main(String[] args) throws Exception { runner.udfPath = System.getenv("UDF_PATH"); } - new CliRunner(runner.mode, runner.sqlFile, runner.planFile, runner.configDir, runner.udfPath) - .run(); + try { + var cliRunner = + new CliRunner( + runner.mode, runner.sqlFile, runner.planFile, runner.configDir, runner.udfPath); - log.info("Finished flink-sql-runner execution"); + cliRunner.run(); + + } catch (Throwable t) { + // Make sure we log any error to be able to present it in a K8s env + log.error("Flink SQL runner failed", t); + throw t; + } + + log.info("Flink SQL runner finished"); } } diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerIT.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerIT.java index 7829ceb..89412ad 100644 --- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerIT.java +++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerIT.java @@ -15,6 +15,8 @@ */ package com.datasqrl.flinkrunner; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.ArrayList; import java.util.stream.Stream; import org.junit.jupiter.api.Test; @@ -71,4 +73,25 @@ void givenKafkaPlanScript_whenExecuting_thenSuccess() throws Exception { String jobId = flinkRun("--planfile", "/it/planfile/kafka_plan.json"); assertJobIsRunning(jobId); } + + @Test + void givenFailingSqlScript_whenExecuting_thenLogsFailure() throws Exception { + var execRes = + flinkContainer.execInContainer("sql-runner", "--sqlfile", "/it/sqlfile/dummy_error.sql"); + var commandOutput = execRes.getStdout() + execRes.getStderr(); + + assertThat(execRes.getExitCode()).isNotZero(); + + untilAssert( + () -> { + var logRes = + flinkContainer.execInContainer( + "bash", + "-c", + "grep -R 'Flink SQL runner failed\\|DUMMY_ERROR' /opt/flink/log || true"); + var loggedOutput = commandOutput + logRes.getStdout() + logRes.getStderr(); + + assertThat(loggedOutput).contains("Flink SQL runner failed").contains("DUMMY_ERROR"); + }); + } } diff --git a/flink-sql-runner/src/test/resources/sql/dummy_error.sql b/flink-sql-runner/src/test/resources/sql/dummy_error.sql new file mode 100644 index 0000000..b1debf1 --- /dev/null +++ b/flink-sql-runner/src/test/resources/sql/dummy_error.sql @@ -0,0 +1 @@ +SELECT DUMMY_ERROR();