Flink / FLINK-21990

SourceStreamTask will always hang if the CheckpointedFunction#snapshotState throws an exception.

      Description

      If the source in a SourceStreamTask implements CheckpointedFunction and its snapshotState method throws an exception, the SourceStreamTask hangs indefinitely.

      The root cause is that the checkpoint is executed in the mailbox. When the source's CheckpointedFunction#snapshotState throws an exception, StreamTask#cleanUpInvoke is called, and it waits for the source's LegacySourceFunctionThread to finish. However, the source thread does not terminate on its own (the user's source has to be told to stop), so the Task hangs at this point, and the JobMaster is not aware of it.

      protected void cleanUpInvoke() throws Exception {
          getCompletionFuture().exceptionally(unused -> null).join(); // blocks until the source thread finishes
          // clean up everything we initialized
          isRunning = false;
          ...
      }

      I think we should call the source's cancel method first and only then wait for the source thread to finish.
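
      The ordering problem can be reproduced without Flink. The following is a minimal sketch, not the StreamTask implementation: LoopingSource, cleanUp and the thread name are hypothetical stand-ins, and the timeout exists only so that the sketch terminates (the real join() in cleanUpInvoke has no timeout). It compares waiting on the completion future alone with calling cancel() before waiting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Flink-free model of the cleanup ordering; every name here is hypothetical. */
public class CancelBeforeJoinSketch {

    /** Stand-in for a user source whose run() loops until cancel() is called. */
    static final class LoopingSource {
        // volatile so the write in cancel() is visible to the thread in run()
        private volatile boolean running = true;

        void run() {
            while (running) {
                Thread.yield();
            }
        }

        void cancel() {
            running = false;
        }
    }

    /**
     * Starts the source on its own thread and then performs the cleanup wait.
     *
     * @param cancelFirst whether to call cancel() before waiting
     * @return true if the source thread finished within timeoutMillis
     */
    public static boolean cleanUp(boolean cancelFirst, long timeoutMillis) {
        LoopingSource source = new LoopingSource();
        CompletableFuture<Void> completion = new CompletableFuture<>();

        Thread sourceThread = new Thread(() -> {
            source.run();
            completion.complete(null);
        }, "source-thread-model");
        sourceThread.setDaemon(true);
        sourceThread.start();

        if (cancelFirst) {
            source.cancel(); // proposed ordering: ask the source to stop first
        }
        try {
            // Models the wait on the completion future, bounded for the demo.
            completion.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            source.cancel(); // stop the model thread so the sketch exits cleanly
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("wait only:        finished=" + cleanUp(false, 500));
        System.out.println("cancel then wait: finished=" + cleanUp(true, 500));
    }
}
```

      In this model the wait-only variant times out because nothing ever tells the loop to stop, while the cancel-first variant returns promptly. Whether cancel() alone is sufficient in the real task also depends on the user's source honoring it.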

      My test code follows; it was run against Flink's master branch.

      @Test
      public void testSourceFailure() throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.enableCheckpointing(2000L);
          env.setRestartStrategy(RestartStrategies.noRestart());
          env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
          JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
          try {
              // Expect the job to execute a single checkpoint and to fail exactly once.
              TestUtils.submitJobAndWaitForResult(
                      cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
          } catch (JobExecutionException jobException) {
              Optional<FlinkRuntimeException> throwable =
                      ExceptionUtils.findThrowable(jobException, FlinkRuntimeException.class);
              Assert.assertTrue(throwable.isPresent());
              Assert.assertEquals(
                      CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
                      throwable.get().getMessage());
          }
          // Assert that the job failed only once.
          Assert.assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
      }
      
      private static class FailedSource extends RichParallelSourceFunction<String>
              implements CheckpointedFunction {
      
          // volatile: cancel() is called from a different thread than run()
          private transient volatile boolean running;
      
          @Override
          public void open(Configuration parameters) throws Exception {
              running = true;
          }
      
          @Override
          public void run(SourceContext<String> ctx) throws Exception {
              while (running) {
                  ctx.collect("test");
              }
          }
      
          @Override
          public void cancel() {
              running = false;
          }
      
          @Override
          public void snapshotState(FunctionSnapshotContext context) throws Exception {
              throw new RuntimeException("source failed");
          }
      
          @Override
          public void initializeState(FunctionInitializationContext context) throws Exception {}
      }
      

              People

              • Assignee: Anton Kalashnikov (akalashnikov)
              • Reporter: Ming Li