Flink / FLINK-25267

Unable to (always) recover using checkpoint in HA setup (both Zookeeper and Kubernetes)


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.12.1, statefun-3.0.0, statefun-3.1.0, 1.13.2
    • Fix Version/s: None
    • Component/s: None
    • Environment: macOS 11.6, minikube v1.23.2, tried with both Stateful Functions 3.0.0 and Stateful Functions 3.1.0
    • Labels: Important

    Description

      My Stateful Functions job is running on Kubernetes (minikube in my local environment) and has the following settings (a configuration sketch follows this list):

      • Using StateFun v3.1.0
      • Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
      • Checkpointing mode is EXACTLY_ONCE
      • State backend is rocksdb and incremental checkpointing is enabled
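
      For reference, here is a minimal flink-conf.yaml sketch of the setup above. The checkpoint directory matches the one in the log below; the ZooKeeper quorum, HA root path, cluster id and HA storage directory are illustrative placeholders rather than my exact values:

      # Checkpointing: exactly-once, RocksDB with incremental checkpoints
      execution.checkpointing.mode: EXACTLY_ONCE
      state.backend: rocksdb
      state.backend.incremental: true
      state.backend.rocksdb.thread.num: 1
      state.checkpoint-storage: filesystem
      state.checkpoints.dir: hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp

      # ZooKeeper HA (the Kubernetes HA services show the same behaviour)
      high-availability: zookeeper
      high-availability.zookeeper.quorum: zookeeper:2181
      high-availability.zookeeper.path.root: /statefun_zk_recovery
      high-availability.cluster-id: my-statefun-app
      high-availability.storageDir: hdfs://hdfs-namenode:8020/tmp/statefun_recovery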

      When I kill the jobmanager (master) pod, minikube starts a replacement pod, and this new pod fails when it tries to load the last checkpoint:

      ...
      2021-12-11 14:25:26,426 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job myStatefunApp (00000000000000000000000000000000).
      2021-12-11 14:25:26,443 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
      2021-12-11 14:25:26,516 INFO  org.apache.flink.runtime.util.ZooKeeperUtils                 [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}' with /checkpoints/00000000000000000000000000000000.
      2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job myStatefunApp (00000000000000000000000000000000).
      2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
      2021-12-11 14:25:26,617 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 1 ms
      2021-12-11 14:25:26,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152}
      2021-12-11 14:25:26,627 INFO  org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
      2021-12-11 14:25:26,627 INFO  org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
      2021-12-11 14:25:26,627 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152}
      2021-12-11 14:25:26,631 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
      2021-12-11 14:25:26,712 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
      2021-12-11 14:25:26,724 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
      2021-12-11 14:25:26,725 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to fetch 1 checkpoints from storage.
      2021-12-11 14:25:26,725 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 2.
      2021-12-11 14:25:26,931 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 00000000000000000000000000000000 located at hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
      2021-12-11 14:25:27,012 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
      org.apache.flink.util.FlinkException: JobMaster for job 00000000000000000000000000000000 failed.
          at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
          at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.2.jar:1.13.2]
          at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.2.jar:1.13.2]
      Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
          at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
          at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
          at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
          at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
          at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
          at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
          at java.lang.Thread.run(Unknown Source) ~[?:?]
      Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 18666b435c78ee2416e74bb997b798a7
           at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
          at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
          at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
          at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
          at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
          at java.lang.Thread.run(Unknown Source) ~[?:?]
      Caused by: java.lang.IllegalStateException: There is no operator for the state 18666b435c78ee2416e74bb997b798a7
           at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
          at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
          at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
          at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
          at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
          at java.lang.Thread.run(Unknown Source) ~[?:?]
      2021-12-11 14:25:27,017 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
      2021-12-11 14:25:27,021 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
      2021-12-11 14:25:27,025 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:6124
      2021-12-11 14:25:27,034 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
      2021-12-11 14:25:27,035 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
      2021-12-11 14:25:27,035 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
      2021-12-11 14:25:27,036 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
      2021-12-11 14:25:27,036 INFO  org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
      2021-12-11 14:25:27,037 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
      2021-12-11 14:25:27,037 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
      2021-12-11 14:25:27,037 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
      2021-12-11 14:25:27,037 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
      2021-12-11 14:25:27,038 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
      2021-12-11 14:25:27,038 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
      2021-12-11 14:25:27,039 INFO  org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
      2021-12-11 14:25:27,040 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
      2021-12-11 14:25:27,040 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.
      2021-12-11 14:25:27,041 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
      2021-12-11 14:25:27,041 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'} 

       

      However, across several restarts the jobmanager sometimes does manage to restore the job from the last checkpoint. After changing Flink's log level to DEBUG, I captured the difference between an unsuccessful sequence of events (which produces the log above) and a successful one. It seems that operators can be assigned different hashes between restarts; here is the relevant log section for the unsuccessful assignment (I renamed my operators for clarity):

       

      2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' {id: 5, parallelism: 1, user function: }
      2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' {id: 4, parallelism: 1, user function: }
      2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' {id: 6, parallelism: 1, user function: } 

      ... and here is the same log section for the successful assignment:

      2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' {id: 5, parallelism: 1, user function: }
      2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' {id: 4, parallelism: 1, user function: }
      2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6' {id: 6, parallelism: 1, user function: } 

      As you can see, in the successful run the hash "18666b435c78ee2416e74bb997b798a7" is generated, so the jobmanager can match the operator to the state loaded from the checkpoint and continue normal operation. Another thing to note is that the router operators are assigned different ids between the two runs.

       

      I took a look at the StreamGraphHasherV2 code (link); it explicitly tries to keep the operator traversal order stable between runs, but my Stateful Functions application apparently manages to defeat that attempt.

       

      Since we can't assign operator ids when using Stateful Functions, is there anything I can do to get this working correctly? Is this a bug, or am I using a wrong combination of settings?
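
      For comparison, in a plain DataStream job this kind of hash instability is normally avoided by pinning operator IDs explicitly with uid(), which overrides the hash generated by StreamGraphHasherV2. A minimal sketch (the source and the uid values are made up for illustration; Stateful Functions does not expose this hook to me):

      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class PinnedUidExample {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              env.fromElements("a", "b", "c")
                      // uid() fixes the OperatorID used for state assignment, so a restore
                      // no longer depends on a hash derived from graph traversal order
                      .uid("my-ingress-router")
                      .map(String::toUpperCase)
                      .uid("my-mapper")
                      .print();

              env.execute("pinned-uid-example");
          }
      }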

       

      As a last note, I posted the same question earlier on Stack Overflow; here is the link to the question.

      Thanks

            People

              Assignee: Unassigned
              Reporter: Kerem Ulutaş (keremulutas)
              Votes: 0
              Watchers: 6
