FLINK-21990

SourceStreamTask hangs indefinitely if CheckpointedFunction#snapshotState throws an exception.

    Description

      If the source in a SourceStreamTask implements CheckpointedFunction and its snapshotState method throws an exception, the SourceStreamTask hangs indefinitely.

      The root cause is that the checkpoint is executed in the task's mailbox. When the source's CheckpointedFunction#snapshotState throws an exception, StreamTask#cleanUpInvoke is called, and it waits for the source's LegacySourceFunctionThread to finish. However, the source thread does not end by itself (the user-provided cancel logic has to stop it), so the Task hangs at this point and the JobMaster is never made aware of it.

      protected void cleanUpInvoke() throws Exception {
          getCompletionFuture().exceptionally(unused -> null).join(); //wait for the end of the source
          // clean up everything we initialized
          isRunning = false;
          ...
      }

      I think we should call the source's cancel method first and only then wait for the source thread to end.
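
      To illustrate the ordering problem outside of Flink, here is a minimal plain-Java sketch. It is not Flink code: LoopingSource, startSource, waitOnly and cancelThenWait are hypothetical names made up for this example, and the timeouts exist only so that the demo terminates (the real cleanUpInvoke joins without a timeout). It assumes the source thread only stops once its cancel flag is flipped, as in the test source below.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CancelBeforeJoinSketch {

    /** Hypothetical stand-in for a legacy source: loops until cancel() is called. */
    public static final class LoopingSource {
        // volatile so that the write in cancel() is visible to the thread in run()
        private volatile boolean running = true;

        public void run() {
            while (running) {
                Thread.yield(); // a real source would emit records here
            }
        }

        public void cancel() {
            running = false;
        }
    }

    /** Runs the source on its own thread; the future completes when run() returns. */
    public static CompletableFuture<Void> startSource(LoopingSource source) {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                source.run();
                completion.complete(null);
            } catch (Throwable t) {
                completion.completeExceptionally(t);
            }
        }, "hypothetical-source-thread");
        thread.setDaemon(true); // do not keep the JVM alive if the demo is aborted
        thread.start();
        return completion;
    }

    /**
     * Mirrors the wait in the snippet above: only waits for the source to end.
     * Returns true if the source finished within the timeout, false otherwise.
     */
    public static boolean waitOnly(CompletableFuture<Void> completion, long timeoutMs)
            throws Exception {
        try {
            completion.exceptionally(unused -> null).get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still running: without the timeout this would block forever
        }
    }

    /** Proposed ordering: ask the source to stop first, then wait for it to end. */
    public static boolean cancelThenWait(
            LoopingSource source, CompletableFuture<Void> completion, long timeoutMs)
            throws Exception {
        source.cancel();
        return waitOnly(completion, timeoutMs);
    }

    public static void main(String[] args) throws Exception {
        LoopingSource source = new LoopingSource();
        CompletableFuture<Void> completion = startSource(source);
        System.out.println("wait only finished: " + waitOnly(completion, 200));
        System.out.println(
                "cancel then wait finished: " + cancelThenWait(source, completion, 2000));
    }
}
```

      With these assumptions, waiting alone should time out, while cancelling first should let the wait return promptly. Whether the real fix belongs in cleanUpInvoke itself or elsewhere in the task is a separate question that this sketch does not answer.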

      The following is my test code; it was run against Flink's master branch.

      @Test
      public void testSourceFailure() throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.enableCheckpointing(2000L);
          env.setRestartStrategy(RestartStrategies.noRestart());
          env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
          JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
          try {
              // assert that the job executes the checkpoint only once and fails only once.
              TestUtils.submitJobAndWaitForResult(
                      cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
          } catch (JobExecutionException jobException) {
              Optional<FlinkRuntimeException> throwable =
                      ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class);
              Assert.assertTrue(throwable.isPresent());
              Assert.assertEquals(
                      CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
                      throwable.get().getMessage());
          }
          // assert that the job only failed once.
          Assert.assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
      }
      
      private static class FailedSource extends RichParallelSourceFunction<String>
              implements CheckpointedFunction {
      
          // volatile: cancel() is called from a different thread than run()
          private transient volatile boolean running;
      
          @Override
          public void open(Configuration parameters) throws Exception {
              running = true;
          }
      
          @Override
          public void run(SourceContext<String> ctx) throws Exception {
              while (running) {
                  ctx.collect("test");
              }
          }
      
          @Override
          public void cancel() {
              running = false;
          }
      
          @Override
          public void snapshotState(FunctionSnapshotContext context) throws Exception {
              throw new RuntimeException("source failed");
          }
      
          @Override
          public void initializeState(FunctionInitializationContext context) throws Exception {}
      }
      

            People

              Anton Kalashnikov (akalashnikov)
              Ming Li