  Flink / FLINK-8740

Job-level metrics lost during job re-submission in HA mode


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.5.0
    • Component/s: Runtime / Coordination
    • Labels: None

    Description

      When Flink is running in High Availability mode and a leader re-election occurs back to the same JobManager, the job is unable to register its job-level metrics due to a name collision.

      This might also happen when a different JobManager is elected, but since the error is emitted by the local JobManagerMetricGroup, that seems unlikely to be the case here.

       

      Expected Behavior

      When a job is forced to re-submit due to a JobManager re-election, job-level metrics should be available in the new instance of the job (uptime, checkpoint size, checkpoint duration, etc.).

      Actual Behavior

      When the job gets re-submitted, it is unable to register its job-level metrics due to a collision in the JobManagerMetricGroup. As a result, even though the job is running, the metrics around checkpoints and uptime are not available.
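
      The collision itself is generic MetricGroup behaviour: registering a second metric under a name the group already contains causes the new metric to be dropped with a WARN (the messages shown under "Steps to reproduce"). The sketch below is purely illustrative and hypothetical; it uses user-level metrics in a RichMapFunction rather than the JobManagerMetricGroup, but it exercises the same warning path:

      import org.apache.flink.api.common.functions.RichMapFunction
      import org.apache.flink.configuration.Configuration
      import org.apache.flink.metrics.Counter
      import org.apache.flink.streaming.api.scala._

      object MetricNameCollisionSketch {
        def main(args: Array[String]): Unit = {
          val env = StreamExecutionEnvironment.getExecutionEnvironment

          env.fromElements(1, 2, 3)
            .map(new RichMapFunction[Int, Int] {
              @transient private var checkpoints: Counter = _

              override def open(parameters: Configuration): Unit = {
                val group = getRuntimeContext.getMetricGroup
                // First registration succeeds.
                checkpoints = group.counter("totalNumberOfCheckpoints")
                // Second registration under the same name is dropped and logs:
                // "Name collision: Group already contains a Metric with the name
                //  'totalNumberOfCheckpoints'. Metric will not be reported."
                group.counter("totalNumberOfCheckpoints")
              }

              override def map(value: Int): Int = {
                checkpoints.inc()
                value
              }
            })
            .print()

          env.execute("metric name collision sketch")
        }
      }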

      Steps to reproduce

      1. Start Flink in HA mode using ZooKeeper; a single node is fine (a minimal flink-conf.yaml sketch follows this list)
      2. Submit a job to the cluster
      3. Stop and restart ZooKeeper
      4. In the JobManager logs you will see errors such as the following:

         2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'totalNumberOfCheckpoints'. Metric will not be reported....
         2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfInProgressCheckpoints'. Metric will not be reported....
         2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - Name collision: Group already contains a Metric with the name 'numberOfCompletedCheckpoints'. Metric will not be reported....
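
      For step 1, a minimal flink-conf.yaml sketch is shown below; the ZooKeeper quorum address, storage directory, and cluster id are placeholders for illustration only and need to be adapted to the local setup:

      # Minimal single-node HA setup (all values are placeholders)
      high-availability: zookeeper
      high-availability.zookeeper.quorum: localhost:2181
      high-availability.storageDir: file:///tmp/flink/ha
      high-availability.cluster-id: /flink-ha-test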

      Proposed Solution

      I suspect that there may be other related issues beyond the metrics, but a code change that seems to fix this issue is to remove the already-registered job metrics during recovery:

      if (isRecovery) {
        // Drop the stale job metric group so the recovered job can re-register its metrics
        log.info(s"Removing metrics for job $jobId; new ones will be added during recovery")
        jobManagerMetricGroup.removeJob(jobId)
      }

      I'd be happy to submit this as a PR to open up the discussion, but I am not sure about the consequences of not closing the previous job metric group, or whether it would be better to simply not re-register the job-level metrics during recovery. The latter would seem to entail informing the lower levels about the recovery.


            People

              Assignee: Till Rohrmann
              Reporter: Joshua DeWald
              Votes: 0
              Watchers: 6
