Details
-
Bug
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
1.13.2
-
None
-
None
-
Flink 1.13.2
Ubuntu 21.04
Java 8
Description
I've noticed that when executing a valid query, in case there is a "bad" error when submitting it to the flink cluster, the client is going to crash, with a misleading beginning of the stacktrace. For example:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190) Caused by: java.lang.RuntimeException: Error running SQL job. at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:606) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537) at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:603) ... 8 more Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph. at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884) at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job. at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336) at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ... 6 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379) at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) ... 7 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: KafkaTableSource(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment) -> SourceConversion(table=[default_catalog.default_database.prod_orders, source: [KafkaTableSource(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment)]], fields=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment]) -> Timestamps/Watermarks -> SinkConversionToRow -> Map -> Sink: CsvTableSink(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_currency, o_ordertime, o_orderpriority, o_clerk, o_shippriority, o_comment)': File or directory already exists. Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories. at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216) at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227) at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:215) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40) at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146) ... 10 more Caused by: java.io.IOException: File or directory already exists. Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories. at org.apache.flink.core.fs.FileSystem.initOutPathDistFS(FileSystem.java:940) at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:286) at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:100) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212) ... 20 more End of exception on server side>] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ... 4 more Shutting down the session... done.
Or another example is in this issue: https://issues.apache.org/jira/browse/FLINK-22188
Shouldn't we catch these exceptions and nicely report them, without crashing the client? For example, the exception could be written to stderr and to a log file, without crashing the client, thus waiting for the next input query.
Also the initial lines of the stacktrace are misleading:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
While it's clearly a misconfiguration from my side of the connector.
What do you think about it?