Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Versions: 1.8.3, 1.17.2, 1.20.0, 1.19.1
Fix Versions: None
Description
Running a job in single-execution mode with HA enabled normally short-circuits duplicate execution of the job once it has reached a globally terminal state. However, this short-circuit path reports ApplicationStatus.SUCCEEDED even when the initial globally terminal job status was FAILED, which breaks the consistency of the recorded states.
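As a concrete illustration (a minimal sketch, assuming flink-runtime 1.20.0 on the classpath; the class name is made up for this report), Flink's own JobStatus-to-ApplicationStatus conversion maps a FAILED job to ApplicationStatus.FAILED, while the duplicate-submission short-circuit reports SUCCEEDED for the same job:

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;

public class TerminalStateMappingDemo {
    public static void main(String[] args) {
        // Flink's own conversion of the stored globally terminal state: prints FAILED
        System.out.println(ApplicationStatus.fromJobStatus(JobStatus.FAILED));
        // ...whereas the duplicate-submission path described above shuts the
        // cluster down with SUCCEEDED (see the log excerpt further below).
    }
}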
Reproduction steps
1- Create the Flink ConfigMap and service account as in https://github.com/vahmed-hamdy/flink-test-projects/tree/master/k8s/job-dedupe
2- Submit the Flink job and TaskManager deployments, similar to https://github.com/vahmed-hamdy/flink-test-projects/blob/master/k8s/job-dedupe/FlinkApplicationJob.yml
3- Delete the TaskManager pods to force job failover until the restart attempts are exhausted; the job now reaches the terminal state FAILED (this can be verified via the REST API, see the sketch after the log excerpt below).
4- The JobManager now fails over and restarts, but due to single-execution mode it does not resubmit the batch job.
5- The application is recorded as SUCCEEDED upon detecting the globally terminal state (FAILED), as shown in the log below:
Printing result to stdout. Use --output to specify output path.
2024-08-08 12:32:50,110 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 3d98a3e455408045defa90c6b0b03a5a is submitted.
2024-08-08 12:32:50,110 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=3d98a3e455408045defa90c6b0b03a5a.
2024-08-08 12:32:50,118 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a).
2024-08-08 12:32:50,164 WARN org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Ignoring JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
2024-08-08 12:32:50,171 INFO org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY
2024-08-08 12:32:50,171 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status SUCCEEDED. Diagnostics null.
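To double-check step 3, the terminal state of the original execution can be queried from the JobManager REST API before the failover (a minimal sketch; it assumes the REST port has been forwarded locally, e.g. kubectl port-forward <jobmanager-pod> 8081:8081, and uses the job id from the deployment args):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobStateCheck {
    public static void main(String[] args) throws Exception {
        String jobId = "3d98a3e455408045defa90c6b0b03a5a"; // job id from the deployment args
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId))
                .GET()
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The response JSON contains a "state" field, expected to be "FAILED" at this point.
        System.out.println(response.body());
    }
}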
Resource Highlights
flink-conf.yaml: |
  job-result-store.delete-on-commit: false
  high-availability.type: ZOOKEEPER
  high-availability.jobmanager.port: 6125
  high-availability.storageDir: s3a://flink-high-availability-dir
  high-availability.zookeeper.quorum: my-release-zookeeper:2181
  high-availability.zookeeper.path.root: /flink
  jobmanager.execution.slot-allocation-timeout: 10000
spec:
  restartPolicy: OnFailure
  containers:
    - name: jobmanager
      image: apache/flink:1.20.0-scala_2.12
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
      args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--job-id", "3d98a3e455408045defa90c6b0b03a5a"]
Expected Outcome
Instead of recording the application status as SUCCEEDED on resubmission, it should be recorded as FAILED, because the globally terminal state of the original execution is FAILED.
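A minimal sketch of the expected mapping (not a patch against the actual ApplicationDispatcherBootstrap code; the helper name is hypothetical): on the duplicate-submission path, the application status should be derived from the previously stored JobResult, which already carries application-status FAILED (see the appendix below), instead of being hard-coded to SUCCEEDED.

import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobmaster.JobResult;

public class ExpectedStatusSketch {
    // Hypothetical helper: derive the shutdown status from the stored result
    // instead of unconditionally reporting SUCCEEDED.
    static ApplicationStatus statusFromStoredResult(JobResult storedResult) {
        // For the reproduced case the stored result carries application-status = FAILED,
        // so the cluster should shut down with FAILED.
        return storedResult.getApplicationStatus();
    }
}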
Appendix: JobResultStore on HA
{"result":{"id":"3d98a3e455408045defa90c6b0b03a5a","application-status":"FAILED","accumulator-results":{},"net-runtime":55991,"failure-cause":{"class":"org.apache.flink.runtime.JobException","stack-trace":"org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=1).....