Flink / FLINK-27569

Terminated Flink job restarted from empty state when execution.shutdown-on-application-finish is false

Details

    Description

      When JobManager HA is enabled and execution.shutdown-on-application-finish = false, terminated jobs (failed, cancelled, etc.) are resubmitted from a completely empty state on JobManager failover.

      The following steps reproduce the problem (Flink 1.15, HA enabled, execution.shutdown-on-application-finish = false):

      1. Submit Flink application cluster
      2. Call cancel with savepoint -> see logs below

      The job successfully finishes with the savepoint:

      2022-05-11 06:42:48,562 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 00000000000000000000000000000000 reached terminal state FINISHED.
      2022-05-11 06:42:48,624 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 00000000000000000000000000000000 has been registered for cleanup in the JobResultStore after reaching a terminal state.
      2022-05-11 06:42:48,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job 'State machine job' (00000000000000000000000000000000).
      2022-05-11 06:42:48,629 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Shutting down
      2022-05-11 06:42:48,647 INFO  org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] - Shutting down.
      2022-05-11 06:42:48,647 INFO  org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] - Removing counter from ConfigMap basic-checkpoint-ha-example-00000000000000000000000000000000-config-map
      2022-05-11 06:42:48,652 INFO  org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [0cdb18eefcb2133049223214d4716fa0].
      2022-05-11 06:42:48,653 INFO  org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [bf5ece74692d786f6ba2b067c76ee1d9].
      2022-05-11 06:42:48,653 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Close ResourceManager connection 220ea961c86ea8042fde2151fd05a5c9: Stopping JobMaster for job 'State machine job' (00000000000000000000000000000000).
      2022-05-11 06:42:48,653 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
      2022-05-11 06:42:48,653 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
      2022-05-11 06:42:48,655 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:9a1bc36b-6a76-4970-96a0-945e9a12b66d
      2022-05-11 06:42:48,655 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
      2022-05-11 06:42:48,655 INFO  org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
      2022-05-11 06:42:48,655 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:5facec4c-d888-43b4-88d0-d1f34912d35a
      2022-05-11 06:42:48,655 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Disconnect job manager 969eeac09f5cf4813103003495204620@akka.tcp://flink@172.17.0.6:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000 from the resource manager.
      2022-05-11 06:42:48,660 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
      2022-05-11 06:42:48,723 INFO  org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Clean up the high availability data for job 00000000000000000000000000000000.
      2022-05-11 06:42:48,753 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
      2022-05-11 06:42:48,758 INFO  org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Finished cleaning up the high availability data for job 00000000000000000000000000000000.
      2022-05-11 06:42:50,321 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY

      3. Trigger JobManager failover

      The JobManager recovers, but resubmits the job from an empty state:

      2022-05-11 06:48:04,535 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 00000000000000000000000000000000 is submitted.
      2022-05-11 06:48:04,535 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=00000000000000000000000000000000.
      2022-05-11 06:48:04,629 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 pods from previous attempts, current attempt id is 1.
      2022-05-11 06:48:04,629 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
      2022-05-11 06:48:04,650 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 'State machine job' (00000000000000000000000000000000).
      2022-05-11 06:48:04,652 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting job 'State machine job' (00000000000000000000000000000000).
      2022-05-11 06:48:04,746 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Added JobGraph(jobId: 00000000000000000000000000000000) to KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
      2022-05-11 06:48:04,826 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@370a1b27.
      2022-05-11 06:48:04,838 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
      2022-05-11 06:48:04,843 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job 'State machine job' (00000000000000000000000000000000).
      2022-05-11 06:48:04,926 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for State machine job (00000000000000000000000000000000).
      2022-05-11 06:48:04,955 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
      2022-05-11 06:48:04,959 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 0 checkpoints in KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
      2022-05-11 06:48:04,959 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 0 checkpoints from storage.
      2022-05-11 06:48:04,974 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job State machine job (00000000000000000000000000000000).
      2022-05-11 06:48:04,974 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
      2022-05-11 06:48:05,032 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
      2022-05-11 06:48:05,035 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@670a312c
      2022-05-11 06:48:05,035 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
      2022-05-11 06:48:05,036 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Checkpoint storage is set to 'jobmanager'
      2022-05-11 06:48:05,053 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
      2022-05-11 06:48:05,058 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@bdfaf5f for State machine job (00000000000000000000000000000000).
      2022-05-11 06:48:05,065 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
      2022-05-11 06:48:05,066 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:83411d91-3094-46c5-b2cc-0576bf5cc161
      2022-05-11 06:48:05,126 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job 'State machine job' (00000000000000000000000000000000) under job master id 9c63401786b3856e5c8a0cf069e44198.
      2022-05-11 06:48:05,132 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
      

       

      In addition, the checkpoint history is lost, which is probably the main cause of the issue.
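
The symptom described above can be checked for mechanically in collected JobManager logs. The following is a minimal sketch, not part of Flink: the function name `find_empty_state_resubmissions` is hypothetical, and it assumes that the logs from before and after the failover have been concatenated in time order and that the checkpoint ConfigMap name contains the job ID, as it does in the log lines quoted in this report. It flags jobs that reached a terminal state, were submitted again, and then recovered with 0 checkpoints.

```python
import re

# Patterns copied from the log messages quoted in this report.
TERMINAL = re.compile(r"Job (?P<job>[0-9a-f]{32}) reached terminal state (?P<state>\w+)")
SUBMIT = re.compile(r"Submitting Job with JobId=(?P<job>[0-9a-f]{32})")
FOUND = re.compile(r"Found (?P<n>\d+) checkpoints in ")


def find_empty_state_resubmissions(lines):
    """Return job IDs that were resubmitted after termination and found 0 checkpoints.

    Assumption: `lines` holds JobManager log lines from before and after the
    failover, in time order, and the checkpoint store name embeds the job ID.
    """
    terminated = {}      # job ID -> terminal state seen earlier in the log
    resubmitted = set()  # terminated jobs that were submitted again
    flagged = []         # resubmitted jobs that recovered 0 checkpoints

    for line in lines:
        match = TERMINAL.search(line)
        if match:
            terminated[match["job"]] = match["state"]
            continue

        match = SUBMIT.search(line)
        if match and match["job"] in terminated:
            resubmitted.add(match["job"])
            continue

        match = FOUND.search(line)
        if match and int(match["n"]) == 0:
            for job in sorted(resubmitted):
                if job in line and job not in flagged:
                    flagged.append(job)

    return flagged
```

For example, `find_empty_state_resubmissions(open("jobmanager-combined.log"))` would scan a combined log file; the file name here is only illustrative.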

      Attachments

        1. Screenshot 2022-05-11 at 08.46.51.png
          166 kB
          Gyula Fora
        2. Screenshot 2022-05-11 at 08.50.03.png
          151 kB
          Gyula Fora

            People

              Assignee: Unassigned
              Reporter: Gyula Fora (gyfora)