Flink / FLINK-28941

Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case


Details

    Description

      When unaligned checkpoints are disabled, savepoints are marked as forced [1], which means they ignore the maxConcurrentCheckpoint limit [2]. As a result, more than maxConcurrentCheckpoint checkpoints can be running simultaneously.
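      To make the failure mode concrete, here is a minimal Java sketch of the trigger decision under a simplified model: a queue of trigger requests, a pending counter, and a force flag that lets the head request start even when the limit is reached. The class ForcedSavepointSketch and all of its members (triggerNext, savepoint, periodicCheckpoint) are hypothetical and are not Flink's CheckpointRequestDecider or CheckpointCoordinator API; the sketch only mirrors the behavior described above, assuming a savepoint is forced exactly when unaligned checkpoints are disabled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/**
 * Hypothetical, simplified model of the trigger decision described in FLINK-28941.
 * This is NOT Flink's CheckpointRequestDecider; all names here are illustrative.
 */
public class ForcedSavepointSketch {

    /** A queued trigger request; isForce marks requests allowed to bypass the concurrency limit. */
    public static final class Request {
        final String kind;
        final boolean isForce;

        Request(String kind, boolean isForce) {
            this.kind = kind;
            this.isForce = isForce;
        }
    }

    private final int maxConcurrentCheckpoints;
    private final Deque<Request> queued = new ArrayDeque<>();
    private int pending = 0;

    public ForcedSavepointSketch(int maxConcurrentCheckpoints) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
    }

    /** Assumption from the issue text: a savepoint is forced exactly when unaligned checkpoints are disabled. */
    public static Request savepoint(boolean unalignedCheckpointsEnabled) {
        return new Request("savepoint", !unalignedCheckpointsEnabled);
    }

    /** Periodic checkpoints are never forced in this model. */
    public static Request periodicCheckpoint() {
        return new Request("checkpoint", false);
    }

    public void enqueue(Request request) {
        queued.addLast(request);
    }

    /** Starts the head request if allowed and returns it; returns empty if nothing can start. */
    public Optional<Request> triggerNext() {
        Request head = queued.peekFirst();
        if (head == null) {
            return Optional.empty();
        }
        // At the limit, only a forced request may start; this is the bypass the issue describes.
        if (pending >= maxConcurrentCheckpoints && !head.isForce) {
            return Optional.empty();
        }
        pending++;
        return Optional.of(queued.pollFirst());
    }

    public int pendingCount() {
        return pending;
    }

    public static void main(String[] args) {
        ForcedSavepointSketch decider = new ForcedSavepointSketch(1);
        decider.enqueue(periodicCheckpoint());
        decider.enqueue(savepoint(false)); // unaligned checkpoints disabled, so the savepoint is forced
        decider.triggerNext(); // the periodic checkpoint starts: 1 pending
        decider.triggerNext(); // the forced savepoint ignores the limit: 2 pending
        System.out.println("pending = " + decider.pendingCount()); // prints "pending = 2"
    }
}
```

      In this model, calling savepoint(true) instead (unaligned checkpoints enabled) makes the second triggerNext() return empty and keeps the pending count at 1, which is what the limit is meant to enforce.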

      This behavior is incompatible with OperatorCoordinatorHolder, which requires that at most one checkpoint be pending at a time. As a result, exceptions such as the following may be thrown [3]:

      java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked for checkpoint 8
      	at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292]
      	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[classes/:?]
      	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[classes/:?]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[classes/:?]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[classes/:?]
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[classes/:?]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[classes/:?]
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
      	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
      	at akka.actor.Actor.aroundReceive(Actor.scala:537) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at akka.actor.ActorCell.invoke(ActorCell.scala:548) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [akka-actor_2.12-2.6.15.jar:2.6.15]
      	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292]
      	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292]
      	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292]
      	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292]
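      The single-pending-checkpoint requirement can be illustrated with a minimal, hypothetical Java sketch of a gateway that remembers the one checkpoint ID it is currently marked for. SinglePendingCheckpointSketch, markForCheckpoint, and clearCheckpoint are illustrative stand-ins, not the real SubtaskGatewayImpl or OperatorCoordinatorHolder code; only the exception message is copied from the trace above.

```java
/**
 * Hypothetical, simplified model of the "at most one pending checkpoint" invariant.
 * This is NOT Flink's SubtaskGatewayImpl; it only reproduces the error message from the trace.
 */
public class SinglePendingCheckpointSketch {

    private static final long NO_CHECKPOINT = -1L;

    // ID of the checkpoint this gateway is currently marked for, or NO_CHECKPOINT.
    private long currentMarkedCheckpointId = NO_CHECKPOINT;

    /** Marks the gateway for a checkpoint; fails if a different checkpoint is still pending. */
    public void markForCheckpoint(long checkpointId) {
        if (currentMarkedCheckpointId != NO_CHECKPOINT && currentMarkedCheckpointId != checkpointId) {
            throw new IllegalStateException(String.format(
                    "Cannot mark for checkpoint %d, already marked for checkpoint %d",
                    checkpointId, currentMarkedCheckpointId));
        }
        currentMarkedCheckpointId = checkpointId;
    }

    /** Clears the mark once the pending checkpoint completes or is aborted. */
    public void clearCheckpoint() {
        currentMarkedCheckpointId = NO_CHECKPOINT;
    }

    public static void main(String[] args) {
        SinglePendingCheckpointSketch gateway = new SinglePendingCheckpointSketch();
        gateway.markForCheckpoint(8); // regular checkpoint 8 is pending
        try {
            gateway.markForCheckpoint(9); // a forced savepoint (checkpoint 9) starts before 8 finishes
        } catch (IllegalStateException e) {
            // prints "Cannot mark for checkpoint 9, already marked for checkpoint 8"
            System.out.println(e.getMessage());
        }
    }
}
```

      In this sketch, marking checkpoint 9 succeeds once checkpoint 8 has been cleared, so the error only appears when two checkpoints overlap, which is the situation a forced savepoint makes possible.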
      

      [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164
      [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449
      [3] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39860&view=logs&j=219f6d90-20a2-5863-7c1b-c80377a1018f&t=20186858-1485-5059-c9c6-446952519524&s=ab6e269b-88b2-5ded-2544-4aa5b1124530


    People

      Assignee: Yunfeng Zhou (yunfengzhou)
      Reporter: Yunfeng Zhou (yunfengzhou)
      Votes: 0
      Watchers: 8
