Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: JobManager
    • Labels:
      None

      Description

      When the JobManagerRunner is granted leadership, it should check whether the current job is already running. If the job is running, the JobManager should reconcile itself (enter the RECONCILING state) and wait for the TaskManagers to report task status. Otherwise, the JobManager can schedule the ExecutionGraph in the usual way.

      The RunningJobsRegistry can provide a way to check the job's running status, but the current interface needs to be extended and the related processes adjusted to support this.

      1. The RunningJobsRegistry sets the RUNNING status the first time the JobManagerRunner is granted leadership.

      2. When the job finishes, the RunningJobsRegistry sets the job status to FINISHED, and the status is deleted before exit.

      3. If the mini cluster starts multiple JobManagerRunners and the leading JobManagerRunner has already finished the job and set its status to FINISHED, any other JobManagerRunner exits when it is subsequently granted leadership.

      4. If the JobManager fails, the job status remains RUNNING. When a JobManagerRunner (the previous one or a new one) is granted leadership again, it checks the job status and enters the RECONCILING state.
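
The four rules above can be read as one decision taken when leadership is granted. The following is only a minimal sketch under stated assumptions: `RegistryStatus`, `LeaderAction`, and `LeadershipSketch` are hypothetical names invented for illustration, the registry is a plain in-memory map, and nothing here is taken from the Flink code base.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; not Flink classes.
enum RegistryStatus { NONE, RUNNING, FINISHED }

enum LeaderAction { SCHEDULE, RECONCILE, EXIT }

final class LeadershipSketch {
    // In-memory substitute for a shared registry; not thread-safe.
    private final Map<String, RegistryStatus> registry = new HashMap<>();

    /** Decides what a runner does when it is granted leadership for a job. */
    LeaderAction onGrantLeadership(String jobId) {
        RegistryStatus status = registry.getOrDefault(jobId, RegistryStatus.NONE);
        switch (status) {
            case FINISHED:
                // Rule 3: another runner already finished the job.
                return LeaderAction.EXIT;
            case RUNNING:
                // Rule 4: a previous leader failed while the job was running.
                return LeaderAction.RECONCILE;
            default:
                // Rule 1: first leadership grant, mark the job as running.
                registry.put(jobId, RegistryStatus.RUNNING);
                return LeaderAction.SCHEDULE;
        }
    }

    /** Rule 2: record that the job finished. */
    void onJobFinished(String jobId) {
        registry.put(jobId, RegistryStatus.FINISHED);
    }
}
```

The sketch leaves out the deletion of the status before exit mentioned in rule 2, since when exactly that cleanup may happen is one of the points discussed in the comments.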

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3385

          StephanEwen Stephan Ewen added a comment -

          Fixed in e7011d78a3019880a4e00ab5f697c3cfd20161bb and e0dede9fb0a2ef7560254b6fc40d852ebf16c956 and 7f244b8d4267b50c88aef69f6fd915595f23b368

          githubbot ASF GitHub Bot added a comment -

          Github user shuai-xu commented on the issue:

          https://github.com/apache/flink/pull/3385

          @StephanEwen, thank you very much. Sorry for the test breakage; I will be more careful next time.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3385

          One test case seemed to be failing in this PR.
          I have merged the PR to my local repository, fixed the test, and added some fixes/cleanups on top.
          Will merge back to Flink master tomorrow...

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3385

          Thanks!
          I think I can take this over now...

          githubbot ASF GitHub Bot added a comment -

          Github user shuai-xu commented on the issue:

          https://github.com/apache/flink/pull/3385

          Hi @StephanEwen, thanks for your review. I modified the PR according to your comments, added getJobSchedulingStatus, and added tests.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3385#discussion_r102993092

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java ---
          @@ -55,7 +59,7 @@ public void setJobRunning(JobID jobID) throws IOException {
                try {
                    String zkPath = runningJobPath + jobID.toString();
                    this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
          -         this.client.setData().forPath(zkPath);
          +         this.client.setData().forPath(zkPath, RUNNING.getBytes());
          --- End diff --

          String to bytes conversion (and bytes to string) must always explicitly specify the encoding (Charset). Otherwise, there can be mismatches when different machines configure different default Charsets.
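
As an illustration of the point above, here is a minimal sketch of encoding and decoding a status string with an explicit charset. The class and method names (`StatusEncodingSketch`, `encode`, `decode`) are hypothetical, and UTF-8 is an assumed choice; the only library calls used are the standard `String.getBytes(Charset)` and `new String(byte[], Charset)`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the pull request.
final class StatusEncodingSketch {

    /** Encodes a status string with a fixed charset instead of the platform default. */
    static byte[] encode(String status) {
        return status.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes with the same fixed charset, so writer and reader always agree. */
    static String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

With this shape, the line in the diff would pass something like `StatusEncodingSketch.encode("RUNNING")` rather than the result of a bare `getBytes()` call.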

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3385

          With the problem observed above, I think we should change the approach a bit:

          • The registry should return an enum from a single method, `getJobSchedulingStatus` or so, whose values can be `PENDING`, `RUNNING`, and `DONE`. That way there is only one access to the registry and we don't have the problem that the internal status changes between checks.
          • The file-based registry would create one file for the transition to `RUNNING` and another for the transition to `DONE`. It is important that the transition to `DONE` does not remove the file for `RUNNING`. The status check works backwards: first for the `DONE` file, then for the `RUNNING` file.
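
The file-based variant described above could look roughly like the following sketch. It is an assumption-laden illustration, not the merged implementation: the class name `FileRegistrySketch`, the `.running` and `.done` file suffixes, and the use of a local directory are all hypothetical. Only the enum values and the method name `getJobSchedulingStatus` come from the comment.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

enum JobSchedulingStatus { PENDING, RUNNING, DONE }

// Hypothetical sketch of a marker-file registry; names and layout are assumptions.
final class FileRegistrySketch {
    private final Path dir;

    FileRegistrySketch(Path dir) {
        this.dir = dir;
    }

    void setJobRunning(String jobId) throws IOException {
        touch(dir.resolve(jobId + ".running"));
    }

    /** Adds the DONE marker and deliberately leaves the RUNNING marker in place. */
    void setJobFinished(String jobId) throws IOException {
        touch(dir.resolve(jobId + ".done"));
    }

    /** Single access point: checks backwards, DONE first, then RUNNING. */
    JobSchedulingStatus getJobSchedulingStatus(String jobId) {
        if (Files.exists(dir.resolve(jobId + ".done"))) {
            return JobSchedulingStatus.DONE;
        }
        if (Files.exists(dir.resolve(jobId + ".running"))) {
            return JobSchedulingStatus.RUNNING;
        }
        return JobSchedulingStatus.PENDING;
    }

    private static void touch(Path marker) throws IOException {
        try {
            Files.createFile(marker);
        } catch (FileAlreadyExistsException ignored) {
            // The marker is already present, so the transition was recorded earlier.
        }
    }
}
```

Because markers are only ever added, a reader that misses the `DONE` file can at worst observe `RUNNING`, never `PENDING`, for a job that has already started.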
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3385

          One issue I think can happen in practice is that the checks "isRunning" and "isFinished" are not atomic. Imagine this scenario:

          • job is running
          • JobManager checks "isFinished" -> false
          • job finishes
          • JobManager checks "isRunning" -> false
          • JobManager starts job = bug
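
The interleaving above can be replayed deterministically in a few lines. This is a hypothetical sketch: `TwoCheckRegistrySketch` is an invented in-memory stand-in, and it assumes that finishing a job clears the running flag, which is what the scenario implies. Each method is atomic on its own, yet the pair of checks is not.

```java
// Hypothetical in-memory stand-in with two separate status queries.
final class TwoCheckRegistrySketch {
    private boolean running;
    private boolean finished;

    synchronized void setJobRunning() { running = true; }

    // Assumption: finishing the job clears the running flag.
    synchronized void setJobFinished() { running = false; finished = true; }

    synchronized boolean isRunning() { return running; }

    synchronized boolean isFinished() { return finished; }

    /** Replays the scenario; returns true if the job would wrongly be started again. */
    static boolean replayRace() {
        TwoCheckRegistrySketch registry = new TwoCheckRegistrySketch();
        registry.setJobRunning();                  // job is running
        boolean finished = registry.isFinished();  // first check sees "not finished"
        registry.setJobFinished();                 // job finishes between the two checks
        boolean running = registry.isRunning();    // second check sees "not running"
        return !finished && !running;              // both false, so the job is started again
    }
}
```

A single query that returns one consistent status value avoids the window between the two calls.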
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3385

          I would like to merge this and make a few edits on top...

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3385

          PR looks like a good start, but I think we need to add a few things on top:

          • The file-based registry cannot distinguish between "job created but not running" and "job running". This distinction is important to decide whether to start reconciliation.
          • There are currently no tests for the extended functionality
          githubbot ASF GitHub Bot added a comment -

          GitHub user shuai-xu opened a pull request:

          https://github.com/apache/flink/pull/3385

          FLINK-5501 JM use running job registry to determine whether is the first running

          This PR is for [FLINK-5501](https://issues.apache.org/jira/browse/FLINK-5501).

          The main changes are:
          1. Add the methods isJobFinished() and clearJob() to RunningJobRegistry and implement them.
          2. After grantLeadership, the JMRunner first checks whether the job is finished. If it is, another JM has already finished the job, and the runner only needs to exit.
          3. The JMRunner then checks whether the job is running. If it is, another JM started the job but did not complete it, so the runner needs to recover it.
          4. If the job is not running, this is the first run, and the JMRunner calls setJobRunning in RunningJobRegistry.
          5. After the job finishes, the job state is cleared from RunningJobRegistry.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shuai-xu/flink jira-5501

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3385.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3385


          commit 7c5068e7ea0592f3ba0527d3d363c7cf4653713d
          Author: shuai.xus <shuai.xus@alibaba-inc.com>
          Date: 2017-02-22T06:15:43Z

          FLINK-5501 JM use running job registry to determine whether is the first running


          StephanEwen Stephan Ewen added a comment -

          I would actually go ahead with what you suggested originally and extend the RunningJobsRegistry to support the different states "CREATED", "RUNNING", "FINISHED".

          I was only listing the other possible options for interested readers.

          zjwang zhijiang added a comment - - edited

          Stephan Ewen, thank you for the quick response!

          Yes, you have already considered all the feasible alternatives for implementing this goal, and I totally agree.

          1. I also considered extending the leader election service before implementing this. In the current ZookeeperLeaderElectionService, the leader node is of type EPHEMERAL. If the incrementing number were carried in this node, the node would have to become PERSISTENT; otherwise another node would have to be added for the incrementing number. This approach is very similar to using the RunningJobsRegistry. Semantically, the LeaderElectionService may be the more suitable place, but to keep the change minimal I have already implemented it with the RunningJobsRegistry.

          2. I had not thought of this approach before; it is a completely different and interesting idea. The TaskManager is aware of JobManager leader changes and re-registers with the new leader after a change, so the JobManager can rely on the registration process to determine the status.
          However, it may be complicated to coordinate normal scheduling and reconciling, because both would be triggered at the same time, and it would also temporarily waste resources. If the JobManager can determine the status in a simple way after startup, it can follow the specific process directly and does not need to do anything ambiguous.

          In summary, I prefer the first way to implement the goal. The whole JobManager failure feature is finished on my side. Could I submit the pull request for this issue based on the RunningJobsRegistry implementation?

          StephanEwen Stephan Ewen added a comment -

          I think the approach you outlined is good.

          For thought and future reference, Till Rohrmann and I also thought through the following alternatives, which we rejected in the end:

          1. Extend the leader election service such that it carries an incrementing number when leaders change. If the leader is elected with 0 then it simply starts the job, if it is elected with something != 0, it starts with reconciling. That approach, however, is not very suitable for cluster sessions, and does not have a good separation of concerns.

          2. JobManager always starts the job, and if a TaskManager registers as "reconciling", it cancels the job and goes to "reconciling".

          • Advantage: No special state, plus eager acquisition of resources in case no reconciliation happens
          • Disadvantage: Reconciliation is the more common case (assuming very long running streaming jobs) and this runs off "in the wrong direction" for the common case, triggering unnecessary resource allocation. It is also probably more complicated to implement.
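
For readers who want to picture the first rejected alternative, the following is a minimal sketch of an election counter that tells a first leader apart from a successor. Everything here is hypothetical (`ElectionEpochSketch`, `electLeader`, `shouldReconcile`), and the counter lives in memory, whereas a real leader election service would have to persist it.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of alternative 1; not an existing Flink service.
final class ElectionEpochSketch {
    // Assumption: a real service would keep this counter in persistent storage.
    private final AtomicLong nextEpoch = new AtomicLong();

    /** Returns the number the newly elected leader is elected with. */
    long electLeader() {
        return nextEpoch.getAndIncrement();
    }

    /** A leader elected with 0 starts the job; any later leader reconciles. */
    static boolean shouldReconcile(long electedEpoch) {
        return electedEpoch != 0;
    }
}
```

The sketch also makes the stated drawback visible: the decision is tied to the election counter rather than to the state of a particular job.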

            People

            • Assignee:
              tiemsn shuai.xu
              Reporter:
              zjwang zhijiang