Details
- Bug
- Status: Closed
- Critical
- Resolution: Won't Fix
- 1.15.0
- None
- None
Description
When JobManager HA is enabled and execution.shutdown-on-application-finish = false, terminated jobs (failed, cancelled, etc.) are resubmitted from a completely empty state on JobManager failover.
Consider the following scenario (Flink 1.15, HA enabled, shutdown on application finish disabled):
1. Submit Flink application cluster
2. Call cancel with savepoint -> see logs below
The job successfully finishes with a savepoint:
2022-05-11 06:42:48,562 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 00000000000000000000000000000000 reached terminal state FINISHED.
2022-05-11 06:42:48,624 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 00000000000000000000000000000000 has been registered for cleanup in the JobResultStore after reaching a terminal state.
2022-05-11 06:42:48,626 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job 'State machine job' (00000000000000000000000000000000).
2022-05-11 06:42:48,629 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Shutting down
2022-05-11 06:42:48,647 INFO org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] - Shutting down.
2022-05-11 06:42:48,647 INFO org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter [] - Removing counter from ConfigMap basic-checkpoint-ha-example-00000000000000000000000000000000-config-map
2022-05-11 06:42:48,652 INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [0cdb18eefcb2133049223214d4716fa0].
2022-05-11 06:42:48,653 INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [bf5ece74692d786f6ba2b067c76ee1d9].
2022-05-11 06:42:48,653 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 220ea961c86ea8042fde2151fd05a5c9: Stopping JobMaster for job 'State machine job' (00000000000000000000000000000000).
2022-05-11 06:42:48,653 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2022-05-11 06:42:48,653 INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
2022-05-11 06:42:48,655 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:9a1bc36b-6a76-4970-96a0-945e9a12b66d
2022-05-11 06:42:48,655 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2022-05-11 06:42:48,655 INFO org.apache.flink.kubernetes.highavailability.KubernetesLeaderRetrievalDriver [] - Stopping KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
2022-05-11 06:42:48,655 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Stopped to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:5facec4c-d888-43b4-88d0-d1f34912d35a
2022-05-11 06:42:48,655 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Disconnect job manager 969eeac09f5cf4813103003495204620@akka.tcp://flink@172.17.0.6:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000 from the resource manager.
2022-05-11 06:42:48,660 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2022-05-11 06:42:48,723 INFO org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Clean up the high availability data for job 00000000000000000000000000000000.
2022-05-11 06:42:48,753 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
2022-05-11 06:42:48,758 INFO org.apache.flink.kubernetes.highavailability.KubernetesMultipleComponentLeaderElectionHaServices [] - Finished cleaning up the high availability data for job 00000000000000000000000000000000.
2022-05-11 06:42:50,321 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY
3. Trigger JobManager failover
The JobManager recovers, but resubmits the job from an empty state:
2022-05-11 06:48:04,535 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 00000000000000000000000000000000 is submitted.
2022-05-11 06:48:04,535 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=00000000000000000000000000000000.
2022-05-11 06:48:04,629 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
2022-05-11 06:48:04,629 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
2022-05-11 06:48:04,650 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'State machine job' (00000000000000000000000000000000).
2022-05-11 06:48:04,652 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 'State machine job' (00000000000000000000000000000000).
2022-05-11 06:48:04,746 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Added JobGraph(jobId: 00000000000000000000000000000000) to KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
2022-05-11 06:48:04,826 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@370a1b27.
2022-05-11 06:48:04,838 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
2022-05-11 06:48:04,843 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'State machine job' (00000000000000000000000000000000).
2022-05-11 06:48:04,926 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for State machine job (00000000000000000000000000000000).
2022-05-11 06:48:04,955 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
2022-05-11 06:48:04,959 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 0 checkpoints in KubernetesStateHandleStore{configMapName='basic-checkpoint-ha-example-00000000000000000000000000000000-config-map'}.
2022-05-11 06:48:04,959 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 0 checkpoints from storage.
2022-05-11 06:48:04,974 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job State machine job (00000000000000000000000000000000).
2022-05-11 06:48:04,974 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2022-05-11 06:48:05,032 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
2022-05-11 06:48:05,035 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@670a312c
2022-05-11 06:48:05,035 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2022-05-11 06:48:05,036 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'jobmanager'
2022-05-11 06:48:05,053 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
2022-05-11 06:48:05,058 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@bdfaf5f for State machine job (00000000000000000000000000000000).
2022-05-11 06:48:05,065 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='basic-checkpoint-ha-example-cluster-config-map'}.
2022-05-11 06:48:05,066 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for default/basic-checkpoint-ha-example-cluster-config-map, watching id:83411d91-3094-46c5-b2cc-0576bf5cc161
2022-05-11 06:48:05,126 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job 'State machine job' (00000000000000000000000000000000) under job master id 9c63401786b3856e5c8a0cf069e44198.
2022-05-11 06:48:05,132 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
The checkpoint history is also lost, which is probably the main cause of the issue.
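To check whether a given JobManager log shows the same pattern, something like the following Python sketch may help. It is not part of Flink; the function name find_resubmissions and the regular expressions are assumptions derived only from the log messages quoted above, and other Flink versions may word these messages differently. It looks for a job ID that reached a terminal state, was later received as a new JobGraph submission, and then restored with "No checkpoint found during restore".

```python
import re

# Patterns are taken from the log lines quoted in this issue (Flink 1.15);
# they are assumptions and may not match other versions.
TERMINAL = re.compile(
    r"Job (?P<job>[0-9a-f]{32}) reached terminal state (?P<state>\w+)"
)
SUBMIT = re.compile(
    r"Received JobGraph submission '.*' \((?P<job>[0-9a-f]{32})\)"
)
NO_CHECKPOINT = re.compile(r"No checkpoint found during restore")


def find_resubmissions(lines):
    """Return one dict per job that was resubmitted after terminating.

    Each dict holds the job ID, the terminal state seen earlier in the log,
    and whether a later "No checkpoint found during restore" line followed
    the resubmission (i.e. the job restarted from an empty state).
    """
    terminal = {}      # job id -> terminal state seen so far
    resubmitted = []   # resubmissions in log order
    for line in lines:
        m = TERMINAL.search(line)
        if m:
            terminal[m["job"]] = m["state"]
            continue
        m = SUBMIT.search(line)
        if m and m["job"] in terminal:
            resubmitted.append(
                {"job": m["job"], "state": terminal[m["job"]], "empty_state": False}
            )
            continue
        # Attribute the restore message to the most recent resubmission.
        if NO_CHECKPOINT.search(line) and resubmitted:
            resubmitted[-1]["empty_state"] = True
    return resubmitted
```

Passing the lines of the combined log from before and after the failover (for example, find_resubmissions(open(path)) with path pointing at a saved log file) should report job 00000000000000000000000000000000 with state FINISHED and empty_state set, matching the behaviour described in this issue.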
Attachments
Issue Links
- relates to: FLINK-27573 Configuring a new random job result store directory (Closed)