Flink / FLINK-22379

Do not trigger checkpoints when non-source tasks are INITIALIZING


Details

    Description

      It looks like checkpoints are still being declined by tasks that are not ready yet:

      2021-04-21 12:59:10,192 INFO  org.apache.flink.runtime.taskmanager.Task                    Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512) switched from DEPLOYING to INITIALIZING.
      2021-04-21 12:59:10,193 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       Co-Keyed-Process (5/6) (c5873c9cf471a32925b54ed110250512) switched from DEPLOYING to INITIALIZING.
      ...
      2021-04-21 12:59:10,544 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    Decline checkpoint 12 by task c5873c9cf471a32925b54ed110250512 of job b643c49a878c1728d0564f194c8e563e at a579506c-cf1b-4a9c-9964-ab51ae9a1a71 @ localhost (dataPort=38367).
      org.apache.flink.util.SerializedThrowable: Checkpoint was declined (tasks not ready)
              at org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.checkpointStarted(RecoveredInputChannel.java:263) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate.checkpointStarted(IndexedInputGate.java:36) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.WaitingForFirstBarrierUnaligned.barrierReceived(WaitingForFirstBarrierUnaligned.java:69) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:65) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:228) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:419) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:557) ~[flink-streaming-java_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
              at java.lang.Thread.run(Thread.java:834) [?:?]
      

      Less than 100 ms later in the log, the same task switches to RUNNING:

      2021-04-21 12:59:10,611 INFO  org.apache.flink.runtime.taskmanager.Task                    Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512) switched from INITIALIZING to RUNNING.
      2021-04-21 12:59:10,611 DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition Co-Keyed-Process (5/6)#0 (c5873c9cf471a32925b54ed110250512): Creating read view for subpartition 2 of partition 0e0bca97cc41ba9b4eb0f54995370397#4@c5873c9cf471a32925b54ed110250512.
      2021-04-21 12:59:10,612 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       Co-Keyed-Process (5/6) (c5873c9cf471a32925b54ed110250512) switched from INITIALIZING to RUNNING.
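The decline in the first log comes from the task side: the barrier for checkpoint 12 reaches Co-Keyed-Process (5/6) while it is still INITIALIZING, and RecoveredInputChannel.checkpointStarted rejects it with "Checkpoint was declined (tasks not ready)". The sketch below models only that behaviour in plain Java, assuming a simplified channel that is either still recovering or already running. RecoveringChannel, RunningChannel, CheckpointDeclinedException and onBarrier are hypothetical names, not Flink classes or APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the task side of the problem. The classes only mimic
 * the roles seen in the stack trace; they are not Flink's implementation.
 */
public class RecoveredChannelSketch {

    /** Hypothetical stand-in for the decline reported to the coordinator. */
    static class CheckpointDeclinedException extends Exception {
        CheckpointDeclinedException(String message) {
            super(message);
        }
    }

    interface Channel {
        void checkpointStarted(long checkpointId) throws CheckpointDeclinedException;
    }

    /** While recovered in-flight data is still being replayed, barriers are declined. */
    static class RecoveringChannel implements Channel {
        @Override
        public void checkpointStarted(long checkpointId) throws CheckpointDeclinedException {
            throw new CheckpointDeclinedException("Checkpoint was declined (tasks not ready)");
        }
    }

    /** Once recovery has finished, the channel accepts the barrier and records it. */
    static class RunningChannel implements Channel {
        final List<Long> started = new ArrayList<>();

        @Override
        public void checkpointStarted(long checkpointId) {
            started.add(checkpointId);
        }
    }

    /** Returns true if the checkpoint is accepted, false if it is declined. */
    static boolean onBarrier(Channel channel, long checkpointId) {
        try {
            channel.checkpointStarted(checkpointId);
            return true;
        } catch (CheckpointDeclinedException e) {
            System.out.println("Decline checkpoint " + checkpointId + ": " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // Barrier 12 arrives while the task is still INITIALIZING: declined (false).
        System.out.println(onBarrier(new RecoveringChannel(), 12));
        // The same barrier after the task has switched to RUNNING: accepted (true).
        System.out.println(onBarrier(new RunningChannel(), 12));
    }
}
```

Because the task only becomes able to take part in the checkpoint after it leaves INITIALIZING, any checkpoint triggered before that point is wasted.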
      

      This happens because checkTasksStarted only checks that the tasks to trigger (the input/SOURCE tasks) are ready, rather than all tasks that must acknowledge the checkpoint. So, inside org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator#calculateCheckpointPlan, replacing

      checkTasksStarted(result.getTasksToTrigger());
      

      with

      checkTasksStarted(result.getTasksToWaitFor());
      

      should fix it.
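To illustrate the difference between the two calls, here is a minimal, self-contained Java sketch that does not use Flink's API. TaskState, canTrigger and the task names are hypothetical, the checkTasksStarted below is a simplified stand-in rather than Flink's method, and the checkpoint plan is reduced to two lists of task names.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the readiness check performed when a
 * checkpoint plan is calculated. Not Flink's actual code.
 */
public class CheckpointPlanCheckSketch {

    enum TaskState { DEPLOYING, INITIALIZING, RUNNING }

    /** Simplified stand-in: throws if any of the given tasks is not RUNNING. */
    static void checkTasksStarted(List<String> tasks, Map<String, TaskState> states) {
        for (String task : tasks) {
            TaskState state = states.get(task);
            if (state != TaskState.RUNNING) {
                throw new IllegalStateException("Task " + task + " is " + state + ", not RUNNING");
            }
        }
    }

    /** Returns true if a checkpoint would be triggered when checking the given tasks. */
    static boolean canTrigger(List<String> tasksToCheck, Map<String, TaskState> states) {
        try {
            checkTasksStarted(tasksToCheck, states);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The source is RUNNING, but the downstream task is still restoring state.
        Map<String, TaskState> states = Map.of(
                "Source", TaskState.RUNNING,
                "Co-Keyed-Process", TaskState.INITIALIZING);
        List<String> tasksToTrigger = List.of("Source");
        List<String> tasksToWaitFor = List.of("Source", "Co-Keyed-Process");

        // Checking only the tasks to trigger: the checkpoint starts, and the
        // INITIALIZING task later declines it.
        System.out.println(canTrigger(tasksToTrigger, states)); // true
        // Checking every task that must acknowledge: the checkpoint is not started yet.
        System.out.println(canTrigger(tasksToWaitFor, states)); // false
    }
}
```

The design choice is simply which set the readiness check iterates over: the tasks to wait for is a superset of the tasks to trigger, so checking it also covers the downstream tasks that would otherwise decline.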


            People

              Assignee: Anton Kalashnikov (akalashnikov)
              Reporter: Anton Kalashnikov (akalashnikov)