Flink / FLINK-9640

Checkpointing is always aborted if any task has finished

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.3.0, 1.4.0, 1.5.0, 1.6.0
    • None
    • None

    Description

      Steps to reproduce:
      1. Build a standalone Flink cluster.
      2. Submit a test job like the one below:

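      import java.util.UUID;

      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      public class DemoJob {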
      
          public static void main(String[] args) throws Exception {
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.disableOperatorChaining();
              env.setParallelism(4);
              env.enableCheckpointing(3000);
              env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
      
              DataStream<String> inputStream = env.addSource(new StringGeneratorParallelSourceFunction());
      
              inputStream.map(String::hashCode).print();
      
              env.execute();
          }
      
          public static class StringGeneratorParallelSourceFunction extends RichParallelSourceFunction<String> {
              private static final Logger LOG = LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);
              private volatile boolean running = true;
              private int index;
              private int subtask_nums;
      
              @Override
              public void open(Configuration parameters) throws Exception {
                  index = getRuntimeContext().getIndexOfThisSubtask();
                  subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
              }
      
              @Override
              public void run(SourceContext<String> ctx) throws Exception {
      
                  while (running) {
                      String data = UUID.randomUUID().toString();
                      ctx.collect(data);
                      LOG.info("subtask_index = {}, emit string = {}", index, data);
                      Thread.sleep(50);
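                      // Deliberately stop exactly one subtask (index 2 at parallelism 4)
                      // so the job keeps running with one FINISHED source.
                      if (index == subtask_nums / 2) {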
                          running = false;
                          LOG.info("subtask_index = {}, finished.", index);
                      }
                  }
              }
      
              @Override
              public void cancel() {
                  running = false;
              }
          }
      }
      

      3. Observe the JobManager and TaskManager logs:
      taskmanager.log

      2018-06-21 17:05:54,144 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
      2018-06-21 17:05:54,151 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
      2018-06-21 17:05:54,195 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 2, finished.
      2018-06-21 17:05:54,200 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
      2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9).
      2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
      2018-06-21 17:05:54,202 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source (6b2a374bec5f31112811613537dd4fd9)
      2018-06-21 17:05:54,211 INFO  com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
      

      jobmanager.log

      2018-06-21 17:05:52,682 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Unnamed (2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
      2018-06-21 17:05:52,683 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (2/4) (de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
      2018-06-21 17:05:54,219 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
      2018-06-21 17:05:54,224 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (3/4) (8f523afb97dc848a9578f9cae5870421) switched from RUNNING to FINISHED.
      2018-06-21 17:05:54,224 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Unnamed (3/4) (39f76a87d20cfc491e11f0b5b08ec5c2) switched from RUNNING to FINISHED.
      2018-06-21 17:05:55,069 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
      2018-06-21 17:05:58,067 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
      2018-06-21 17:06:01,067 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
      2018-06-21 17:06:04,067 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Custom Source (3/4) is not being executed at the moment. Aborting checkpoint.
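
The JobManager log lines above are consistent with an all-or-nothing check: if any task that should trigger the checkpoint is not RUNNING, the whole checkpoint is aborted. A minimal, self-contained sketch of that behaviour follows. `TriggerCheckModel`, `State`, and `firstNonRunning` are hypothetical names used only for illustration; this is a simplified model, not Flink's actual `CheckpointCoordinator` code.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of the behaviour seen in jobmanager.log; NOT Flink's real code. */
final class TriggerCheckModel {

    /** Hypothetical stand-in for a subtask's execution state. */
    enum State { RUNNING, FINISHED }

    /**
     * Returns the index of the first source subtask that is not RUNNING,
     * or -1 if every subtask is RUNNING and the checkpoint could be triggered.
     */
    static int firstNonRunning(List<State> sourceSubtasks) {
        for (int i = 0; i < sourceSubtasks.size(); i++) {
            if (sourceSubtasks.get(i) != State.RUNNING) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Parallelism 4; subtask index 2 (shown as "3/4" in the logs) has finished.
        List<State> sources = Arrays.asList(
                State.RUNNING, State.RUNNING, State.FINISHED, State.RUNNING);

        int blocker = firstNonRunning(sources);
        if (blocker >= 0) {
            // A single non-running subtask is enough to abort the whole checkpoint.
            System.out.println("Source (" + (blocker + 1) + "/" + sources.size()
                    + ") is not being executed. Aborting checkpoint.");
        } else {
            System.out.println("Triggering checkpoint.");
        }
    }
}
```

Because the finished subtask never returns to RUNNING, a check of this shape fails on every checkpoint interval, which matches the abort message repeating every 3 seconds.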
      

      -------------------------------------------------------------------------------------------------

      I think we should filter FINISHED tasks out of the sets of tasks that need to trigger a checkpoint or acknowledge one. For more details, see the `org.apache.flink.runtime.checkpoint.CheckpointCoordinator` class.
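
The proposed filtering could look roughly like the sketch below. It is a self-contained illustration, not a patch against `CheckpointCoordinator`: `FinishedTaskFilter`, `State`, and `activeSubtasks` are hypothetical names, and the sketch does not address how the state of finished tasks would be handled on restore.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative model of the proposal; NOT a patch against Flink. */
final class FinishedTaskFilter {

    /** Hypothetical stand-in for a subtask's execution state. */
    enum State { RUNNING, FINISHED }

    /**
     * Returns the indices of subtasks that have not FINISHED, i.e. the ones
     * that would still need to trigger or acknowledge a checkpoint.
     */
    static List<Integer> activeSubtasks(List<State> subtasks) {
        List<Integer> active = new ArrayList<>();
        for (int i = 0; i < subtasks.size(); i++) {
            if (subtasks.get(i) != State.FINISHED) {
                active.add(i);
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // Same situation as in the logs: subtask index 2 of 4 has finished.
        List<State> sources = Arrays.asList(
                State.RUNNING, State.RUNNING, State.FINISHED, State.RUNNING);

        // Prints: Subtasks to trigger: [0, 1, 3]
        System.out.println("Subtasks to trigger: " + activeSubtasks(sources));
    }
}
```

With a filter of this kind, the checkpoint would proceed on the three remaining subtasks instead of being aborted because of the finished one.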

            People

              yew1eb Hai Zhou