Uploaded image for project: 'Hadoop Common'
  1. Hadoop Common
  2. HADOOP-3445

Implementing core scheduler functionality in Resource Manager (V1) for Hadoop



    • New Feature
    • Status: Closed
    • Major
    • Resolution: Fixed
    • None
    • 0.19.0
    • None
    • None
    • Reviewed
    • Hide
      Introduced Capacity Task Scheduler.
      Introduced Capacity Task Scheduler.


      The architecture for the Hadoop Resource Manager (V1) is described in HADOOP-3444. This Jira proposes implementation details on the core scheduling piece - the changes to the JT to handle Orgs, queues, guaranteed capacities, user limits, and ultimately, scheduling a task on a TT.

      As per the architecture in HADOOP-3444, the JT contains a new component, Job Queue Manager (JQM), to handle queues of jobs. Each queue represents a queue in an Org (one queue per Org). Job queues are backed up by disk based storage.

      We now look at some details. Terminology:

      • A queue has excess capacity if it does not have enough jobs (queued or running) to take up its guaranteed capacity. Excess capacity needs to be distributed to queues that have none.
      • Queues that have given up excess capacity to other queues are called low queues, for the sake of this discussion. Queues that are running on additional capacity are called high queues.

      For each queue, the JT keeps track of the following:

      • Guaranteed capacity (GC): the capacity guaranteed to the queue, set up through configuration. The sum of all GCs is equal to the grid capacity. Since we're handling Map and Reduce slots differently, we will have a GC for each, i.e., a CG-M for maps and a GC-R for reducers. The sum of all GC-Ms is equal to the sum of all map slots available in the Grid, and the same for GC-Rs.
      • Allocated capacity (AC): the current capacity of the queue. This can be higher or lower than the GC because of excess capacity distribution. The sum of all ACs is equal to the grid capacity. As above, each queue will have a AC-M and AC-R.
      • Timer for claiming containers: can just be the # of seconds the queue can wait till it needs its capacity back. There will be separate timers for claiming map and reduce slots (we will likely want to take more time to claim reduce slots, as reducers take longer to run).
      • # of containers being used, i.e., the number of running tasks associated with the queue (C-RUN). Each queue will have a C-RUN-M and C-RUN-R.
      • Whether any jobs are queued.
      • The number of Map and Reduce containers used by each user.

      Every once in a while (this can be done periodically, or based on events), the JT looks at redistributing capacity. This can result in excess capacity being given to queues that need them, and capacity being claimed by queues.

      Excess capacity is redistributed as follows:

      • The algorithm below is in terms of tasks, which can be map or reduce tasks. It is the same for both. The JT will run the algorithm to redistribute excess capacity for both Maps and Reduces.
      • The JT checks each queue to see if it has excess capacity. A queue has excess capacity if the number of running tasks associated with the queue is less than the allocated capacity of the queue (i.e., if C-RUN < AC) and there are no jobs queued.
        • Note: a tighter definition is if C-RUN plus the number of tasks required by the waiting jobs is less than AC, but we don't need that level of detail.
      • If there is at least one queue with excess capacity, the total excess capacity is the sum of excess capacities of each queue. The JT figures out the queues that this capacity can be distributed to. These are queues that need capacity, where C-RUN = AC (i.e., the queue is running at max capacity) and there are queued jobs.
      • The JT now figures out how much excess capacity to distribute to each queue that needs it. This can be done in many ways.
        • Distribute capacity in the ratio of each Org's guaranteed capacity. So if queues Q1, Q2, and Q3 have guaranteed capacities of GC1, GC2, and GC3, and if Q3 has N containers of excess capacity, Q1 gets (GC1*N)/(GC1+GC2) additional capacity, while Q2 gets (GC2*N)/(GC1+GC2).
        • You could use some other ratio that uses the number of waiting jobs. The more waiting jobs a queue has, the more its share of excess capacity.
      • For each queue that needs capacity, the JT increments its AC with the capacity it is allocated. At the same time, the JT appropriately decrements the AC of queues with excess capacity.

      Excess capacity is reclaimed as follows:

      • The algorithm below is in terms of tasks, which can be map or reduce tasks. It is the same for both. The JT will run the algorithm to reclaim excess capacity for both Maps and Reduces.
      • The JT determines which queues are low queues (if AC < GC). If a low queue has a job waiting, then we need to reclaim its resources. Capacity to be reclaimed = GC-AC.
      • Capacity is re-claimed from any of the high queues (where AC > GC).
      • JT decrements the AC of the high queue from which capacity is to be claimed, and increments the AC of the low queue. The decremented AC of the high queue cannot go below its GC, so the low queue may get its capacity back from more than one queue.
      • The JT also starts a timer for the low queue (this can be an actual timer, or just a count, perhaps representing seconds, which can be decremented by the JT periodically).
      • If a timer goes off, the JT needs to instruct some high queue to kill some of their tasks. How do we decide which high queues to claim capacity from?
        • The candidates are those high queues which are running more tasks than they should be, i.e., where C-RUN > AC.
        • Among these queues, the JT can pick those that are using the most excess capacity (i.e. queues with a higher value for (C-RUN - AC)/AC ).
      • How does a high queue decide which tasks to kill?
        • Ideally, you want to kill tasks that have started recently or made the least progress. You might want to use the same algorithm you use to decide which tasks to speculatively run (though that algorithm needs to be fixed).
        • Note: it is expensive to kill tasks, so we need to focus on getting better at deciding which tasks to kill.

      Within a queue, a user's limit can dynamically change depending on how many users have submitted jobs. This needs to be handled in a way similar to how we handle excess capacity between queues.

      When a TT has a free Map slot:

      1. TT contacts JT to give it a Map task to run.
      2. JT figures out which queue to approach first (among all queues that have capacity, i.e., where C-RUN-M < AC-M). This can be done in a few ways:
        • Round-robin, so every queue/Org has the same chance to get a free container.
        • JT can pick the queue with the maximum unused capacity.
      3. JT needs to pick a job which can use the slot.
        • If it has no running jobs from that queue, it gets one from the JQM.
          • JT asks for the first Job in the selected queue, via the JQM. If the job's user's limit is maxed out, the job is returned to the queue and JT asks for the next job. This continues till the JT finds a suitable job.
          • Or else, JT has a list of users in the queue whose jobs it is running, and it can figure out which of these users are over their limit. It asks the JQM for the first job in the queue whose user is not in a list of maxed-out users it provides.
        • If the JT already has a list of running jobs from the queue, it looks at each (in order of priority) till it finds one whose user's limit has not been exceeded.
      4. If there is no job in the queue that is eligible to run (the queue may have no queued jobs), the JT picks another queue using the same steps.
      5. The JT figures out which Map task from the job to run on the free TT using the same algorithm as today (find a locality match using the job's cache, then look for failed tasks or tasks on the rack, etc).
      6. JT increments C-RUN-M and the number of Map containers used by the job's user. It then returns the task to the TT.

      When a TT has a free Reduce slot: This is similar to what happens with a free Map slot, except that:

      • we can use a different algorithm to decide which Reduce task to run from a give job. I'm not sure what we do today for Reduce tasks (I think we just pick the first one), but if it needs to be improved, that's a separate issue.
      • Since there is no preemption of jobs based on priorities, we will not have the situation where a job's Reducers are blocking containers as they're waiting for Maps to run and there are no Map slots to run.

      When a task fails or completes: JT decrements C-RUN and the # of containers used by the user.


        1. 3445.1.patch
          51 kB
          Vivek Ratan
        2. 3445.2.patch
          65 kB
          Vivek Ratan
        3. 3445.3.patch
          58 kB
          Vivek Ratan
        4. 3445.4.patch
          81 kB
          Vivek Ratan
        5. 3445.5.patch
          103 kB
          Vivek Ratan
        6. 3445.6.patch
          156 kB
          Vivek Ratan
        7. 3445.7.patch
          165 kB
          Vivek Ratan
        8. 3445.8.patch
          165 kB
          Vivek Ratan
        9. 3445.9.patch
          113 kB
          Vivek Ratan
        10. 3445.10.patch
          158 kB
          Vivek Ratan
        11. 3445.12.patch
          151 kB
          Owen O'Malley
        12. 3445.13.patch
          157 kB
          Vivek Ratan

        Issue Links



              vivekr Vivek Ratan
              vivekr Vivek Ratan
              0 Vote for this issue
              17 Start watching this issue