Flink / FLINK-17197

ContinuousFileReaderOperator might shade the real exception when processing records

    Description

      In ContinuousFileReaderOperator we have the following code:

      private final transient RunnableWithException processRecordAction = () -> {
      	try {
      		processRecord();
      	} catch (Exception e) {
      		switchState(ReaderState.CLOSED);
      		throw e;
      	}
      };
      

      The reader's state is forced to CLOSED when an exception occurs. However, switchState itself might throw another exception saying that the reader's current state can't be switched directly to CLOSED. This new exception then masks the real exception thrown from processRecord.
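The masking described above can be reproduced without Flink. The following is a minimal sketch, not the operator's actual code and not necessarily the fix adopted for this issue: MaskingDemo, its simplified ReaderState enum, the stand-in switchState and processRecord methods, and the capture helper are all hypothetical. It contrasts the current pattern with one possible alternative that attaches the secondary failure via Throwable.addSuppressed so the original exception is still the one that propagates.

```java
public class MaskingDemo {
    // Simplified, hypothetical stand-in for the operator's reader states.
    enum ReaderState { READING, CLOSED }

    @FunctionalInterface
    interface RunnableWithException {
        void run() throws Exception;
    }

    private ReaderState state = ReaderState.READING;

    // Stand-in for switchState: rejects the direct READING -> CLOSED transition.
    private void switchState(ReaderState newState) {
        if (state == ReaderState.READING && newState == ReaderState.CLOSED) {
            throw new IllegalStateException(
                "can't switch state from " + state + " to " + newState);
        }
        state = newState;
    }

    // Stand-in for processRecord: always fails, like the failing sink in the report.
    private void processRecord() {
        throw new RuntimeException("writeRecordFails");
    }

    // Same shape as the code in the report: a failure in switchState replaces e.
    final RunnableWithException maskingAction = () -> {
        try {
            processRecord();
        } catch (Exception e) {
            switchState(ReaderState.CLOSED);
            throw e;
        }
    };

    // One possible alternative: keep e as the primary exception and record
    // the state-switch failure as a suppressed exception.
    final RunnableWithException preservingAction = () -> {
        try {
            processRecord();
        } catch (Exception e) {
            try {
                switchState(ReaderState.CLOSED);
            } catch (Exception secondary) {
                e.addSuppressed(secondary);
            }
            throw e;
        }
    };

    // Runs the action and returns whatever exception escapes it, or null.
    static Exception capture(RunnableWithException action) {
        try {
            action.run();
            return null;
        } catch (Exception e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Exception masked = capture(new MaskingDemo().maskingAction);
        Exception kept = capture(new MaskingDemo().preservingAction);
        System.out.println("masking pattern threw:    " + masked);
        System.out.println("preserving pattern threw: " + kept);
        System.out.println("suppressed count:         " + kept.getSuppressed().length);
    }
}
```

With the first action, only the IllegalStateException escapes. With the second, the RuntimeException escapes and carries the IllegalStateException as a suppressed exception, so both failures remain visible in the stack trace.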

      To trigger this situation, add the following test method to any blink planner IT case class (for example org.apache.flink.table.planner.runtime.batch.sql.CalcITCase):

      @Test
      def testCsv(): Unit = {
        conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
        val tableSource = TestTableSourceSinks.getPersonCsvTableSource
        tEnv.registerTableSource("MyTable", tableSource)
        val t = tEnv.sqlQuery("SELECT * FROM MyTable")
        TableUtils.collectToList(t)
      }
      

      We then need to change org.apache.flink.api.java.Utils.CollectHelper to make it a failing sink:

      @Override
      public void writeRecord(T record) throws IOException {
      	throw new RuntimeException("writeRecordFails");
      	// accumulator.add(record, serializer);
      }
      

      Run the test case; it fails with the following stack trace:

      org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
      	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:659)
      	at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:77)
      	at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:51)
      	at org.apache.flink.table.planner.utils.TestingTableEnvironment.execute(TableTestBase.scala:1054)
      	at org.apache.flink.table.api.TableUtils.collectToList(TableUtils.java:85)
      	at org.apache.flink.table.planner.runtime.batch.sql.CalcITCase.testCsv(CalcITCase.scala:73)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
      	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
      	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
      	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
      	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
      	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
      	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:112)
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:189)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:183)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:177)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:497)
      	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
      	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.IllegalStateException: can't switch state from terminal state READING to CLOSED
      	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217)
      	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.switchState(ContinuousFileReaderOperator.java:366)
      	at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:213)
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.run(StreamTaskActionExecutor.java:42)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:276)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:205)
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:196)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:490)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:718)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:542)
      	at java.lang.Thread.run(Thread.java:748)
      

      The original new RuntimeException("writeRecordFails") never appears in the trace; it is masked by java.lang.IllegalStateException: can't switch state from terminal state READING to CLOSED.

            People

              roman Roman Khachatryan
              TsReaper Caizhi Weng