FLINK-36010

Duplicate Job Submission returns Succeeded on single execution even if Global state is FAILED

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.8.3, 1.17.2, 1.20.0, 1.19.1
    • Fix Version/s: 1.19.3, 1.20.2
    • None

    Description

      When a job runs in single execution mode with HA enabled, a duplicate submission made after the job has reached a globally terminal state is short-circuited instead of being executed again. However, the application then reports ApplicationStatus.SUCCEEDED even if the job's original global status was FAILED. The reported application status is therefore inconsistent with the recorded job result.

      Reproducing steps

      1- Create Flink configmap and service account as in https://github.com/vahmed-hamdy/flink-test-projects/tree/master/k8s/job-dedupe

      2- Submit Flink Job and TM deployments similar to https://github.com/vahmed-hamdy/flink-test-projects/blob/master/k8s/job-dedupe/FlinkApplicationJob.yml

      3- Delete the TaskManager pods to force job failover until the restart attempts are exhausted. The job then reaches the terminal state FAILED.

      4- The batch Job is then restarted because of the JobManager failover, but due to single execution mode the JobManager does not resubmit the job.

      5- The job is recorded as SUCCEEDED because a globally terminal state (FAILED) was detected.

      Printing result to stdout. Use --output to specify output path.
      2024-08-08 12:32:50,110 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 3d98a3e455408045defa90c6b0b03a5a is submitted.
      2024-08-08 12:32:50,110 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=3d98a3e455408045defa90c6b0b03a5a.
      2024-08-08 12:32:50,118 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a).
      2024-08-08 12:32:50,164 WARN  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Ignoring JobGraph submission 'CarTopSpeedWindowingExample' (3d98a3e455408045defa90c6b0b03a5a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
      2024-08-08 12:32:50,171 INFO  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application completed SUCCESSFULLY
      2024-08-08 12:32:50,171 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneApplicationClusterEntryPoint down with application status SUCCEEDED. Diagnostics null.
      

      Resource Highlights

      flink-conf.yaml: |
          job-result-store.delete-on-commit: false
          high-availability.type: ZOOKEEPER
          high-availability.jobmanager.port: 6125
          high-availability.storageDir: s3a://flink-high-availability-dir
          high-availability.zookeeper.quorum: my-release-zookeeper:2181
          high-availability.zookeeper.path.root: /flink
          jobmanager.execution.slot-allocation-timeout: 10000
      
      spec:
            restartPolicy: OnFailure
            containers:
              - name: jobmanager
                image: apache/flink:1.20.0-scala_2.12
                env:
                - name: POD_IP
                  valueFrom:
                    fieldRef:
                      apiVersion: v1
                      fieldPath: status.podIP
                args: ["standalone-job","--host", "$(POD_IP)",  "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--job-id", "3d98a3e455408045defa90c6b0b03a5a"]
      

       

      Expected Outcome

      On resubmission, the application status should be recorded as FAILED rather than SUCCEEDED, because the job's globally terminal state is FAILED.
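
      The expected behaviour could be sketched roughly as below. This is only an illustration under assumptions, not Flink's actual code: the class `DuplicateSubmissionStatus` and the method `resolve` are hypothetical names, and the nested enum is a local stand-in for Flink's `ApplicationStatus` so that the example compiles on its own. The idea is that a duplicate submission reports whatever status was recorded for the earlier run instead of defaulting to SUCCEEDED.

```java
import java.util.Optional;

final class DuplicateSubmissionStatus {

    // Local stand-in for Flink's ApplicationStatus, defined here only to keep the sketch self-contained.
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    /**
     * Hypothetical helper: decides which status to report when a submission is
     * ignored because the job already reached a globally terminal state.
     *
     * @param recordedStatus status recorded for the earlier execution, if any
     */
    static ApplicationStatus resolve(Optional<ApplicationStatus> recordedStatus) {
        // Propagate the recorded status; fall back to UNKNOWN rather than
        // assuming success when nothing was recorded.
        return recordedStatus.orElse(ApplicationStatus.UNKNOWN);
    }

    public static void main(String[] args) {
        // The job in this report was recorded as FAILED, so FAILED is what gets reported.
        System.out.println(resolve(Optional.of(ApplicationStatus.FAILED)));
    }
}
```

      Falling back to UNKNOWN when no status is available is a design choice of this sketch, not something stated in the report.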

      Appendix: JobResultStore on HA

      {"result":{"id":"3d98a3e455408045defa90c6b0b03a5a","application-status":"FAILED","accumulator-results":{},"net-runtime":55991,"failure-cause":{"class":"org.apache.flink.runtime.JobException","stack-trace":"org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=1).....
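
      To check what an entry like the one above recorded, the `application-status` field can be pulled out with a small standalone helper. This is a rough sketch under assumptions: `JobResultStoreEntry` and `applicationStatus` are hypothetical names, the sample string in `main` is a shortened copy of the entry above with the elided `failure-cause` left out, and a regular expression is used instead of a JSON parser only to avoid external dependencies.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class JobResultStoreEntry {

    // Matches "application-status":"<VALUE>" and captures the value.
    private static final Pattern STATUS =
            Pattern.compile("\"application-status\"\\s*:\\s*\"([A-Z_]+)\"");

    /** Hypothetical helper: returns the recorded application status, if the field is present. */
    static Optional<String> applicationStatus(String entryJson) {
        Matcher matcher = STATUS.matcher(entryJson);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Shortened copy of the entry in this report; the failure cause is omitted.
        String entry = "{\"result\":{\"id\":\"3d98a3e455408045defa90c6b0b03a5a\","
                + "\"application-status\":\"FAILED\",\"accumulator-results\":{},"
                + "\"net-runtime\":55991}}";
        System.out.println(applicationStatus(entry).orElse("<missing>"));
    }
}
```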

       

          People

            Assignee: Unassigned
            Reporter: Ahmed Hamdy (chalixar)