Apache Helix / HELIX-655

Helix per-participant concurrent task throttling

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.6.x
    • Fix Version/s: None
    • Component/s: helix-core
    • Labels:
      None

      Description

      Overview

      Currently, all runnable jobs/tasks in Helix are treated equally: they are all scheduled according to the rebalancer algorithm. Their assignments may differ, but they will all be in the RUNNING state.
      This causes a problem when there are too many concurrently runnable jobs. When the Helix controller starts all of them, the instances may become overloaded as they allocate resources and execute all the tasks at once. As a result, the jobs cannot finish within a reasonable time window.
      The issue is even more critical for long-running jobs. According to our meeting with the Gobblin team, when a job is scheduled they allocate resources for it. So in the situation described above, more and more resources are reserved for pending jobs, and the cluster is soon exhausted.
      To work around the problem, an application has to schedule jobs at a relatively low frequency (which is what Gobblin does now). This can cause low utilization.

      A better way to fix this issue, at the framework level, is to throttle the jobs/tasks that run concurrently, and to allow setting priorities on jobs to control total execution time.
      Given the same number of jobs, the cluster then stays in a healthier condition, and the jobs running in it have a more predictable execution time.

      Existing related control mechanisms are:

      • ConcurrentTasksPerInstance for each job
      • ParallelJobs for each workflow
      • Thread pool size limit on the participant, if the user customizes TaskStateModelFactory.

      But none of these helps directly when the number of concurrent workflows or jobs is large. If an application keeps scheduling jobs/job queues, Helix will start any runnable job without considering the workload on the participants.
      The application may be able to configure these settings carefully to achieve the goal, but it cannot easily find the sweet spot, especially when the cluster keeps changing (scaling out, etc.). A sketch of how these existing knobs are set follows.
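
      For reference, a minimal sketch of setting the existing knobs through the task framework builders. The task command name is made up, and the exact builder signatures are assumptions to verify against your Helix version.

      import org.apache.helix.task.JobConfig;
      import org.apache.helix.task.WorkflowConfig;

      public class ExistingKnobs {
        public static void main(String[] args) {
          // Per-job knob: at most 4 concurrent tasks of this job on one instance.
          JobConfig.Builder jobBuilder = new JobConfig.Builder()
              .setCommand("ReindexTask") // made-up task command
              .setNumConcurrentTasksPerInstance(4);

          // Per-workflow knob: at most 2 jobs of this workflow run in parallel.
          WorkflowConfig.Builder workflowBuilder = new WorkflowConfig.Builder()
              .setParallelJobs(2);
        }
      }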

      Problem summary

      1. All runnable tasks start executing, which may overload the participants.
      2. The application needs a mechanism to prioritize important jobs (or workflows). Otherwise, important tasks may be blocked by less important ones, and the resources allocated to them are wasted.

      Feature proposed

      Based on our discussion, we propose two features that help resolve the issue.

      1. Running-task throttling on each participant, to avoid overload.
      2. Job priority control that ensures high priority jobs are scheduled earlier.

      In addition, the application can use the workflow/job monitoring metrics as feedback from Helix to adjust its strategy.

        Issue Links

          Activity

          Jiajun Wang added a comment - edited

          Design

          Task Throttling Per Participant

          This limitation is conceptually equivalent to the max thread pool size in TaskStateModelFactory on the participant.
          If the user constructs TaskStateModelFactory with a customized executor backed by a bounded thread pool, that participant will never execute more tasks than the threshold.
          The problem is that the controller does not know about this limitation, so tasks are still assigned to the participant; they queue up in the participant's thread pool and are never re-assigned.

          It makes more sense to throttle tasks in the controller, at the moment tasks are assigned to participants.
          Basically, a participant is configured with a "MaxRunningTasksNumber", and the controller assigns tasks accordingly.

          Pseudo code

          When calculating the best possible state in the JobRebalancer:

              foreach Job in RunnableJobs:
                  TaskToParticipantMapping = CalculateAssignment(Job)
                  foreach MappingEntry in TaskToParticipantMapping:
                      if Running_task + ToBeAssigned_task exceeds Participant_Task_Threshold:
                          TaskToParticipantMapping.remove(MappingEntry)
                          [Stretch] Try the next applicable participant (consider tasks attached to the resource)

          The above logic can be seen as a task-queue algorithm. The original assignment still relies on the current logic, so if all participants have enough capacity, tasks are still evenly dispatched.
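
          A minimal Java sketch of the check above. Every name in it is illustrative rather than the actual JobRebalancer code, and the default capacity of 40 matches the backward-compatibility assumption described below.

          import java.util.Iterator;
          import java.util.Map;

          class ParticipantThrottle {
            // Drop assignments that would push a participant past its task threshold.
            static void throttle(Map<String, String> taskToParticipant,  // taskId -> participant
                Map<String, Integer> activeTaskCount,                    // participant -> RUNNING/INIT task count
                Map<String, Integer> maxRunningTask) {                   // participant -> MAX_RUNNING_TASK
              Iterator<Map.Entry<String, String>> it = taskToParticipant.entrySet().iterator();
              while (it.hasNext()) {
                String participant = it.next().getValue();
                int active = activeTaskCount.getOrDefault(participant, 0);
                if (active + 1 > maxRunningTask.getOrDefault(participant, 40)) {
                  it.remove(); // over the threshold: leave the task unassigned for this pipeline run
                } else {
                  activeTaskCount.put(participant, active + 1); // count the task just admitted
                }
              }
            }
          }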

          Participant configuration

          {
            "id" : "localhost_12918",
            "simpleFields" : {
              "HELIX_ENABLED" : "true",
              "HELIX_ENABLED_TIMESTAMP" : "1493326930182",
              "HELIX_HOST" : "localhost",
              "HELIX_PORT" : "12918",
              "MAX_RUNNING_TASK" : "55"
            }
          }
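
          One possible way to set the proposed field on a participant, assuming the standard InstanceConfig/ZNRecord API; the persistence call is an assumption to verify against your Helix version.

          import org.apache.helix.model.InstanceConfig;

          public class SetMaxRunningTask {
            public static void main(String[] args) {
              InstanceConfig config = new InstanceConfig("localhost_12918");
              // "MAX_RUNNING_TASK" is the field proposed in the JSON above.
              config.getRecord().setSimpleField("MAX_RUNNING_TASK", "55");
              // Persist via your usual config path, e.g. HelixAdmin#setInstanceConfig.
            }
          }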

          Backward compatibility

          For old participants, the controller assumes a thread pool with a default capacity of 40 (equal to the default message-handling thread pool size).

          Assumption

          Note that if some tasks have much heavier workloads than others, controlling only the task count won't work.
          In this design, we assume that tasks have approximately the same workload.

          [Stretch] Optimization

          The existing JobRebalancer is triggered every time a state-change event happens, which means completely sorting all pending jobs/tasks and recalculating the assignment each time.
          A better strategy is to maintain a job priority queue in the controller (sketched below):
          When a job becomes runnable, enqueue it.
          When a job completes, dequeue it.
          On any task state update, check participant capacity and assign tasks from the queue if possible.
          This refactoring is considered a stretch goal.
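
          A sketch of the event-driven structure this stretch goal describes; the types are hypothetical and not the actual controller code.

          import java.util.Comparator;
          import java.util.PriorityQueue;

          class RunnableJobQueue {
            static final class RunnableJob {
              final String name;
              final long startTime; // time the job was scheduled (its "age")
              RunnableJob(String name, long startTime) {
                this.name = name;
                this.startTime = startTime;
              }
            }

            // Oldest job first, so re-sorting everything on each state-change event is unnecessary.
            private final PriorityQueue<RunnableJob> queue =
                new PriorityQueue<>(Comparator.comparingLong((RunnableJob j) -> j.startTime));

            void onJobRunnable(RunnableJob job) { queue.offer(job); }  // enqueue when runnable
            void onJobComplete(RunnableJob job) { queue.remove(job); } // dequeue when complete
            RunnableJob nextToAssign() { return queue.peek(); }        // consult on task state updates
          }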

          Alternative option

          "GlobalMaxConcurrentJobNumber" Per Cluster

          The Helix controller restricts the total number of running jobs.
          However, with this throttling, once a job is scheduled it occupies a slot until it finishes. This is bad when all the running jobs are long-running: no new jobs get scheduled.
          Moreover, it is harder for an admin to set a reasonable total job count, given that workflows and jobs usually differ widely in their real workloads.
          Comparing the two options, "MaxRunningTasksPerParticipant" is directly tied to a participant's capacity. Once the controller schedules tasks according to it, we reliably avoid overloading the instances.
          Even if we throttle jobs, there is no guarantee about the number of running threads on each participant.
          Moreover, a user can already control job scheduling by adjusting the frequency of job submission, so "GlobalMaxConcurrentJobNumber" is not necessary.

          Job Priority

          Given limited resources, which job should we schedule first?

          Schedule the jobs with the highest priority first, until the participants are full.

          In this design, we propose the simplest solution for priority control.
          The user can configure a priority on the job resource, or Helix will use "age" (the time the job was scheduled) as the priority.
          If some jobs are assigned a priority and others are not, Helix assumes that the jobs with a priority setting have the higher priority.
          One issue is that if the application keeps sending high-priority jobs to Helix, lower-priority jobs will starve.
          Since this is controlled by the application (and is mostly the desired result), Helix won't apply any additional control to these starving jobs.
          Our plan is:
          Step 1. Support job "start time" based priority.
          Step 2. Support user-defined priority.
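
          A minimal comparator expressing this ordering: an explicit priority wins when set, otherwise the older start time wins. All names are illustrative.

          import java.util.Comparator;

          class JobPriorityComparator implements Comparator<JobPriorityComparator.JobInfo> {
            static final class JobInfo {
              Integer userPriority; // null when the application did not set one
              long startTime;       // time the job was scheduled
            }

            @Override
            public int compare(JobInfo a, JobInfo b) {
              boolean aHas = a.userPriority != null;
              boolean bHas = b.userPriority != null;
              if (aHas != bHas) {
                return aHas ? -1 : 1; // jobs with an explicit priority come first
              }
              if (aHas) {
                int byPriority = Integer.compare(b.userPriority, a.userPriority); // higher priority first
                if (byPriority != 0) {
                  return byPriority;
                }
              }
              return Long.compare(a.startTime, b.startTime); // older jobs first
            }
          }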

          Alternative options

          Option 1. Using per-job and per-workflow concurrency control to implement priority

          WorkflowConfig.ParallelJobs and JobConfig.numConcurrentTasksPerInstance control how many jobs and tasks can execute in parallel within a single workflow.
          Provided that the cluster administrators configure these numbers "correctly", workflows will eventually be assigned the expected resources.
          However, there is no promise that high-priority workflows are scheduled before others: tasks are picked up randomly, so the controller may end up filling the task pool entirely with items from low-priority workflows.
          Cons

          1. Hard for users to set the right numbers.
          2. Cannot strictly ensure priority.
          3. May lead to low utilization.

          Option 2. Jobs are assigned to execution slots according to priority

          The Helix controller assigns different portions of resources (execution slots) to jobs according to their priority. For instance, we might have the following assignment, given a total capacity of 100.
          High-priority jobs thus always get a larger portion of resources. If a job does not use all of its portion, the algorithm should be smart enough to assign the unused portion to other jobs.
          The problem with this method is complexity. In addition, since we are not assigning all available resources to the highest-priority jobs, those jobs are not guaranteed to finish quickly, which users might find confusing.
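
          For concreteness, a tiny sketch of the proportional split this option describes, using the total capacity of 100 from the example; the weights are invented and the redistribution of unused portions is omitted.

          import java.util.LinkedHashMap;
          import java.util.Map;

          class SlotAllocator {
            // Split a fixed pool of execution slots proportionally to priority weights.
            static Map<String, Integer> splitSlots(Map<String, Integer> priorityWeight, int totalCapacity) {
              int weightSum = priorityWeight.values().stream().mapToInt(Integer::intValue).sum();
              Map<String, Integer> slots = new LinkedHashMap<>();
              for (Map.Entry<String, Integer> e : priorityWeight.entrySet()) {
                // Integer division; a real implementation would also hand out the remainder.
                slots.put(e.getKey(), totalCapacity * e.getValue() / weightSum);
              }
              return slots;
            }
            // e.g. splitSlots({jobA=3, jobB=1}, 100) -> {jobA=75, jobB=25}
          }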

          ASF GitHub Bot added a comment -

          GitHub user jiajunwang opened a pull request:

          https://github.com/apache/helix/pull/89

          HELIX-655 Helix per-participant concurrent task throttling

          Add per participant concurrent task throttling.

          1. Add a participant configuration item "MAX_CONCURRENT_TASK" for throttling.
          New assigned task + existing running/init task <= MAX_CONCURRENT_TASK. Otherwise, new assignment won't be included in best possible state.
          2. Tasks are assigned in the order of jobs' start time. Older jobs have higher priority than other jobs.
          3. Add test case (TestTaskThrottling.java) for testing new throttling and priority.

          Ticket:
          https://issues.apache.org/jira/browse/HELIX-655

          Test:
          mvn test in helix-core

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/jiajunwang/helix helix-0.6.x

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/helix/pull/89.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #89


          commit 1db00f63f7ef300b308fb50b7fa6651b771ba07d
          Author: Jiajun Wang <jjwang@linkedin.com>
          Date: 2017-05-03T07:50:49Z

          HELIX-655 Helix per-participant concurrent task throttling

          Add per participant concurrent task throttling.

          1. Add a participant configuration item "MAX_CONCURRENT_TASK" for throttling.
          New assigned task + existing running/init task <= MAX_CONCURRENT_TASK. Otherwise, new assignment won't be included in best possible state.
          2. Tasks are assigned in the order of jobs' start time. Older jobs have higher priority than other jobs.
          3. Add test case (TestTaskThrottling.java) for testing new throttling and priority.

          Ticket:
          https://issues.apache.org/jira/browse/HELIX-655

          Test:
          mvn test in helix-core


          ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r115829596

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java —
          @@ -204,4 +236,28 @@ private MappingCalculator getMappingCalculator(Rebalancer rebalancer, String res

          return mappingCalculator;
          }
          +
          + class JobResourcePriority implements Comparable<JobResourcePriority> {
          — End diff –

          I would name this differently. This object is not a "priority", but a job resource with priority. Maybe `JobResourceWithPriority`, `ComparableJobResource` or just `JobResource` ?

          ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r115848245

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java —
          @@ -260,13 +260,72 @@ public Message getPendingState(String resourceName, Partition partition, String
          return partitionSet;
          }

          +  /**
          +   * Get the partitions count for each participant with the pending state and given resource state model
          +   * @param resourceStateModel specified resource state model to look up
          +   * @param state specified pending resource state to look up
          +   * @return set of participants to partitions mapping
          +   */
          +  public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
          +    Map<String, Integer> pendingPartitionCount = new HashMap<String, Integer>();
          +    for (String resource : _pendingStateMap.keySet()) {
          +      String stateModel = _resourceStateModelMap.get(resource);
          +      if (stateModel != null && stateModel.equals(resourceStateModel)
          +          || stateModel == null && resourceStateModel == null) {
          +        for (Partition partition : _pendingStateMap.get(resource).keySet()) {
          +          Map<String, Message> partitionMessage = _pendingStateMap.get(resource).get(partition);
          +          for (Map.Entry<String, Message> participantMap : partitionMessage.entrySet()) {
          +            String participant = participantMap.getKey();
          +            if (!pendingPartitionCount.containsKey(participant)) {
          +              pendingPartitionCount.put(participant, 0);
          +            }
          +            String toState = participantMap.getValue().getToState();
          +            if (toState != null && toState.equals(state) || toState == null && state == null) {
          — End diff –

          Same here as mentioned. To me it's a bit confusing at first glance.

          ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r115849569

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java —
          @@ -260,13 +260,72 @@ public Message getPendingState(String resourceName, Partition partition, String
          return partitionSet;
          }

          +  /**
          +   * Get the partitions count for each participant with the pending state and given resource state model
          +   * @param resourceStateModel specified resource state model to look up
          +   * @param state specified pending resource state to look up
          +   * @return set of participants to partitions mapping
          +   */
          +  public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
          +    Map<String, Integer> pendingPartitionCount = new HashMap<String, Integer>();
          +    for (String resource : _pendingStateMap.keySet()) {
          +      String stateModel = _resourceStateModelMap.get(resource);
          +      if (stateModel != null && stateModel.equals(resourceStateModel)
          +          || stateModel == null && resourceStateModel == null) {
          +        for (Partition partition : _pendingStateMap.get(resource).keySet()) {
          +          Map<String, Message> partitionMessage = _pendingStateMap.get(resource).get(partition);
          +          for (Map.Entry<String, Message> participantMap : partitionMessage.entrySet()) {
          +            String participant = participantMap.getKey();
          +            if (!pendingPartitionCount.containsKey(participant)) {
          +              pendingPartitionCount.put(participant, 0);
          +            }
          +            String toState = participantMap.getValue().getToState();
          +            if (toState != null && toState.equals(state) || toState == null && state == null) {
          +              pendingPartitionCount.put(participant, pendingPartitionCount.get(participant) + 1);
          +            }
          +          }
          +        }
          +      }
          +    }
          +    return pendingPartitionCount;
          +  }
          +
          +  /**
          +   * Get the partitions count for each participant in the current state and with given resource state model
          +   * @param resourceStateModel specified resource state model to look up
          +   * @param state specified current resource state to look up
          +   * @return set of participants to partitions mapping
          +   */
          +  public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel, String state) {
          — End diff –

          This is similar to the method above; would it be better to combine them?
          I see that `_pendingStateMap` contains Messages instead of Strings, which makes it a bit hard to abstract. But given its name is "_pendingStateMap", shouldn't it contain the pending state instead of the message?

          ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r115853776

          — Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java —
          @@ -704,4 +712,30 @@ private PartitionAssignment(String instance, String state)
                 _state = state;
               }
             }
          +
          +  /**
          +   * Reset RUNNING/INIT tasks count in JobRebalancer
          +   */
          +  public static void resetActiveTaskCount(Collection<String> liveInstances, CurrentStateOutput currentStateOutput) {
          +    // init participant map
          +    for (String liveInstance : liveInstances) {
          +      participantActiveTaskCount.put(liveInstance, 0);
          +    }
          +    // Active task == init and running tasks
          +    fillActiveTaskCount(currentStateOutput.getPartitionCountWithPendingState(TaskConstants.STATE_MODEL_NAME,
          — End diff –

          Is there any case where `INIT` is the toState?

          ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r115852400

          — Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java —
          @@ -57,6 +57,7 @@
          new FixedTargetTaskAssignmentCalculator();
          private static TaskAssignmentCalculator genericTaskAssignmentCal =
          new GenericTaskAssignmentCalculator();
          + private static Map<String, Integer> participantActiveTaskCount = new HashMap<String, Integer>();
          — End diff –

          This variable should start with `_`. I believe `genericTaskAssignmentCal` above was wrong.

          ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r115830770

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java —
          @@ -90,60 +96,86 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource

               BestPossibleStateOutput output = new BestPossibleStateOutput();

          -    for (String resourceName : resourceMap.keySet()) {
          -      logger.debug("Processing resource:" + resourceName);
          +    // Reset current INIT/RUNNING tasks on participants for throttling
          +    JobRebalancer.resetActiveTaskCount(cache.getLiveInstances().keySet(), currentStateOutput);
          -    Resource resource = resourceMap.get(resourceName);
          -    // Ideal state may be gone. In that case we need to get the state model name
          -    // from the current state
          -    IdealState idealState = cache.getIdealState(resourceName);
          +    PriorityQueue<JobResourcePriority> jobResourceQueue = new PriorityQueue<JobResourcePriority>();
          — End diff —

          I don't quite get why this is a queue. It's created in this stage, a bunch of things are added in, and it is iterated through. Is there any reason not to use a sorted set?

          ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r115847749

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java —
          @@ -260,13 +260,72 @@ public Message getPendingState(String resourceName, Partition partition, String
          return partitionSet;
          }

          + /**
          + * Get the partitions count for each participant with the pending state and given resource state model
          + * @param resourceStateModel specified resource state model to look up
          + * @param state specified pending resource state to look up
          + * @return set of participants to partitions mapping
          + */
          + public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
          + Map<String, Integer> pendingPartitionCount = new HashMap<String, Integer>();
          + for (String resource : _pendingStateMap.keySet()) {
          + String stateModel = _resourceStateModelMap.get(resource);
          + if (stateModel != null && stateModel.equals(resourceStateModel)
          — End diff –

          IMHO it's generally better to group conditions together so that people don't need to google and confirm that "&& takes precedence over ||".

          ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r115851381

          — Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java —
          @@ -424,9 +425,15 @@ private ResourceAssignment computeResourceMapping(String jobResource,
                   .contains(instance)) {
                 continue;
               }
          +    // 1. throttled by job configuration
               // Contains the set of task partitions currently assigned to the instance.
               Set<Integer> pSet = entry.getValue();
          -    int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
          +    int jobCfgLimitation = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
          — End diff —

          Would it be better to make this small part a separate method?
          `int numToAssign = getNumToAssign(...)`
          This method is already too long.

          ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116554013

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java —
          @@ -90,60 +96,86 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource

               BestPossibleStateOutput output = new BestPossibleStateOutput();

          -    for (String resourceName : resourceMap.keySet()) {
          -      logger.debug("Processing resource:" + resourceName);
          +    // Reset current INIT/RUNNING tasks on participants for throttling
          +    JobRebalancer.resetActiveTaskCount(cache.getLiveInstances().keySet(), currentStateOutput);
          -    Resource resource = resourceMap.get(resourceName);
          -    // Ideal state may be gone. In that case we need to get the state model name
          -    // from the current state
          -    IdealState idealState = cache.getIdealState(resourceName);
          +    PriorityQueue<JobResourcePriority> jobResourceQueue = new PriorityQueue<JobResourcePriority>();
          — End diff —

          SortedSet is an interface, so you mean using TreeSet?
          Why do you think that would be better? I think there is no big difference between a TreeSet and a PriorityQueue here.

          Besides, we do want to implement some kind of job priority here, as you can see, so a PriorityQueue is easier for others to understand.

          ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116554664

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java —
          @@ -204,4 +236,28 @@ private MappingCalculator getMappingCalculator(Rebalancer rebalancer, String res

          return mappingCalculator;
          }
          +
          + class JobResourcePriority implements Comparable<JobResourcePriority> {
          — End diff –

          From my point of view, this is a priority with a job reference, so calling it "priority" is fine.
          JobResourceWithPriority is too verbose.
          And ComparableJobResource/JobResource imply that it is another kind of job resource class; they seem even more confusing.

          ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116555244

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java —
          @@ -260,13 +260,72 @@ public Message getPendingState(String resourceName, Partition partition, String
          return partitionSet;
          }

          + /**
          + * Get the partitions count for each participant with the pending state and given resource state model
          + * @param resourceStateModel specified resource state model to look up
          + * @param state specified pending resource state to look up
          + * @return set of participants to partitions mapping
          + */
          + public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
          + Map<String, Integer> pendingPartitionCount = new HashMap<String, Integer>();
          + for (String resource : _pendingStateMap.keySet()) {
          + String stateModel = _resourceStateModelMap.get(resource);
          + if (stateModel != null && stateModel.equals(resourceStateModel)
          — End diff –

          I don't think most of us need to google to know that...
          But I agree that adding parentheses makes it safer.
          Thanks.

          ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116556982

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java —
          @@ -260,13 +260,72 @@ public Message getPendingState(String resourceName, Partition partition, String
          return partitionSet;
          }

          +  /**
          +   * Get the partitions count for each participant with the pending state and given resource state model
          +   * @param resourceStateModel specified resource state model to look up
          +   * @param state specified pending resource state to look up
          +   * @return set of participants to partitions mapping
          +   */
          +  public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
          +    Map<String, Integer> pendingPartitionCount = new HashMap<String, Integer>();
          +    for (String resource : _pendingStateMap.keySet()) {
          +      String stateModel = _resourceStateModelMap.get(resource);
          +      if (stateModel != null && stateModel.equals(resourceStateModel)
          +          || stateModel == null && resourceStateModel == null) {
          +        for (Partition partition : _pendingStateMap.get(resource).keySet()) {
          +          Map<String, Message> partitionMessage = _pendingStateMap.get(resource).get(partition);
          +          for (Map.Entry<String, Message> participantMap : partitionMessage.entrySet()) {
          +            String participant = participantMap.getKey();
          +            if (!pendingPartitionCount.containsKey(participant)) {
          +              pendingPartitionCount.put(participant, 0);
          +            }
          +            String toState = participantMap.getValue().getToState();
          +            if (toState != null && toState.equals(state) || toState == null && state == null) {
          +              pendingPartitionCount.put(participant, pendingPartitionCount.get(participant) + 1);
          +            }
          +          }
          +        }
          +      }
          +    }
          +    return pendingPartitionCount;
          +  }
          +
          +  /**
          +   * Get the partitions count for each participant in the current state and with given resource state model
          +   * @param resourceStateModel specified resource state model to look up
          +   * @param state specified current resource state to look up
          +   * @return set of participants to partitions mapping
          +   */
          +  public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel, String state) {
          — End diff –

          It is doable, but it won't reduce code.
          Moreover, keeping separate methods is more flexible.

          ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116557426

          — Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java —
          @@ -57,6 +57,7 @@
          new FixedTargetTaskAssignmentCalculator();
          private static TaskAssignmentCalculator genericTaskAssignmentCal =
          new GenericTaskAssignmentCalculator();
          + private static Map<String, Integer> participantActiveTaskCount = new HashMap<String, Integer>();
          — End diff –

          Good catch! Should be _participantActiveTaskCount.

          ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116559458

          — Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java —
          @@ -704,4 +712,30 @@ private PartitionAssignment(String instance, String state)
                 _state = state;
               }
             }
          +
          +  /**
          +   * Reset RUNNING/INIT tasks count in JobRebalancer
          +   */
          +  public static void resetActiveTaskCount(Collection<String> liveInstances, CurrentStateOutput currentStateOutput) {
          +    // init participant map
          +    for (String liveInstance : liveInstances) {
          +      participantActiveTaskCount.put(liveInstance, 0);
          +    }
          +    // Active task == init and running tasks
          +    fillActiveTaskCount(currentStateOutput.getPartitionCountWithPendingState(TaskConstants.STATE_MODEL_NAME,
          — End diff –

          Yes, at least based on the task state model.
          And even if there were not, it is safer to add the INIT check for the pending state.

          ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116564901

          — Diff: helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java —
          @@ -424,9 +425,15 @@ private ResourceAssignment computeResourceMapping(String jobResource,
                   .contains(instance)) {
                 continue;
               }
          +    // 1. throttled by job configuration
               // Contains the set of task partitions currently assigned to the instance.
               Set<Integer> pSet = entry.getValue();
          -    int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
          +    int jobCfgLimitation = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size();
          — End diff —

I get your point, but it is not quite necessary at this moment.
I would prefer to split this huge method once there is some reused logic.
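
To make the interaction of the two limits concrete, here is a small sketch of how the job-level limit above can combine with the per-participant throttle: the number of tasks to assign is capped by whichever budget is smaller. All names are illustrative, not the exact JobRebalancer code.

public class ThrottleBudgetSketch {
  // jobConcurrencyLimit = jobCfg.getNumConcurrentTasksPerInstance()
  // assignedForThisJob  = pSet.size() in the diff above
  // maxConcurrentTask   = the per-participant throttle
  // activeOnInstance    = participantActiveTaskCount for this instance
  static int numTasksToAssign(int jobConcurrencyLimit, int assignedForThisJob,
      int maxConcurrentTask, int activeOnInstance) {
    int jobCfgLimitation = jobConcurrencyLimit - assignedForThisJob;
    int participantCapacity = maxConcurrentTask - activeOnInstance;
    // Assign no more than either budget allows, and never a negative count.
    return Math.max(0, Math.min(jobCfgLimitation, participantCapacity));
  }
}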

          githubbot ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116566511

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java —
@@ -90,60 +96,86 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource

    BestPossibleStateOutput output = new BestPossibleStateOutput();

-    for (String resourceName : resourceMap.keySet()) {
-      logger.debug("Processing resource:" + resourceName);
+    // Reset current INIT/RUNNING tasks on participants for throttling
+    JobRebalancer.resetActiveTaskCount(cache.getLiveInstances().keySet(), currentStateOutput);
-      Resource resource = resourceMap.get(resourceName);
-      // Ideal state may be gone. In that case we need to get the state model name
-      // from the current state
-      IdealState idealState = cache.getIdealState(resourceName);
+    PriorityQueue<JobResourcePriority> jobResourceQueue = new PriorityQueue<JobResourcePriority>();
— End diff –

What I meant was, I don't see any actual usage of a "queue" here.
We are not producing jobs into the queue while simultaneously consuming jobs from it; we have a pre-created set of jobs, sorted by priority, that is looped over just once. I don't see the necessity of using a priority queue.
Does that make sense?

          githubbot ASF GitHub Bot added a comment -

          Github user kongweihan commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116567854

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java —
          @@ -260,13 +260,72 @@ public Message getPendingState(String resourceName, Partition partition, String
          return partitionSet;
          }

          + /**
          + * Get the partitions count for each participant with the pending state and given resource state model
          + * @param resourceStateModel specified resource state model to look up
          + * @param state specified pending resource state to look up
          + * @return set of participants to partitions mapping
          + */
+ public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
+   Map<String, Integer> pendingPartitionCount = new HashMap<String, Integer>();
+   for (String resource : _pendingStateMap.keySet()) {
+     String stateModel = _resourceStateModelMap.get(resource);
+     if (stateModel != null && stateModel.equals(resourceStateModel)
+         || stateModel == null && resourceStateModel == null) {
+       for (Partition partition : _pendingStateMap.get(resource).keySet()) {
+         Map<String, Message> partitionMessage = _pendingStateMap.get(resource).get(partition);
+         for (Map.Entry<String, Message> participantMap : partitionMessage.entrySet()) {
+           String participant = participantMap.getKey();
+           if (!pendingPartitionCount.containsKey(participant)) {
+             pendingPartitionCount.put(participant, 0);
+           }
+           String toState = participantMap.getValue().getToState();
+           if (toState != null && toState.equals(state) || toState == null && state == null) {
+             pendingPartitionCount.put(participant, pendingPartitionCount.get(participant) + 1);
+           }
+         }
+       }
+     }
+   }
+   return pendingPartitionCount;
+ }
          +
          + /**
          + * Get the partitions count for each participant in the current state and with given resource state model
          + * @param resourceStateModel specified resource state model to look up
          + * @param state specified current resource state to look up
          + * @return set of participants to partitions mapping
          + */
          + public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel, String state) {
          — End diff –

          If 2 similar methods are replaced with 1 method, why wouldn't it reduce the code redundancy?

          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116571005

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java —
@@ -90,60 +96,86 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource

    BestPossibleStateOutput output = new BestPossibleStateOutput();

-    for (String resourceName : resourceMap.keySet()) {
-      logger.debug("Processing resource:" + resourceName);
+    // Reset current INIT/RUNNING tasks on participants for throttling
+    JobRebalancer.resetActiveTaskCount(cache.getLiveInstances().keySet(), currentStateOutput);
-      Resource resource = resourceMap.get(resourceName);
-      // Ideal state may be gone. In that case we need to get the state model name
-      // from the current state
-      IdealState idealState = cache.getIdealState(resourceName);
+    PriorityQueue<JobResourcePriority> jobResourceQueue = new PriorityQueue<JobResourcePriority>();
— End diff –

Got your point, but we don't need to use all the features of the queue to leverage it here.
If the performance were bad, we would consider an alternative, but that's not the case.
And even if we used a SortedSet, there would be no usage of a "set" here either. So, to me, there is no difference.
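
For context on this exchange: both alternatives yield the same single-pass iteration order. A tiny illustration, with made-up start times (neither snippet is the actual BestPossibleStateCalcStage code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class QueueVsSortedListSketch {
  public static void main(String[] args) {
    List<Long> jobStartTimes = Arrays.asList(30L, 10L, 20L);

    // Alternative 1: a priority queue, drained once (the PR's approach).
    PriorityQueue<Long> queue = new PriorityQueue<Long>(jobStartTimes);
    while (!queue.isEmpty()) {
      System.out.println("process job started at " + queue.poll());
    }

    // Alternative 2: a pre-sorted list, looped once (the reviewer's suggestion).
    List<Long> sorted = new ArrayList<Long>(jobStartTimes);
    Collections.sort(sorted);
    for (Long start : sorted) {
      System.out.println("process job started at " + start);
    }
    // Both print the jobs in ascending start-time order: 10, 20, 30.
  }
}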

          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r116576058

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java —
          @@ -260,13 +260,72 @@ public Message getPendingState(String resourceName, Partition partition, String
          return partitionSet;
          }

          + /**
          + * Get the partitions count for each participant with the pending state and given resource state model
          + * @param resourceStateModel specified resource state model to look up
          + * @param state specified pending resource state to look up
          + * @return set of participants to partitions mapping
          + */
+ public Map<String, Integer> getPartitionCountWithPendingState(String resourceStateModel, String state) {
+   Map<String, Integer> pendingPartitionCount = new HashMap<String, Integer>();
+   for (String resource : _pendingStateMap.keySet()) {
+     String stateModel = _resourceStateModelMap.get(resource);
+     if (stateModel != null && stateModel.equals(resourceStateModel)
+         || stateModel == null && resourceStateModel == null) {
+       for (Partition partition : _pendingStateMap.get(resource).keySet()) {
+         Map<String, Message> partitionMessage = _pendingStateMap.get(resource).get(partition);
+         for (Map.Entry<String, Message> participantMap : partitionMessage.entrySet()) {
+           String participant = participantMap.getKey();
+           if (!pendingPartitionCount.containsKey(participant)) {
+             pendingPartitionCount.put(participant, 0);
+           }
+           String toState = participantMap.getValue().getToState();
+           if (toState != null && toState.equals(state) || toState == null && state == null) {
+             pendingPartitionCount.put(participant, pendingPartitionCount.get(participant) + 1);
+           }
+         }
+       }
+     }
+   }
+   return pendingPartitionCount;
+ }
          +
          + /**
          + * Get the partitions count for each participant in the current state and with given resource state model
          + * @param resourceStateModel specified resource state model to look up
          + * @param state specified current resource state to look up
          + * @return set of participants to partitions mapping
          + */
          + public Map<String, Integer> getPartitionCountWithCurrentState(String resourceStateModel, String state) {
          — End diff –

They are different methods.
As you said, the types in the maps are different, so we would need to introduce additional code for processing the Message map. As a result, the final line count won't be reduced significantly.

It would look more like:
1. Convert Map<String, Map<Partition, Map<String, Message>>> _pendingStateMap to Map<String, Map<Partition, Map<String, String>>> tempPendingStateMap.
2. Call a shared method public Map<String, Integer> getPartitionCount(String resourceStateModel, String state, Map<String, Map<Partition, Map<String, String>>> stateMap) to get the final result.

Please let me know if you have a better idea. Otherwise, it does not seem worth the change.
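
A rough sketch of step 2 of the refactoring described above. The state-model filter is omitted for brevity, and Partition is replaced by a type parameter so the snippet stands alone; this is not the actual CurrentStateOutput code.

import java.util.HashMap;
import java.util.Map;

public class SharedPartitionCountSketch {
  // One shared counting method over a participant -> state map, usable once
  // the Message map has been converted to plain strings (step 1).
  static <P> Map<String, Integer> getPartitionCount(String state,
      Map<String, Map<P, Map<String, String>>> stateMap) {
    Map<String, Integer> count = new HashMap<String, Integer>();
    for (Map<P, Map<String, String>> partitions : stateMap.values()) {
      for (Map<String, String> participantToState : partitions.values()) {
        for (Map.Entry<String, String> entry : participantToState.entrySet()) {
          String toState = entry.getValue();
          // Null-safe state comparison, mirroring the diff above.
          if (toState == null ? state == null : toState.equals(state)) {
            Integer c = count.get(entry.getKey());
            count.put(entry.getKey(), (c == null ? 0 : c) + 1);
          }
        }
      }
    }
    return count;
  }
}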

          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang commented on the issue:

          https://github.com/apache/helix/pull/89

Rebased to the latest changes and applied one additional change as suggested by lei-xia.

          githubbot ASF GitHub Bot added a comment -

          Github user lei-xia commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r119443872

          — Diff: helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java —
          @@ -408,6 +410,18 @@ public void setInstanceEnabledForPartition(String resourceName, String partition
          }
          }

          + /**
          + * Get maximum allowed running task count on this instance
          + * @return the maximum task count
          + */
          + public int getMaxConcurrentTask() {
          — End diff –

Could we make this config option also available in the cluster config? In most cases, users would choose the same value for all of their nodes, so this lets them set it just once instead of in each instance config.

If the value is set in both the cluster config and some instance configs, the value in the instance config should take effect.
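
The resolution order proposed here is simple; a minimal sketch, with illustrative names rather than the actual InstanceConfig/ClusterConfig API:

public class MaxConcurrentTaskLookupSketch {
  // instanceValue: the InstanceConfig setting, or null when unset there.
  // clusterDefault: the cluster-wide setting.
  static int resolveMaxConcurrentTask(Integer instanceValue, int clusterDefault) {
    // The instance-level value takes effect when present; otherwise the
    // cluster config supplies the default.
    return instanceValue != null ? instanceValue : clusterDefault;
  }
}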

          githubbot ASF GitHub Bot added a comment -

          Github user lei-xia commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r119444125

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java —
@@ -90,61 +96,84 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource

    BestPossibleStateOutput output = new BestPossibleStateOutput();

-    for (String resourceName : resourceMap.keySet()) {
-      logger.debug("Processing resource:" + resourceName);
+    // Reset current INIT/RUNNING tasks on participants for throttling
+    JobRebalancer.resetActiveTaskCount(cache.getLiveInstances().keySet(), currentStateOutput);
-      Resource resource = resourceMap.get(resourceName);
-      // Ideal state may be gone. In that case we need to get the state model name
-      // from the current state
-      IdealState idealState = cache.getIdealState(resourceName);
+    PriorityQueue<JobResourcePriority> jobResourceQueue = new PriorityQueue<JobResourcePriority>();
— End diff –

We can use the queue for all resources; separating out only the task-related resources and putting them into the queue makes the logic less generic here.

          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r119465719

          — Diff: helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java —
          @@ -408,6 +410,18 @@ public void setInstanceEnabledForPartition(String resourceName, String partition
          }
          }

          + /**
          + * Get maximum allowed running task count on this instance
          + * @return the maximum task count
          + */
          + public int getMaxConcurrentTask() {
          — End diff –

          Good point. I will make the change.

          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang commented on a diff in the pull request:

          https://github.com/apache/helix/pull/89#discussion_r119465769

          — Diff: helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java —
@@ -90,61 +96,84 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource

    BestPossibleStateOutput output = new BestPossibleStateOutput();

-    for (String resourceName : resourceMap.keySet()) {
-      logger.debug("Processing resource:" + resourceName);
+    // Reset current INIT/RUNNING tasks on participants for throttling
+    JobRebalancer.resetActiveTaskCount(cache.getLiveInstances().keySet(), currentStateOutput);
-      Resource resource = resourceMap.get(resourceName);
-      // Ideal state may be gone. In that case we need to get the state model name
-      // from the current state
-      IdealState idealState = cache.getIdealState(resourceName);
+    PriorityQueue<JobResourcePriority> jobResourceQueue = new PriorityQueue<JobResourcePriority>();
— End diff –

          Will change.
          Thanks.

          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang commented on the issue:

          https://github.com/apache/helix/pull/89

Closed since the master branch has been updated.

          githubbot ASF GitHub Bot added a comment -

          Github user jiajunwang closed the pull request at:

          https://github.com/apache/helix/pull/89

          githubbot ASF GitHub Bot added a comment -

          GitHub user jiajunwang opened a pull request:

          https://github.com/apache/helix/pull/100

          HELIX-655 Helix per-participant concurrent task throttling

          Add per participant concurrent task throttling.

Add a participant configuration item "MAX_CONCURRENT_TASK" for throttling.
Newly assigned tasks + existing running/init tasks <= MAX_CONCURRENT_TASK; otherwise, the new assignment won't be included in the best possible state.
Tasks are assigned in the order of the jobs' start times, so older jobs have higher priority than newer ones.
Add a test case (TestTaskThrottling.java) for the new throttling and priority.
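
The throttling invariant above reduces to a single check per instance; a minimal sketch, with illustrative names rather than the exact JobRebalancer code:

public class ThrottleCheckSketch {
  // The invariant from the description:
  // newly assigned tasks + existing running/init tasks <= MAX_CONCURRENT_TASK.
  static boolean withinThrottle(int newlyAssigned, int runningOrInitTasks, int maxConcurrentTask) {
    return newlyAssigned + runningOrInitTasks <= maxConcurrentTask;
  }
}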

          Ticket:
          https://issues.apache.org/jira/browse/HELIX-655

          Test:
          mvn test in helix-core

          Please refer to previous discussions in another pull request:
          https://github.com/apache/helix/pull/89

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/jiajunwang/helix master

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/helix/pull/100.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #100


          commit 20685cf6e1276aaa1bf6264c1fe6a3173081d22c
          Author: Jiajun Wang <jjwang@linkedin.com>
          Date: 2017-05-31T21:57:23Z

          Helix per-participant concurrent task throttling

          Add per participant concurrent task throttling.

1. Add a participant configuration item "MAX_CONCURRENT_TASK" for the throttling setting, and a cluster configuration item "MAX_CONCURRENT_TASK_PER_INSTANCE" as the default throttling setting.
Newly assigned tasks + existing running/init tasks <= MAX_CONCURRENT_TASK; otherwise, the new assignment won't be included in the best possible state.
2. Tasks are assigned in the order of the jobs' start times. Older jobs have higher priority than other jobs and regular resources.
3. Add a test case (TestTaskThrottling.java) for the new throttling and priority.

          Ticket:
          https://issues.apache.org/jira/browse/HELIX-655

          Test:
          mvn test in helix-core
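
A sketch of the start-time ordering described in point 2 of the commit above. JobResourcePriority appears in the review diffs, but this stand-in only illustrates the comparison, not the actual class:

public class JobStartTimePrioritySketch implements Comparable<JobStartTimePrioritySketch> {
  final String jobName;
  final long startTimeMs;

  JobStartTimePrioritySketch(String jobName, long startTimeMs) {
    this.jobName = jobName;
    this.startTimeMs = startTimeMs;
  }

  @Override
  public int compareTo(JobStartTimePrioritySketch other) {
    // Smaller start time == older job == higher priority, so older jobs are
    // handed capacity before newer jobs and regular resources.
    return Long.compare(this.startTimeMs, other.startTimeMs);
  }
}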

          commit e35fe4fffc952f7ccae7bfa4cbf89ef75e404a53
          Author: Jiajun Wang <jjwang@linkedin.com>
          Date: 2017-06-03T06:26:50Z

Add a workflow configuration to allow or disallow assigning multiple jobs to one instance.

By default, Helix does not assign multiple jobs from one workflow to the same instance.
If this option is set to true, an instance can execute multiple jobs from each workflow concurrently.

When an application sets a max task throttle for its participants, allowing overlapping assignment can maximize utilization.
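
A sketch of the workflow-level switch this commit describes; the parameter names below are assumptions for illustration, not the actual Helix config key or API:

import java.util.Set;

public class WorkflowOverlapSketch {
  // Default behavior: at most one job per workflow on an instance. With the
  // new option enabled, overlapping assignment is allowed so a throttled
  // cluster can stay fully utilized.
  static boolean canAcceptJob(boolean allowOverlapJobAssignment,
      Set<String> jobsFromSameWorkflowOnInstance) {
    return allowOverlapJobAssignment || jobsFromSameWorkflowOnInstance.isEmpty();
  }
}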


          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/helix/pull/100


            People

• Assignee: dasahcc (Junkai Xue)
• Reporter: jiajunwang (Jiajun Wang)
• Votes: 0
• Watchers: 2
