From 9c634063329061b6302b563f6356a3925410178d Mon Sep 17 00:00:00 2001 From: Marvin Froeder Date: Fri, 22 May 2026 12:27:47 -0300 Subject: [PATCH 1/3] fix: log uncaught exceptions from CliRunner main so JobManager pod logs capture them Signed-off-by: Marvin Froeder --- .../com/datasqrl/flinkrunner/CliRunner.java | 52 +++++++++++-------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java index 50cc411..760dbc7 100644 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java +++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java @@ -92,28 +92,36 @@ public CliRunner( public static void main(String[] args) throws Exception { log.info("Executing flink-sql-runner: {}", Arrays.toString(args)); - var cmd = new CommandLine(new SqlRunner()); - cmd.setUnmatchedArgumentsAllowed(true); - - var resCode = cmd.execute(args); - if (resCode != 0) { - System.exit(resCode); - } - - if (cmd.isUsageHelpRequested()) { - return; - } - - SqlRunner runner = cmd.getCommand(); - - // Determine UDF path - if (runner.udfPath == null) { - runner.udfPath = System.getenv("UDF_PATH"); + try { + var cmd = new CommandLine(new SqlRunner()); + cmd.setUnmatchedArgumentsAllowed(true); + + var resCode = cmd.execute(args); + if (resCode != 0) { + System.exit(resCode); + } + + if (cmd.isUsageHelpRequested()) { + return; + } + + var runner = cmd.getCommand(); + + // Determine UDF path + if (runner.udfPath == null) { + runner.udfPath = System.getenv("UDF_PATH"); + } + + new CliRunner(runner.mode, runner.sqlFile, runner.planFile, runner.configDir, runner.udfPath) + .run(); + + log.info("Finished flink-sql-runner execution"); + } catch (Throwable t) { + // Surface the failure on stdout/stderr so log scrapers and `kubectl logs` capture it. + // Without this, the exception is only reported through the Flink REST API and shows up + // as a Kubernetes Event but never reaches the JobManager pod logs. + log.error("flink-sql-runner failed", t); + throw t; } - - new CliRunner(runner.mode, runner.sqlFile, runner.planFile, runner.configDir, runner.udfPath) - .run(); - - log.info("Finished flink-sql-runner execution"); } } From d3fb1ffe10221185d27c82e1b4124f1c1929898c Mon Sep 17 00:00:00 2001 From: Marvin Froeder Date: Fri, 22 May 2026 12:38:10 -0300 Subject: [PATCH 2/3] fix: narrow CliRunner main try/catch to wrap only run() and use explicit SqlRunner type Signed-off-by: Marvin Froeder --- .../com/datasqrl/flinkrunner/CliRunner.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java index 760dbc7..4ce5c5a 100644 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java +++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java @@ -92,36 +92,36 @@ public CliRunner( public static void main(String[] args) throws Exception { log.info("Executing flink-sql-runner: {}", Arrays.toString(args)); - try { - var cmd = new CommandLine(new SqlRunner()); - cmd.setUnmatchedArgumentsAllowed(true); + var cmd = new CommandLine(new SqlRunner()); + cmd.setUnmatchedArgumentsAllowed(true); - var resCode = cmd.execute(args); - if (resCode != 0) { - System.exit(resCode); - } + var resCode = cmd.execute(args); + if (resCode != 0) { + System.exit(resCode); + } - if (cmd.isUsageHelpRequested()) { - return; - } + if (cmd.isUsageHelpRequested()) { + return; + } - var runner = cmd.getCommand(); + SqlRunner runner = cmd.getCommand(); - // Determine UDF path - if (runner.udfPath == null) { - runner.udfPath = System.getenv("UDF_PATH"); - } + // Determine UDF path + if (runner.udfPath == null) { + runner.udfPath = System.getenv("UDF_PATH"); + } + try { new CliRunner(runner.mode, runner.sqlFile, runner.planFile, runner.configDir, runner.udfPath) .run(); - - log.info("Finished flink-sql-runner execution"); } catch (Throwable t) { - // Surface the failure on stdout/stderr so log scrapers and `kubectl logs` capture it. - // Without this, the exception is only reported through the Flink REST API and shows up - // as a Kubernetes Event but never reaches the JobManager pod logs. + // Surface the failure so log scrapers and `kubectl logs` capture it. Without this, the + // exception is only reported through the Flink REST API and shows up as a Kubernetes + // Event but never reaches the JobManager pod logs. log.error("flink-sql-runner failed", t); throw t; } + + log.info("Finished flink-sql-runner execution"); } } From ca846ffe20f7ad52e7e7e9bc50211401df160076 Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Fri, 22 May 2026 19:12:18 +0200 Subject: [PATCH 3/3] add test case, make comment more concise --- .../com/datasqrl/flinkrunner/CliRunner.java | 18 ++++++++------- .../com/datasqrl/flinkrunner/CliRunnerIT.java | 23 +++++++++++++++++++ .../src/test/resources/sql/dummy_error.sql | 1 + 3 files changed, 34 insertions(+), 8 deletions(-) create mode 100644 flink-sql-runner/src/test/resources/sql/dummy_error.sql diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java index 4ce5c5a..09b225e 100644 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java +++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/CliRunner.java @@ -90,7 +90,7 @@ public CliRunner( } public static void main(String[] args) throws Exception { - log.info("Executing flink-sql-runner: {}", Arrays.toString(args)); + log.info("Starting Flink SQL runner: {}", Arrays.toString(args)); var cmd = new CommandLine(new SqlRunner()); cmd.setUnmatchedArgumentsAllowed(true); @@ -112,16 +112,18 @@ public static void main(String[] args) throws Exception { } try { - new CliRunner(runner.mode, runner.sqlFile, runner.planFile, runner.configDir, runner.udfPath) - .run(); + var cliRunner = + new CliRunner( + runner.mode, runner.sqlFile, runner.planFile, runner.configDir, runner.udfPath); + + cliRunner.run(); + } catch (Throwable t) { - // Surface the failure so log scrapers and `kubectl logs` capture it. Without this, the - // exception is only reported through the Flink REST API and shows up as a Kubernetes - // Event but never reaches the JobManager pod logs. - log.error("flink-sql-runner failed", t); + // Make sure we log any error to be able to present it in a K8s env + log.error("Flink SQL runner failed", t); throw t; } - log.info("Finished flink-sql-runner execution"); + log.info("Flink SQL runner finished"); } } diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerIT.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerIT.java index 7829ceb..89412ad 100644 --- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerIT.java +++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/CliRunnerIT.java @@ -15,6 +15,8 @@ */ package com.datasqrl.flinkrunner; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.ArrayList; import java.util.stream.Stream; import org.junit.jupiter.api.Test; @@ -71,4 +73,25 @@ void givenKafkaPlanScript_whenExecuting_thenSuccess() throws Exception { String jobId = flinkRun("--planfile", "/it/planfile/kafka_plan.json"); assertJobIsRunning(jobId); } + + @Test + void givenFailingSqlScript_whenExecuting_thenLogsFailure() throws Exception { + var execRes = + flinkContainer.execInContainer("sql-runner", "--sqlfile", "/it/sqlfile/dummy_error.sql"); + var commandOutput = execRes.getStdout() + execRes.getStderr(); + + assertThat(execRes.getExitCode()).isNotZero(); + + untilAssert( + () -> { + var logRes = + flinkContainer.execInContainer( + "bash", + "-c", + "grep -R 'Flink SQL runner failed\\|DUMMY_ERROR' /opt/flink/log || true"); + var loggedOutput = commandOutput + logRes.getStdout() + logRes.getStderr(); + + assertThat(loggedOutput).contains("Flink SQL runner failed").contains("DUMMY_ERROR"); + }); + } } diff --git a/flink-sql-runner/src/test/resources/sql/dummy_error.sql b/flink-sql-runner/src/test/resources/sql/dummy_error.sql new file mode 100644 index 0000000..b1debf1 --- /dev/null +++ b/flink-sql-runner/src/test/resources/sql/dummy_error.sql @@ -0,0 +1 @@ +SELECT DUMMY_ERROR();