Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: helix-core
    • Labels:
      None

      Description

      Feature summary

      The Helix Task Framework enables users to run tasks on instances managed by Helix. There are two types of tasks: generic tasks and fixed target tasks. A fixed target task always follows its targeted partition and is rebalanced whenever that partition is rebalanced. For generic tasks, Helix lets the user choose whether running tasks are rebalanced when the cluster topology changes.

      For most users, it's better to leave this feature disabled (the default), since there's no need to re-run a task every time a new node is added. For users with long-running tasks, enabling it can be very useful: when a new node is added, the task load is rebalanced more evenly across the cluster.

      Defined system behavior

      When a node fails,

      Feature disabled:
      • Running tasks on the failed node are rebalanced to a live node, since they failed along with the node and no longer exist.
      Feature enabled:
      • Same: running tasks on the failed node are rebalanced to a live node.

      When a new node is added,

      Feature disabled:
      • Running tasks will continue to run on the current instance.
      • If a running task fails after a while, it might be rebalanced and run on other instances, according to the new rebalance assignment under the new cluster topology.
      Feature enabled:
      • Running tasks might be cancelled and rebalanced immediately, according to the new rebalance assignment under the new cluster topology.
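      The rules above can be condensed into a single predicate. The following is a minimal, hypothetical Java model (RunningTaskRebalancePolicy and its enums are illustrative names, not Helix API). It covers only the immediate decision for a task that is RUNNING when the topology changes; the disabled-case path where a task later fails and is then re-assigned is not modeled.

```java
import java.util.Objects;

/** Hypothetical model of the documented rebalance rules; not part of the Helix API. */
final class RunningTaskRebalancePolicy {

  enum TaskKind { GENERIC, FIXED_TARGET }

  enum TopologyChange { NODE_FAILED, NODE_ADDED }

  private RunningTaskRebalancePolicy() {}

  /**
   * Decides whether a task that is RUNNING on currentInstance is moved right away when the new
   * rebalance assignment places it on newInstance.
   */
  static boolean shouldMoveRunningTask(TaskKind kind, boolean rebalanceRunningTask,
      TopologyChange change, String currentInstance, String newInstance) {
    if (Objects.equals(currentInstance, newInstance)) {
      return false; // assignment unchanged, so the task keeps running where it is
    }
    if (change == TopologyChange.NODE_FAILED) {
      return true; // the task failed with its node, so it is re-assigned regardless of the flag
    }
    // A node was added: fixed target tasks follow their partition, generic tasks move only
    // when RebalanceRunningTask is enabled.
    return kind == TaskKind.FIXED_TARGET || rebalanceRunningTask;
  }
}
```

      The node-added branch has the same shape as the condition !isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask() quoted in the review on this issue.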

      Configuration

      A job-level config field in JobConfig, RebalanceRunningTask, enables or disables this feature. It is false by default.
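      In Helix the flag is read through JobConfig.isRebalanceRunningTask() (the getter appears in the review diff on this issue). The sketch below assumes, for illustration only, that the value is stored as a string property keyed by the field name; JobConfigSketch is a hypothetical stand-in, not Helix's JobConfig. It shows how an unset value falls back to the documented default of false.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a job config backed by string properties; not Helix's JobConfig. */
final class JobConfigSketch {
  /** Property key, assumed here to match the config field name. */
  static final String REBALANCE_RUNNING_TASK = "RebalanceRunningTask";

  private final Map<String, String> properties;

  JobConfigSketch(Map<String, String> properties) {
    this.properties = new HashMap<>(properties); // defensive copy
  }

  /** Absent values, and anything other than "true" (case-insensitive), mean disabled. */
  boolean isRebalanceRunningTask() {
    return Boolean.parseBoolean(properties.getOrDefault(REBALANCE_RUNNING_TASK, "false"));
  }
}
```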


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user kongweihan opened a pull request:

          https://github.com/apache/helix/pull/88

          HELIX-654 Running task rebalance

          Add a job config RebalanceRunningTask.

          For generic tasks, if the feature is enabled, Helix will drop running
          tasks whose assignment differs from the previous assignment,
          which cancels those running tasks on the participant.
          The tasks will then be re-assigned to a new instance.

          For fixed target tasks, a running task always follows its partition,
          so tasks are always re-assigned as needed.

          Add different test cases for this feature enabled/disabled.

          Ticket:
          https://issues.apache.org/jira/browse/HELIX-654

          Test:
          mvn testing

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kongweihan/helix rebalance_running_task

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/helix/pull/88.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #88


          commit bf75b8988105f4c4d0f91bbdf893e67cabac6f25
          Author: Weihan Kong <wkong@linkedin.com>
          Date: 2017-04-26T22:34:25Z

          HELIX-654 Running task rebalance

          Add a job config RebalanceRunningTask.

          For generic tasks, if the feature is enabled, Helix will drop running
          tasks whose assignment differs from the previous assignment,
          which cancels those running tasks on the participant.
          The tasks will then be re-assigned to a new instance.

          For fixed target tasks, a running task always follows its partition,
          so tasks are always re-assigned as needed.

          Add different test cases for this feature enabled/disabled.
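          A minimal sketch of the comparison described in this pull request, using plain JDK collections in place of Helix's JobContext and PartitionAssignment. RunningTaskDropper and tasksToDrop are hypothetical names, and partition states are modeled as strings. Unlike the newAssignment.get(instance).contains(pId) call in the patch, the sketch looks up the new assignment with getOrDefault and an empty set, on the assumption that an instance (for example a failed one) may be missing from it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;

/** Hypothetical sketch of "drop running tasks whose assignment changed"; not the Helix code. */
final class RunningTaskDropper {
  private RunningTaskDropper() {}

  /**
   * Returns partition id -> old instance for every RUNNING task that the new assignment no longer
   * places on the instance it is currently running on. The caller would send DROPPED for each.
   */
  static Map<Integer, String> tasksToDrop(Map<String, SortedSet<Integer>> oldAssignment,
      Map<String, SortedSet<Integer>> newAssignment, Map<Integer, String> partitionStates) {
    Map<Integer, String> toDrop = new TreeMap<>();
    for (Map.Entry<String, SortedSet<Integer>> entry : oldAssignment.entrySet()) {
      String instance = entry.getKey();
      // An instance absent from the new assignment keeps no tasks; avoids a NullPointerException.
      SortedSet<Integer> stillAssigned =
          newAssignment.getOrDefault(instance, Collections.emptySortedSet());
      for (Integer pId : entry.getValue()) {
        if ("RUNNING".equals(partitionStates.get(pId)) && !stillAssigned.contains(pId)) {
          toDrop.put(pId, instance);
        }
      }
    }
    return toDrop;
  }
}
```

          Tasks that are not RUNNING (for example COMPLETED) are left alone even if their assignment changed.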


          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/88#discussion_r117135837

          --- Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---
          @@ -420,6 +411,14 @@ private ResourceAssignment computeResourceMapping(String jobResource,
          workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
          for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
          String instance = entry.getKey();
          +
          + if (!isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask()) {
          --- End diff --

          Why is this logic in the for loop? Do we need to execute it for each <instance, partitions> entry?
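          The reviewer's point, sketched in Java under stated assumptions: the diff excerpt shows only the condition, so this assumes the guarded block calls dropRebalancedRunningTasks, which already walks every instance of the old assignment. Inside the per-instance loop it would repeat that whole-map work once per entry; hoisted above the loop it runs once per rebalance. HoistedDropSketch, computeMapping, and the Runnable stand-in are hypothetical, not the merged code.

```java
import java.util.Map;
import java.util.SortedSet;

/** Hypothetical sketch of the reviewer's suggestion; not the merged JobRebalancer code. */
final class HoistedDropSketch {
  private HoistedDropSketch() {}

  /**
   * Runs the job-level drop step at most once, then does per-instance work.
   * Returns the number of per-instance iterations, for illustration.
   */
  static int computeMapping(Map<String, SortedSet<Integer>> taskAssignments,
      boolean genericTaskJob, boolean rebalanceRunningTask, Runnable dropRebalancedRunningTasks) {
    // The condition depends only on the job config, and the drop step scans the whole old
    // assignment by itself, so evaluate and run it once, outside the loop.
    if (!genericTaskJob || rebalanceRunningTask) {
      dropRebalancedRunningTasks.run();
    }
    int iterations = 0;
    for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
      // Per-<instance, partitions> assignment work would go here.
      iterations++;
    }
    return iterations;
  }
}
```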

          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/88#discussion_r117137720

          --- Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---
          @@ -455,6 +454,44 @@ private ResourceAssignment computeResourceMapping(String jobResource,
          return ra;
          }

          + /**
          + * If assignment is different from previous assignment, drop the old running task if it's no
          + * longer assigned to the same instance, but not removing it from excludeSet because the same task
          + * should not be assigned to the new instance right way.
          + */
          + private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment,
          + Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap,
          + JobContext jobContext) {
          + for (String instance : oldAssignment.keySet()) {
          + for (Integer pId : oldAssignment.get(instance)) {
          + if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING
          + && !newAssignment.get(instance).contains(pId)) {
          + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
          + jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
          --- End diff --

          Do we need to set DROPPED here?
          The new status will be updated by updateJobContextAndGetTaskCurrentState() in the next round, right?

          One problem with setting DROPPED here is that if the participant cannot cancel the task quickly, its status will still be RUNNING. In the first round, the controller sets it to DROPPED; in the second round, it is changed back to RUNNING. Although the state will eventually be correct, it is confusing during this period.

          githubbot ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/88#discussion_r117352724

          --- Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---
          @@ -420,6 +411,14 @@ private ResourceAssignment computeResourceMapping(String jobResource,
          workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
          for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
          String instance = entry.getKey();
          +
          + if (!isGenericTaskJob(jobCfg) || jobCfg.isRebalanceRunningTask()) {
          --- End diff --

          You're right, this should be out of the loop!

          githubbot ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/88#discussion_r117354732

          --- Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---
          @@ -455,6 +454,44 @@ private ResourceAssignment computeResourceMapping(String jobResource,
          return ra;
          }

          + /**
          + * If assignment is different from previous assignment, drop the old running task if it's no
          + * longer assigned to the same instance, but not removing it from excludeSet because the same task
          + * should not be assigned to the new instance right way.
          + */
          + private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment,
          + Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap,
          + JobContext jobContext) {
          + for (String instance : oldAssignment.keySet()) {
          + for (Integer pId : oldAssignment.get(instance)) {
          + if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING
          + && !newAssignment.get(instance).contains(pId)) {
          + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
          + jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
          --- End diff --

          You're right. On the other hand, if we don't set it, then after the task is actually dropped, its CurrentState is deleted and JobRebalancer won't write the DROPPED state into the JobContext. Once the same task is scheduled again, it will be set to INIT. But if something goes wrong and the task is not scheduled, the JobContext state stays RUNNING during that period even though the task has already been dropped.

          Which way do you think is better?
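          A toy simulation of the two positions in this exchange, for one partition across controller rounds. It assumes the controller refreshes the JobContext state from the participant's CurrentState whenever one exists and leaves it unchanged once the CurrentState has been deleted; ContextStateSimulation and simulate are hypothetical names, not Helix code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of one partition's JobContext state across controller rounds. */
final class ContextStateSimulation {
  private ContextStateSimulation() {}

  /**
   * reportedStates[i] is the participant's CurrentState at round i (null = CurrentState deleted).
   * Returns the context state recorded at the end of each round.
   */
  static List<String> simulate(boolean setDroppedEagerly, String[] reportedStates) {
    List<String> observed = new ArrayList<>();
    String context = "RUNNING";
    for (int round = 0; round < reportedStates.length; round++) {
      String reported = reportedStates[round];
      if (reported != null) {
        context = reported; // refresh the context from the participant's CurrentState
      } // else: no CurrentState, so the context is left as-is (assumption)
      if (round == 0 && setDroppedEagerly) {
        context = "DROPPED"; // round 0: the controller moves the task and records DROPPED at once
      }
      observed.add(context);
    }
    return observed;
  }
}
```

          With eager setting and a slow cancel {RUNNING, RUNNING, null}, the recorded states are [DROPPED, RUNNING, RUNNING], the flip described in the review. With eager setting and a fast cancel {RUNNING, null, null}, they are [DROPPED, DROPPED, DROPPED]; without eager setting, the fast cancel yields [RUNNING, RUNNING, RUNNING], i.e. RUNNING after the task is gone, which is the case raised in the reply.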

          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/88#discussion_r117420922

          --- Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---
          @@ -455,6 +454,44 @@ private ResourceAssignment computeResourceMapping(String jobResource,
          return ra;
          }

          + /**
          + * If assignment is different from previous assignment, drop the old running task if it's no
          + * longer assigned to the same instance, but not removing it from excludeSet because the same task
          + * should not be assigned to the new instance right way.
          + */
          + private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment,
          + Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap,
          + JobContext jobContext) {
          + for (String instance : oldAssignment.keySet()) {
          + for (Integer pId : oldAssignment.get(instance)) {
          + if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING
          + && !newAssignment.get(instance).contains(pId)) {
          + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
          + jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
          --- End diff --

          I think even if the task is not re-scheduled, the information will be removed after the cancel, right?
          So there shouldn't be a case where the context is RUNNING but the task is dropped, unless something goes wrong during cancellation, in which case showing RUNNING is correct.

          githubbot ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/88#discussion_r117837530

          --- Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---
          @@ -455,6 +454,44 @@ private ResourceAssignment computeResourceMapping(String jobResource,
          return ra;
          }

          + /**
          + * If assignment is different from previous assignment, drop the old running task if it's no
          + * longer assigned to the same instance, but not removing it from excludeSet because the same task
          + * should not be assigned to the new instance right way.
          + */
          + private void dropRebalancedRunningTasks(Map<String, SortedSet<Integer>> newAssignment,
          + Map<String, SortedSet<Integer>> oldAssignment, Map<Integer, PartitionAssignment> paMap,
          + JobContext jobContext) {
          + for (String instance : oldAssignment.keySet()) {
          + for (Integer pId : oldAssignment.get(instance)) {
          + if (jobContext.getPartitionState(pId) == TaskPartitionState.RUNNING
          + && !newAssignment.get(instance).contains(pId)) {
          + paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
          + jobContext.setPartitionState(pId, TaskPartitionState.DROPPED);
          --- End diff --

          I didn't see it get updated. In the original code, if the CurrentState is null, an exception is thrown at line 287.

          githubbot ASF GitHub Bot added a comment -

          Github user kongweihan commented on the issue:

          https://github.com/apache/helix/pull/88

          @dasahcc This patch is updated.

          githubbot ASF GitHub Bot added a comment -

          Github user dasahcc commented on the issue:

          https://github.com/apache/helix/pull/88

          Thanks for updating. Please make sure all the tests passed for this change.

          githubbot ASF GitHub Bot added a comment -

          Github user kongweihan commented on the issue:

          https://github.com/apache/helix/pull/88

          All tests pass.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/helix/pull/88

          hudson Hudson added a comment -

          FAILURE: Integrated in Jenkins build helix #1364 (See https://builds.apache.org/job/helix/1364/)
          HELIX-654 Running task rebalance (wkong: rev 8cbbf834efa30b07c31067e1b48ac6332763b02e)

          • (edit) helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
          • (edit) helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
          • (edit) helix-core/src/test/java/org/apache/helix/integration/task/MockTask.java
          • (edit) helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
          • (edit) helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
          • (edit) helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java
          • (edit) helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
          • (edit) helix-core/src/test/java/org/apache/helix/integration/task/TaskTestBase.java
          • (edit) helix-core/src/main/java/org/apache/helix/task/JobConfig.java
          • (edit) helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
          • (add) helix-core/src/test/java/org/apache/helix/integration/task/TestRebalanceRunningTask.java

            People

            • Assignee:
              Unassigned
              Reporter:
              kongweihan Weihan Kong
            • Votes:
              0
              Watchers:
              3
