Hadoop Common
HADOOP-3445

Implementing core scheduler functionality in Resource Manager (V1) for Hadoop

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.19.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Introduced Capacity Task Scheduler.

      Description

      The architecture for the Hadoop Resource Manager (V1) is described in HADOOP-3444. This Jira proposes implementation details on the core scheduling piece - the changes to the JT to handle Orgs, queues, guaranteed capacities, user limits, and ultimately, scheduling a task on a TT.

      As per the architecture in HADOOP-3444, the JT contains a new component, Job Queue Manager (JQM), to handle queues of jobs. Each queue represents a queue in an Org (one queue per Org). Job queues are backed up by disk based storage.

      We now look at some details. Terminology:

      • A queue has excess capacity if it does not have enough jobs (queued or running) to take up its guaranteed capacity. Excess capacity needs to be distributed to queues that have none.
      • Queues that have given up excess capacity to other queues are called low queues, for the sake of this discussion. Queues that are running on additional capacity are called high queues.

      For each queue, the JT keeps track of the following (a sketch of this per-queue record follows the list):

      • Guaranteed capacity (GC): the capacity guaranteed to the queue, set up through configuration. The sum of all GCs is equal to the grid capacity. Since we're handling Map and Reduce slots differently, we will have a GC for each, i.e., a GC-M for maps and a GC-R for reduces. The sum of all GC-Ms is equal to the sum of all map slots available in the Grid, and the same for GC-Rs.
      • Allocated capacity (AC): the current capacity of the queue. This can be higher or lower than the GC because of excess capacity distribution. The sum of all ACs is equal to the grid capacity. As above, each queue will have an AC-M and an AC-R.
      • Timer for claiming containers: can just be the # of seconds the queue can wait till it needs its capacity back. There will be separate timers for claiming map and reduce slots (we will likely want to take more time to claim reduce slots, as reducers take longer to run).
      • # of containers being used, i.e., the number of running tasks associated with the queue (C-RUN). Each queue will have a C-RUN-M and C-RUN-R.
      • Whether any jobs are queued.
      • The number of Map and Reduce containers used by each user.
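
      A minimal sketch of this per-queue record, in Java; the field names below are illustrative, not taken from any patch:

        import java.util.HashMap;
        import java.util.Map;

        // Illustrative per-queue bookkeeping; names are hypothetical, not from the patch.
        class QueueInfo {
          int guaranteedCapacityMaps;      // GC-M
          int guaranteedCapacityReduces;   // GC-R
          int allocatedCapacityMaps;       // AC-M
          int allocatedCapacityReduces;    // AC-R
          int runningMaps;                 // C-RUN-M
          int runningReduces;              // C-RUN-R
          long mapReclaimTimerSecs;        // countdown before map capacity is claimed back
          long reduceReclaimTimerSecs;     // countdown before reduce capacity is claimed back
          boolean hasQueuedJobs;           // whether any jobs are waiting in this queue
          // Per-user container usage, used to enforce user limits.
          Map<String, Integer> mapSlotsPerUser = new HashMap<String, Integer>();
          Map<String, Integer> reduceSlotsPerUser = new HashMap<String, Integer>();
        }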

      Every once in a while (this can be done periodically, or based on events), the JT looks at redistributing capacity. This can result in excess capacity being given to queues that need it, and capacity being reclaimed by queues that had given it up.

      Excess capacity is redistributed as follows (a code sketch follows these steps):

      • The algorithm below is in terms of tasks, which can be map or reduce tasks. It is the same for both. The JT will run the algorithm to redistribute excess capacity for both Maps and Reduces.
      • The JT checks each queue to see if it has excess capacity. A queue has excess capacity if the number of running tasks associated with the queue is less than the allocated capacity of the queue (i.e., if C-RUN < AC) and there are no jobs queued.
        • Note: a tighter definition is if C-RUN plus the number of tasks required by the waiting jobs is less than AC, but we don't need that level of detail.
      • If there is at least one queue with excess capacity, the total excess capacity is the sum of excess capacities of each queue. The JT figures out the queues that this capacity can be distributed to. These are queues that need capacity, where C-RUN = AC (i.e., the queue is running at max capacity) and there are queued jobs.
      • The JT now figures out how much excess capacity to distribute to each queue that needs it. This can be done in many ways.
        • Distribute capacity in the ratio of each Org's guaranteed capacity. So if queues Q1, Q2, and Q3 have guaranteed capacities of GC1, GC2, and GC3, and if Q3 has N containers of excess capacity, Q1 gets (GC1*N)/(GC1+GC2) additional capacity, while Q2 gets (GC2*N)/(GC1+GC2).
        • You could use some other ratio that uses the number of waiting jobs. The more waiting jobs a queue has, the more its share of excess capacity.
      • For each queue that needs capacity, the JT increments its AC with the capacity it is allocated. At the same time, the JT appropriately decrements the AC of queues with excess capacity.
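
      A rough sketch of this redistribution pass for map slots, reusing the illustrative QueueInfo record above (an assumed helper, not the actual JT code):

        import java.util.ArrayList;
        import java.util.List;

        class CapacityRedistributor {
          // Pool the excess map capacity and hand it out in the ratio of guaranteed capacities.
          static void redistributeExcessMaps(List<QueueInfo> queues) {
            int totalExcess = 0;
            int needyGcSum = 0;
            List<QueueInfo> needy = new ArrayList<QueueInfo>();
            for (QueueInfo q : queues) {
              if (q.runningMaps < q.allocatedCapacityMaps && !q.hasQueuedJobs) {
                // Excess capacity: running below AC with nothing queued.
                totalExcess += q.allocatedCapacityMaps - q.runningMaps;
                q.allocatedCapacityMaps = q.runningMaps;
              } else if (q.runningMaps == q.allocatedCapacityMaps && q.hasQueuedJobs) {
                // Running at full allocation with jobs waiting: needs capacity.
                needy.add(q);
                needyGcSum += q.guaranteedCapacityMaps;
              }
            }
            for (QueueInfo q : needy) {
              // Integer division; any rounding leftover is ignored in this sketch.
              q.allocatedCapacityMaps += (totalExcess * q.guaranteedCapacityMaps) / needyGcSum;
            }
          }
        }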

      Excess capacity is reclaimed as follows (again, a code sketch follows these steps):

      • The algorithm below is in terms of tasks, which can be map or reduce tasks. It is the same for both. The JT will run the algorithm to reclaim excess capacity for both Maps and Reduces.
      • The JT determines which queues are low queues (if AC < GC). If a low queue has a job waiting, then we need to reclaim its resources. Capacity to be reclaimed = GC-AC.
      • Capacity is re-claimed from any of the high queues (where AC > GC).
      • JT decrements the AC of the high queue from which capacity is to be claimed, and increments the AC of the low queue. The decremented AC of the high queue cannot go below its GC, so the low queue may get its capacity back from more than one queue.
      • The JT also starts a timer for the low queue (this can be an actual timer, or just a count, perhaps representing seconds, which can be decremented by the JT periodically).
      • If a timer goes off, the JT needs to instruct some high queues to kill some of their tasks. How do we decide which high queues to claim capacity from?
        • The candidates are those high queues which are running more tasks than they should be, i.e., where C-RUN > AC.
        • Among these queues, the JT can pick those that are using the most excess capacity (i.e. queues with a higher value for (C-RUN - AC)/AC ).
      • How does a high queue decide which tasks to kill?
        • Ideally, you want to kill tasks that have started recently or made the least progress. You might want to use the same algorithm you use to decide which tasks to speculatively run (though that algorithm needs to be fixed).
        • Note: it is expensive to kill tasks, so we need to focus on getting better at deciding which tasks to kill.
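
      And a matching sketch of the reclaim pass, again using the illustrative QueueInfo record above; the timer handling is simplified:

        import java.util.List;

        class CapacityReclaimer {
          // Low queues (AC < GC) with waiting jobs take capacity back from high queues (AC > GC).
          static void reclaimMapCapacity(List<QueueInfo> queues, long reclaimTimeoutSecs) {
            for (QueueInfo low : queues) {
              int toReclaim = low.guaranteedCapacityMaps - low.allocatedCapacityMaps;
              if (toReclaim <= 0 || !low.hasQueuedJobs) {
                continue;                                 // not a low queue, or nothing waiting
              }
              for (QueueInfo high : queues) {
                if (toReclaim == 0) break;
                int surplus = high.allocatedCapacityMaps - high.guaranteedCapacityMaps;
                if (surplus <= 0) continue;               // not a high queue
                int take = Math.min(surplus, toReclaim);  // a high queue's AC never drops below its GC
                high.allocatedCapacityMaps -= take;
                low.allocatedCapacityMaps += take;
                toReclaim -= take;
              }
              // Tasks are killed only if the slots have not freed up when this timer expires.
              low.mapReclaimTimerSecs = reclaimTimeoutSecs;
            }
          }
        }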

      Within a queue, a user's limit can dynamically change depending on how many users have submitted jobs. This needs to be handled in a way similar to how we handle excess capacity between queues.

      When a TT has a free Map slot (a condensed code sketch follows the steps below):

      1. TT contacts JT to give it a Map task to run.
      2. JT figures out which queue to approach first (among all queues that have capacity, i.e., where C-RUN-M < AC-M). This can be done in a few ways:
        • Round-robin, so every queue/Org has the same chance to get a free container.
        • JT can pick the queue with the maximum unused capacity.
      3. JT needs to pick a job which can use the slot.
        • If it has no running jobs from that queue, it gets one from the JQM.
          • JT asks for the first Job in the selected queue, via the JQM. If the job's user's limit is maxed out, the job is returned to the queue and JT asks for the next job. This continues till the JT finds a suitable job.
          • Or else, JT has a list of users in the queue whose jobs it is running, and it can figure out which of these users are over their limit. It asks the JQM for the first job in the queue whose user is not in a list of maxed-out users it provides.
        • If the JT already has a list of running jobs from the queue, it looks at each (in order of priority) till it finds one whose user's limit has not been exceeded.
      4. If there is no job in the queue that is eligible to run (the queue may have no queued jobs), the JT picks another queue using the same steps.
      5. The JT figures out which Map task from the job to run on the free TT using the same algorithm as today (find a locality match using the job's cache, then look for failed tasks or tasks on the rack, etc).
      6. JT increments C-RUN-M and the number of Map containers used by the job's user. It then returns the task to the TT.
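
      A condensed sketch of steps 2-3 (pick the queue with the most unused capacity, then the first job whose user is under the limit); JobStub is a hypothetical stand-in for JobInProgress:

        import java.util.List;
        import java.util.Map;

        class JobStub {                 // minimal stand-in for JobInProgress in this sketch
          String user;
        }

        class MapSlotAssigner {
          // Returns the job that should get the free map slot, or null if no queue can use it.
          JobStub pickJob(List<QueueInfo> queues,
                          Map<QueueInfo, List<JobStub>> jobsByQueue,
                          int perUserLimit) {
            QueueInfo best = null;
            for (QueueInfo q : queues) {
              int unused = q.allocatedCapacityMaps - q.runningMaps;
              if (unused > 0 &&
                  (best == null || unused > best.allocatedCapacityMaps - best.runningMaps)) {
                best = q;               // queue with the most unused map capacity
              }
            }
            if (best == null) {
              return null;
            }
            // Jobs are assumed to be kept in priority order; skip users over their limit.
            for (JobStub job : jobsByQueue.get(best)) {
              Integer used = best.mapSlotsPerUser.get(job.user);
              if (used == null || used < perUserLimit) {
                return job;             // caller then bumps C-RUN-M and the user's count
              }
            }
            return null;                // in the real algorithm the JT would try the next queue
          }
        }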

      When a TT has a free Reduce slot: This is similar to what happens with a free Map slot, except that:

      • we can use a different algorithm to decide which Reduce task to run from a given job. I'm not sure what we do today for Reduce tasks (I think we just pick the first one), but if it needs to be improved, that's a separate issue.
      • Since there is no preemption of jobs based on priorities, we will not have the situation where a job's Reducers are blocking containers while waiting for Maps to run when there are no Map slots left to run them.

      When a task fails or completes: JT decrements C-RUN and the # of containers used by the user.

      1. 3445.13.patch
        157 kB
        Vivek Ratan
      2. 3445.12.patch
        151 kB
        Owen O'Malley
      3. 3445.10.patch
        158 kB
        Vivek Ratan
      4. 3445.9.patch
        113 kB
        Vivek Ratan
      5. 3445.8.patch
        165 kB
        Vivek Ratan
      6. 3445.7.patch
        165 kB
        Vivek Ratan
      7. 3445.6.patch
        156 kB
        Vivek Ratan
      8. 3445.5.patch
        103 kB
        Vivek Ratan
      9. 3445.4.patch
        81 kB
        Vivek Ratan
      10. 3445.3.patch
        58 kB
        Vivek Ratan
      11. 3445.2.patch
        65 kB
        Vivek Ratan
      12. 3445.1.patch
        51 kB
        Vivek Ratan

        Issue Links

          Activity

          Vivek Ratan created issue -
          Vivek Ratan made changes -
          Link This issue is part of HADOOP-3444 [ HADOOP-3444 ]
          Doug Cutting made changes -
          Link This issue is blocked by HADOOP-3412 [ HADOOP-3412 ]
          Vivek Ratan added a comment -

          Given that we now have queues of jobs (technically, JobInProgress objects) in the JT, I'm thinking that we should replace the List jobsByPriority (in the JT) with a number of lists, one per queue, and we have a hashmap of queue names to queue lists. Any code that works on jobsByPriority now works on one or more of these lists, but the logic remains the same.

          These multiple lists, plus the hashmap that maps queue names to each list, are really what the JobQueueManager component is, in the arch diagram in HADOOP-3444. One question is, should we have a separate class called JobQueueManager that manages the multiple lists and the hashmap, or should we just leave the hashmap as is in the JT code? While there are benefits to having a separate class, I personally prefer the latter approach - code changes will be minimal and we don't really need a level of encapsulation yet. Any preferences?
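
          In code, the idea amounts to something like the following (the layout is only a sketch; JobInProgress is the existing JT class):

            import java.util.HashMap;
            import java.util.LinkedList;
            import java.util.List;
            import java.util.Map;

            // One job list per queue, keyed by queue name, replacing the single jobsByPriority list.
            class JobQueues {
              private final Map<String, List<JobInProgress>> queues =
                  new HashMap<String, List<JobInProgress>>();

              List<JobInProgress> getQueue(String queueName) {
                List<JobInProgress> jobs = queues.get(queueName);
                if (jobs == null) {
                  jobs = new LinkedList<JobInProgress>();
                  queues.put(queueName, jobs);
                }
                return jobs;
              }
            }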

          Vivek Ratan added a comment -

          By the way, sorry Doug, on the long description in this Jira. I'd already created the Jira when you suggested having shorter description fields in HADOOP-3421. Next time.

          Hemanth Yamijala made changes -
          Link This issue is blocked by HADOOP-3479 [ HADOOP-3479 ]
          Vivek Ratan added a comment - edited

          I'm attaching the first patch (3445.1.patch). This is a partial patch that has support for:

          • queues and jobs submitted to queues
          • guaranteed capacities, priorities, and user limits
          • redistribution of capacities (without preemption)
            In essence, this patch implements the following requirements (see HADOOP-3421): 1.1-1.5, 2.1, 3.1-3.3, 4.1.

          The purpose of this patch is to get the basic code reviewed, and there is a non-trivial amount of it. This is not a complete patch. The following remains to be done:

          • preemption, when reclaiming capacity (req 1.6)
          • unit tests
          • others such as ACLs, rereading of configuration etc, which will be tracked elsewhere.
          • cleanup, such as better variable/method names, etc.

          This patch assumes that the patch for HADOOP-3479 is committed or applied.

          Here's some explanation of what this patch incorporates:

          • redistribution of capacity: class RedistributeCapacity implements a Runnable that periodically invokes code to redistribute capacity. The time interval is defined by REDISTRIBUTE_CAPACITY_INTERVAL, which has a default value of 5 secs, but can be set in the config file (a sketch of this periodic runner follows the list).
          • Since we now have queues of jobs, the ArrayList jobsByPriority is replaced by jobQueues, which is a hashmap of queue names to individual lists of JobInProgress objects.
          • A new class, QueueSchedulingInfo (QSI), has been introduced. This class keeps track of per-queue information required for the new scheduling algorithms. It's really just a collection of fields. queueInfoMap is a hashmap that maps a queue name to its QSI object. We also keep two sorted lists of QSI objects (qsiForMaps and qsiForReduces), one for maps and one for reduces. The class UnusuedCapacityComparator implements Comparator and is used to sort these two lists, based on unused capacities. The JT constructor creates QSI objects for each queue and populates the other data structures accordingly.
          • There's new code to handle redistribution of capacities, along with detailed comments on how the algorithm works. This documentation and code starts at line 1695.
          • getNewTaskForTaskTracker() has been changed. If a TT has free map and reduce slots, we first decide whether to give it a Map or Reduce task. This logic was earlier based on computing M/R loads, but as I had explained in an email to core-dev, it seemed unnecessary and also had a few problems. Now that we have a central scheduler that can look across multiple jobs, the logic to pick a Map or Reduce task can be simplified. I pick one, depending on how many unused Map or Reduce slots the TT has. We can probably do better, but this seems like a decent start. Once we decide whether we need a Map or Reduce task, we pick a queue (based on how far behind the queue is; again, there are probably better/different ways to do this, one of which is suggested), and then a job in the queue, based on how much capacity the queue is using and the user limits of the highest-priority jobs in the queue.
          • submitJob() has also been changed. When a job gets submitted, it gets placed in the right position in the right queue.
          • JobInProgress and TaskInProgress have been updated to keep track of speculative tasks. This ultimately lets us keep track of how many tasks in a job need to run and how many are running, which ties in to capacity planning.
          • JobInProgress is also changed to expose the queue a job is submitted to. If no queue is mentioned in the user's conf, the job is assigned to the first queue (to maintain backwards compatibility).
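
          As a rough illustration of the periodic redistribution mentioned in the first bullet (the actual patch wires this into the JT; the shape below is only assumed):

            // Sketch of a periodic redistribution thread; the wiring to the JT is omitted.
            class RedistributeCapacityRunner implements Runnable {
              static final long DEFAULT_INTERVAL_MS = 5 * 1000;  // 5 seconds, as described above
              private final long intervalMs;
              private final Runnable redistributeOnce;           // one redistribution pass (maps and reduces)

              RedistributeCapacityRunner(long intervalMs, Runnable redistributeOnce) {
                this.intervalMs = intervalMs;
                this.redistributeOnce = redistributeOnce;
              }

              public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                  redistributeOnce.run();
                  try {
                    Thread.sleep(intervalMs);
                  } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();          // stop cleanly on shutdown
                  }
                }
              }
            }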

          Please note that the algorithms in the code are a bit different than what was detailed in the description of this Jira as they've evolved over time. Please also note that these are not the best algorithms, and that it is assumed that over time, we will get a lot better at refining them. But they enable us to get started.

          Vivek Ratan made changes -
          Attachment 3445.1.patch [ 12385384 ]
          Arun C Murthy made changes -
          Assignee Vivek Ratan [ vivekr ]
          Arun C Murthy added a comment -

          Vivek, this is looking good. Some early comments, I'll post more once I take a detailed look at the core scheduling algorithms:

          1. I'd suggest we move the queue-related infrastructure to its own files/packages, e.g. QueueSchedulingInfo (should we just call it Queue?). This will help ensure that the already bloated JobTracker...
          2. Should we consider an abstraction to ensure multiple implementations of the Queues? E.g. FIFO v/s best-fit (ala HADOOP-3412)
          3. Should we consider a DFA (HADOOP-1395) to help manipulate the various states of the individual queues (capacity available -> neutral -> over allocated etc.)

          Vivek Ratan added a comment -

          1. I'd suggest we move the queue-related infrastructure to its own files/packages, e.g. QueueSchedulingInfo (should we just call it Queue?). This will help ensure that the already bloated JobTracker...

          2. Should we consider an abstraction to ensure multiple implementations of the Queues? E.g. FIFO v/s best-fit (ala HADOOP-3412)

          There will be some non-trivial refactoring of the code to support HADOOP-3412. Your point is well taken - we should avoid code-bloat in the JobTracker. Let's wait till 3412 is committed so I can do some refactoring here. In the meantime, maybe it's worth just focusing on the algorithms for deciding what task to pick, and for capacity redistribution and reclaiming.

          3. Should we consider a DFA (HADOOP-1395) to help manipulate the various states of the individual queues (capacity available -> neutral -> over allocated etc.)

          That's a valid suggestion. But state machines are, in general, hard to code and understand, and I personally avoid them if I can. I don't see a need for a state machine here - we have 5 distinct sets of queues that we compute when we redistribute capacity, and we only compute these sets periodically and only for the purpose of redistribution, so state machines seem like overkill. We also don't care what the state of a queue was during the last redistribution. You simply drop a queue into one or more of the 5 buckets, then you transfer/reclaim capacities between queues in various buckets. State machines don't seem like a natural fit.

          If you disagree, what would help is if you can point out sections of the code which are hard to understand and perhaps provide examples of how DFAs would simplify either writing or reading the code, or both.

          Arun C Murthy added a comment -

          I spent time on the core scheduling algorithms...

          1. The core algorithms seem to be coming along fairly well. Overall we have 2 kinds of scheduling algorithms: one for balancing needs/capacities of the queues, and the other for picking a queue and then a task from a job of that queue. We just need to test them both, especially the former, to make sure they get baked in.
          2. I'm concerned about the maintainability of 'redistributeCapacity'. Maybe it would help if we maintained the relevant information, such as the state of each queue (i.e. needAndUnder, needAndNeutral etc.). Also, we need to refactor the code which actually redistributes the capacity.
          3. Similarly we should maintain information rather than rebuild it in updateQueueSchedulingInfo.
          4. We probably should define an enumeration for the map/reduce index rather than use integers.

          Vivek Ratan added a comment -

          Thanks Arun.

          I'm concerned about the maintainability of 'redistributeCapacity'. [...] Also, we need to refactor the code to which actually redistributes the capacity.

          What exactly are you worried about, in terms of maintainability? Will the code be hard to maintain? How so? Regarding refactoring, can you be more specific? The algorithm for redistributing capacity has a few steps. I've placed some code in helper methods. How differently do you recommend this code be refactored?

          Similarly we should maintain information rather than rebuild it in updateQueueSchedulingInfo.

          The code to maintain the QueueSchedulingInfo structures actively is a bit complex. You need to catch every instance where tasks are updated. Building the structures only when needed is a nice compromise, IMO. They're pretty quick to build, and the code's a lot smaller and focused in one place. Plus, you build them when you need them, so your algorithms are still correct. Furthermore, you don't need to track all events - you only need to keep track of some cumulative numbers, so you might actually do less computation if you build them lazily. Is there a specific reason why you'd like to see information being maintained accurately rather than being rebuilt?

          Having said this, things might change once we look at porting this to 3412.

          4. We probably should define an enumeration for the map/reduce index rather than use integers.

          Fair point. These really are indexes into an array, and it's extremely unlikely we'll add more enum values. I suspect they will become private to the scheduler, so this may be a moot issue.

          Arun C Murthy added a comment -

          I'm hoping we can use the 'taskUpdated' interface offered by HADOOP-3412 and its descendants to keep all relevant information up-to-date, i.e. updateQueueSchedulingInfo (not a big deal) and the actual 'state' of the QueueSchedulingInfo objects. The latter will help make 'redistributeCapacity' more readable/maintainable...

          Vivek Ratan added a comment -

          Attaching a new patch (3445.2.patch). I've refactored the scheduler code to support HADOOP-3412. All scheduler-relevant code is in a new class, TaskScheduler3421. Another class, JobQueuesManager keeps track of the sorted jobs, and is the listener for job changes.

          This patch also implements functionality to reclaim resources within a fixed period of time by preempting tasks. This is done through the TaskScheduler3421.ReclaimedResource class.

          What's left:

          • unit tests (I've been using my own, but I need to make them fit the testing framework).
          • re-reading the config file for queues, if it changes (maybe this can go in a separate Jira).

          ACLs are handled in another Jira.

          I still update queue scheduling info lazily, during heartbeat processing. These structures can be kept up-to-date during any task change, by providing and implementing a TaskChangeListener, but I haven't done that for two reasons:

          • I don't think we need to keep these structures up-to-date so often. They're useful only during heartbeats.
          • The code's easier to read too.

          It may be that the code to update these structures takes too much time (I don't think it should, but we'll only know once we profile it). In that case, maybe implementing a task-change interface would be good. An issue with the task-change interface, as discussed briefly in HADOOP-3412, is that it's not enough to know what task changed. We need to know the task state before & after it changed, i.e., how exactly the task state changed. It's not clear what the task-change interface will look like. We can debate it if/when we need it.

          Vivek Ratan made changes -
          Attachment 3445.2.patch [ 12387290 ]
          Owen O'Malley added a comment -

          I just started going through the patch, but a few preliminary comments:
          1. Remove the 3445 from the class name of the scheduler. The name should be descriptive.
          2. The JavaDoc of the class should describe the scheduler directly rather than referencing the jira.
          3. killTasks needs to be in the scheduler, rather than the generic JobInProgress. Picking which tasks to kill is clearly in the domain of the scheduler...
          4. The data structures and code seem very complex. I think you need to introduce more abstractions to make the code easier to understand.

          Tom White added a comment -

          4. We probably should define an enumeration for the map/reduce index rather than use integers.

          HADOOP-3746 defines a TaskType enum that would be suitable. We should move it to core so it can be shared.

          Owen O'Malley added a comment -

          The current scheduler patch in 3445 is very unmaintainable and would not be straightforward to fix. The current design involves:

          quota = guaranteed capacity for queue
          allocated = allocated capacity for queue
          pending = # pending tasks for queue
          running = # running tasks for queue

          the current patch defines linked lists of queues:

          • Give: queues where allocated > quota
          • NeedAndUnder: pending + running > allocated && quota > allocated
          • NeedAndOver: pending + running > allocated && quota < allocated
          • NeedAndNeutral: pending + running > allocated && quota = allocated
          • NeutralAndOver: pending + running = allocated && quota < allocated
          • GiveAndOver: pending + running < allocated && quota < allocated

          Every 5 seconds, the lists are cleared and rebuilt from the current information, and slots are allocated to different queues. (There are linked lists both for maps and reduces, and a lot of code that switches between working on maps and reduces based on values of 0 or 1.) The code to implement this is very complex and would be very difficult to maintain.

          I would propose a much simpler design that has a single priority queue where the key is running / quota for each queue. When a new slot is available, it is given to the queue with the lowest ratio that has pending > 0. Note that there is no difference between allocated and running in this proposal, which makes it much easier. If you need to reclaim slots, you start from the queue with the largest allocated / quota and take slots until it is equal to the second highest, and then take slots from both of them. This guarantees that you will have a stable schedule and that the queues with the most excess capacity are reclaimed first. Having a stable scheduler that doesn't change the allocations every 5 seconds is very important for managing throughput.
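
          A minimal sketch of that single-priority-queue idea (names are hypothetical and not from any attached patch):

            import java.util.ArrayList;
            import java.util.Comparator;
            import java.util.List;
            import java.util.PriorityQueue;

            class QueueEntry {
              String name;
              int quota;        // guaranteed capacity
              int running;      // currently running tasks
              int pending;      // pending tasks

              double usage() {
                return (double) running / quota;
              }
            }

            class SingleQueueScheduler {
              // Queues ordered by running/quota, lowest ratio first.
              private final PriorityQueue<QueueEntry> byUsage = new PriorityQueue<QueueEntry>(
                  11, new Comparator<QueueEntry>() {
                    public int compare(QueueEntry a, QueueEntry b) {
                      return Double.compare(a.usage(), b.usage());
                    }
                  });

              void addQueue(QueueEntry q) {
                byUsage.add(q);
              }

              // A free slot goes to the queue with the lowest running/quota that has pending work.
              QueueEntry assignFreeSlot() {
                List<QueueEntry> skipped = new ArrayList<QueueEntry>();
                QueueEntry chosen = null;
                while (!byUsage.isEmpty()) {
                  QueueEntry q = byUsage.poll();
                  if (q.pending > 0) {
                    chosen = q;
                    break;
                  }
                  skipped.add(q);                  // no pending work; put back unchanged
                }
                byUsage.addAll(skipped);
                if (chosen != null) {
                  chosen.running++;                // charge the slot to the chosen queue
                  chosen.pending--;
                  byUsage.add(chosen);             // re-insert with its new ratio
                }
                return chosen;
              }
            }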

          Other changes:
          1. Implement a scheduler class that handles a single kind of task and have 2 instances of it, one for maps, and one for reduces. This will help a lot.
          2. Move the discussion of the scheduler into the class java doc, rather than an internal hidden comment.
          3. Move the selection of tasks to kill into the scheduler and out of JobInProgress.
          4. Give the scheduler a name that isn't a reference to a jira number.
          5. The scheduler should assign multiple tasks at once.
          6. The user quotas should be done by having the queue count as pending only the tasks that are currently runnable under the user quotas.

          Owen O'Malley added a comment -

          As with HADOOP-3746, this scheduler should be made into a contrib module. The problem with not doing so is that the scheduler api is not public and thus each of the schedulers will drop at least a handful of new source files into org.apache.hadoop.mapred. The best way out of that is to have the schedulers placed in org.apache.hadoop.mapred, but locate their source in contrib.

          Vivek Ratan added a comment -

          Attaching a patch (3445.3.patch).

          The scheduler code has been simplified a fair bit, based on a couple of assumptions:
          1. I've gotten rid of the idea of allocated/target capacity, as it was proving to be confusing. What this means is that queues under capacity get preference over queues running over capacity (earlier, queues over capacity had an extended capacity limit, which made them compete equally with queues considered under capacity).
          2. when killing tasks, we can simplify things by killing as many as we can from the queue that is the most over capacity, then the next one, and so on. My earlier code used a fairer algorithm where it decided how many tasks we would kill from each queue, based on a ratio of their GCs. There are better ways to kill tasks, as noted in the documentation, but for now, this should be OK.

          I have also moved the scheduler code to contrib (under contrib/capacity-scheduler). Oh yeah, this scheduler is called 'Capacity Scheduler'. I've used the same format in the README file that is used in HADOOP-3746.

          I need to add test cases. That will happen soon.

          Vivek Ratan made changes -
          Attachment 3445.3.patch [ 12388913 ]
          Vivek Ratan added a comment -

          I would propose a much simpler design that has a single priority queue ...

          A single queue, as you describe it, is too simplistic. We need to keep track of queues that have to reclaim capacity, as these queues should be given first preference when assigning a task. Patch 3445.3.patch keeps track of such queues, as well as queues ordered by #running/capacity. It does keep them in a single queue, though they're really two separate queues.

          Sanjay Radia added a comment -

          2. when killing tasks, we can simplify things by killing as many as we can from the queue that is the most over capacity, then the next one, and so on. My earlier code used a fairer algorithm where it decided how many tasks we would kill from each queue, based on a ratio of their GCs. There are better ways to kill tasks, as noted in the documentation, but for now, this should be OK.

          The preemption and allocation of tasks need to be consistent to avoid thrashing.
          If you are allocating first to queues that are the furthest from their GC (i.e. the smallest Running/GC ratio), then preemption needs to preempt first from the queue with the highest Running/GC ratio (which you do, as stated in your comment), but should only preempt sufficient tasks to get that queue down to the next highest queue, and then preempt from those two queues, and so on.
          For example, if you did not do that, then later, when slots become available, the queue that was preempted will soon become the largest (since it has all the jobs pending) and it will once again become a target for preemption.

          Hence, in a sense, I am suggesting you need to bring the "fairer" feature of your earlier algorithm to your new algorithm; my primary motivation is consistency and hence stability (i.e., no thrashing), and my secondary motivation is fairness.

          Vivek Ratan added a comment -

          Sanjay, your comments are valid. Killing all tasks from one queue makes that queue more likely to receive a free slot faster than if we killed from more than one queue. As I've written earlier, there are better ways. However, this is not a big concern. For one, killing tasks will be a rare event, not something commonplace. In most cases, queues will be running at full capacity. Most, if not all, queues will have jobs waiting. Given a fair division of capacity among queues, it's very rare that we'll need to preempt. Furthermore, preemption happens only if a queue does not get enough resources within a period of time (usually, minutes). It seems very rare that the entire cluster is filled with tasks that take minutes to complete, and hence need to be killed.

          What I'm really arguing here is that this solution, though far from perfect, is OK for now. It's highly unlikely to cause the system to thrash (there are just too many restrictions that need to happen before the same queue is subjected to a cycle of its tasks being run and killed, then run again, killed again, and so on). But your point is taken. We do need to make this better, and we'll get to it soon, once we finish with the more critical stuff.

          Owen O'Malley made changes -
          Link This issue is blocked by HADOOP-3930 [ HADOOP-3930 ]
          Vivek Ratan added a comment -

          Submitting patch (3445.4.patch) with unit test cases, better documentation, and a few small bug fixes. I've used part of the unit test framework Matei developed in HADOOP-3746. Thanks Matei.

          Vivek Ratan made changes -
          Attachment 3445.4.patch [ 12389145 ]
          Vivek Ratan made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Release Note This patch implements a scheduler for Map-Reduce jobs, called Capacity Task Scheduler (or just Capacity Scheduler), which provides a way to share large clusters. The scheduler provides a number of features which are described in its README file.
          Owen O'Malley added a comment -

          Killing all tasks from one queue makes that queue more likely to receive a free slot faster than if we killed from more than one queue

          This is not a justification. Killing too many tasks from the wrong queue is disruptive to the system and will lose progress, which is the antithesis of the goal.

          For one, killing tasks will be a rare event, not something commonplace

          I don't believe this is true. The usage graph on the current cluster is straight at 100% utilization. Furthermore, it is almost certain that all organization's work will not be constant. In any case, it is a very dangerous assumption to base your design on.

          A single queue as you describe it, is too simplistic.

          Note, that I said a priority queue. You are currently using a list that is resorted on every use, which is much much less efficient.

          Matei Zaharia added a comment - edited

          Vivek - some of the unit test code I took was from the test for the default scheduler in HADOOP-3412, so since we're all using it, maybe we can make some of those classes shared utilities in the mapred tests eventually.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12389145/3445.4.patch
          against trunk revision 690142.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 4 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          -1 core tests. The patch failed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3144/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3144/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3144/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3144/console

          This message is automatically generated.

          Owen O'Malley added a comment -

          You can use the new method TaskInProgress.getActiveTasks instead of
          numRunningAttempts.

TaskInProgress.killAnyTask isn't appropriate to add to the
framework. I think the semantics should be rethought rather than
killing the first attempt in a task, which will likely kill the
non-speculative task attempt instead of the speculative task
attempt. It would be much better to use the new getActiveTasks to find
the task attempt that is furthest behind.

          The default configuration for this scheduler should not be in the
          framework class ResourceManagerConf. They should be in a class in
          src/contrib/capacity-scheduler.

I don't understand the meaning of getFirstQueue in
ResourceManagerConf. Do you mean the default queue? This seems not to
match HADOOP-3698.

          JobInProgress.killTasks doesn't belong in the JobInProgress. It is
          making scheduling decisions and belongs in the capacity
          scheduler. If you need access to the runningMapCache, make a
          getRunningMapCache() that is package private. It isn't a great API,
          but it keeps the scheduling decisions that are specific to this
          scheduler in an appropriate place. This scheduler should pick which
          task attempt to kill and call TaskInProgress.killTask(attemptid,
          boolean).

That said, I think the approach is wrong. The queues should kill from
the most recently launched job and kill the task attempt that has made
the least progress. That minimizes the cost of the killing actions.
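
A minimal sketch of that selection rule, assuming simplified stand-in types rather than the real JobInProgress/TaskInProgress classes (all interfaces below are hypothetical):

  import java.util.List;

  // Hypothetical, simplified stand-ins for the JobTracker structures.
  interface Attempt { float getProgress(); }
  interface Task { List<Attempt> getActiveAttempts(); }
  interface Job { long getStartTime(); List<Task> getRunningTasks(); }

  class KillSelection {
    // Pick the least-progressed attempt from the most recently started job.
    static Attempt pickAttemptToKill(List<Job> runningJobs) {
      Job newest = null;
      for (Job j : runningJobs) {
        if (newest == null || j.getStartTime() > newest.getStartTime()) {
          newest = j;
        }
      }
      Attempt worst = null;
      if (newest != null) {
        for (Task t : newest.getRunningTasks()) {
          for (Attempt a : t.getActiveAttempts()) {
            if (worst == null || a.getProgress() < worst.getProgress()) {
              worst = a;
            }
          }
        }
      }
      return worst; // the scheduler would then kill this attempt
    }
  }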

The myDaddy field should be renamed to parent. We don't need to be
gender-specific, and we should use the standard computer science term.

          Lots of the comments are block comments with "/////////////" that
          should be javadoc, so that the tools understand them. In Hadoop, we
          refrain from using long strings of separators in comments.

          ReclaimedResource.countdown would be better expressed as the time of
          the deadline, so that it doesn't need to be decremented and isn't
          sensitive to the update cycle times.
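
A minimal sketch of the deadline-based form, assuming the reclaim bookkeeping simply records an absolute time (the class below is illustrative, not the patch's ReclaimedResource):

  class ReclaimDeadline {
    private final long deadlineMillis;

    ReclaimDeadline(long reclaimTimeLimitMillis) {
      // store an absolute deadline once; nothing needs to be decremented per cycle
      this.deadlineMillis = System.currentTimeMillis() + reclaimTimeLimitMillis;
    }

    boolean expired() {
      // independent of how often the update thread happens to run
      return System.currentTimeMillis() >= deadlineMillis;
    }
  }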

          Owen O'Malley added a comment -

          Chris and I were talking and he came up with a really simple approach that seems to have all of the properties of your preemption design.

          You keep the priority queue keyed by:

            long deadlineForPreemption;
            float runningOverQuota;
          

If a queue is either at or over its quota or has no pending tasks, the deadline is set to Long.MAX_VALUE. Therefore, you are guaranteed to satisfy all underserved queues in the order in which they will violate their SLA. If the deadline for the first item in the queue is < 2 * heartbeat, then you start shooting min(pending, quota - running) tasks from the most over-quota queues (keeping the running/quota ratio balanced for the overextended queues). This way you give priority to the most urgent underserved queue and you know exactly when to start killing tasks.
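
A minimal sketch of that ordering, assuming a hypothetical QueueState holder for the two fields named above:

  import java.util.Comparator;
  import java.util.PriorityQueue;

  class QueueState {
    long deadlineForPreemption; // Long.MAX_VALUE if at/over quota or nothing pending
    float runningOverQuota;     // running / quota
  }

  class PreemptionOrder {
    static final Comparator<QueueState> BY_URGENCY = new Comparator<QueueState>() {
      public int compare(QueueState a, QueueState b) {
        int byDeadline = Long.compare(a.deadlineForPreemption, b.deadlineForPreemption);
        if (byDeadline != 0) {
          return byDeadline; // earliest deadline (most urgent) first
        }
        return Float.compare(b.runningOverQuota, a.runningOverQuota); // then most over quota
      }
    };

    final PriorityQueue<QueueState> underserved = new PriorityQueue<QueueState>(11, BY_URGENCY);
  }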

          With the current patch, each queue can end up with a new ReclaimedResource object added to the list every 5 seconds for the entire SLA period.

The one downside to this proposal is that if Q1 is under and gets a new job, and the timer starts at T1 and then gets another job at T2, we will make sure that they get up to quota (or pending = 0) before the timer from T1 expires rather than the T2 timer. This is a little more aggressive than yours, but the implementation is much simpler and easier to reason about. It isn't clear which of the semantics is actually better/fairer, but if we discover that it upsets lots of users, we could extend this model relatively easily.

          Vivek Ratan added a comment -

          Note, that I said a priority queue. You are currently using a list that is resorted on every use, which is much much less efficient.

          You're mixing up issues here. A single queue as you suggest (it being a priority queue makes no difference) will not work. I have explained why, in an earlier comment. You need two conceptually separate collections - one for queues that need to reclaim capacity (which should be satisfied first), and one for the others (which can be sorted similar to what you've suggested).

          As for re-sorting the list that I have:
1. Before re-sorting, I recompute a bunch of stuff for every queue - # of running tasks, # of tasks per user, # of waiting tasks. Only then can you resort the collection. It doesn't make sense to keep a priority queue, because we update the scheduling information for all queues periodically and then resort.
          2. As I have mentioned in my comments very clearly (see TaskSchedulingMgr.assignTasks()), if resorting becomes expensive, you can easily do it once every few heartbeats, rather than every heartbeat. I don't think it'll be expensive, as we'll likely only have a few queues per cluster, but there's an easy fallback if re-sorting becomes a bottleneck.

          Vivek Ratan added a comment -

          The default configuration for this scheduler should not be in the framework class ResourceManagerConf. They should be in a class in src/contrib/capacity-scheduler.

          This was to be done by HADOOP-3698, but I have moved ResourceManagerConf to contrib.

          I don't understand the meaning of "getFirstQueue in ResourceManagerConf. Do you mean default queue?

          This is a remnant of a change that was introduced earlier. Initially, the first queue was considered the default queue, but now we expect a queue called 'default' to be present. I have removed this method.

          The queues should kill from the most recently launch job

The code does do that. It picks the job that started last.

          ReclaimedResource.countdown would be better expressed as the time of the deadline, so that it doesn't need to be decremented and isn't sensitive to the update cycle times.

It is expressed as a deadline: it represents how many cycles of the thread need to happen. I only decrement it once in a while to keep the counter low, and as indicated in my comments, that may also not be necessary.

          Vivek Ratan added a comment -

          The latest patch (3445.5.patch) incorporates some of the feedback suggestions. I have also made a couple of bigger changes, as these seem to be causing some grief among folks.

• The code to kill a task is completely in the scheduler. As before, the job that ran last is picked first. Within that job, we pick tasks that ran last, again as before. Within a task, I kill the attempt that has made the least progress, as per an earlier suggestion.
• I no longer kill all tasks from the queue that has the highest ratio of #running/GC. Instead, tasks from each queue are killed in proportion to how many tasks over capacity the queue was running (see the sketch below). This seems quite fair (there are other fair ways to do things as well), and will avoid the thrashing issue that Sanjay brought up.
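
A minimal sketch of that proportional split, with hypothetical queue names and counts (not the patch's actual data structures):

  import java.util.HashMap;
  import java.util.Map;

  class ProportionalKill {
    // overCapacity: queue name -> (running - guaranteed capacity), positive entries only
    static Map<String, Integer> killsPerQueue(Map<String, Integer> overCapacity,
                                              int tasksToReclaim) {
      int totalOver = 0;
      for (int over : overCapacity.values()) {
        totalOver += over;
      }
      Map<String, Integer> kills = new HashMap<String, Integer>();
      if (totalOver == 0) {
        return kills;
      }
      for (Map.Entry<String, Integer> e : overCapacity.entrySet()) {
        // rounds down; a real implementation would distribute the remainder
        kills.put(e.getKey(), tasksToReclaim * e.getValue() / totalOver);
      }
      return kills;
    }
  }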

          -1 core tests. The patch failed core unit tests.

          I don't know why some core tests are failing, but it is not because of this patch.

          Vivek Ratan made changes -
          Attachment 3445.5.patch [ 12389292 ]
          Vivek Ratan added a comment -

          A couple of core tests for TestResourceManagerConf are failing. Will fix that shortly.

          Vivek Ratan added a comment -

          Need to fix some tests

          Vivek Ratan made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Vivek Ratan added a comment -

          Attaching patch (3445.6.patch) which fixes some test cases that were failing earlier.

          Vivek Ratan made changes -
          Attachment 3445.6.patch [ 12389326 ]
          Vivek Ratan made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12389326/3445.6.patch
          against trunk revision 691099.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 10 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          -1 core tests. The patch failed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3159/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3159/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3159/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3159/console

          This message is automatically generated.

          Vivek Ratan added a comment -

The test that is failing is an HDFS test, which should not have anything to do with this patch.

          Owen O'Malley added a comment -

As the comment in the patch suggests, queueName should move from
JobInProgress to JobProfile. Actually, this has already been done in
HADOOP-3698, which we should probably go ahead and commit first.

The waitingMaps and waitingReduces should be renamed to
countPending{Maps,Reduces}. The calculation clearly should not add in
speculative tasks, since they are not pending. It would be even better
if this code was put into the capacity scheduler instead of adding
stuff into JobInProgress.

          getRunningMapCache should return the map instead of the values.

          getRunningTaskWithLeastProgress should be moved to the capacity
          scheduler using getActiveTasks, since there will almost always be 1 or 2
          task attempts to look through.

The ReclaimedResource list still has countdowns instead of
deadlines, which means you need to track time in iteration counts
and lose an extra 20% due to noise in the iteration times. Both of
which go away if you use milliseconds from System.currentTimeMillis().

          Sameer Paranjpye added a comment - - edited

          Thanks for staying on top of this, Owen and Vivek.

          Vivek Ratan added a comment -

          The calculation clearly should not add in speculative tasks, since they are not pending.

          I think you need to keep track of speculative tasks. JobInProgress keeps track of running tasks (runningMapTasks and runningReduceTasks). Running tasks include speculative tasks as well. JobInProgress also keeps track of total number of tasks (which is set initially), finished tasks, and failed tasks. So, if I want to compute the number of pending tasks in a job, it has to be #total-#running-#finished-#failed. Except that #running contains speculative tasks as well. So #pending has to be (#total-#running-#finished-#failed)+#speculative, because #total does not include speculative tasks. Optionally, I could add up the number of tasks in nonRunningMapCache and nonLocalMaps, perhaps.

          It would be even better if this code was put in to the capacity scheduler instead of adding stuff into JobInProgress.

          If JobInProgress exposes APIs to return number of running and finished tasks, shouldn't the API for returning the number of pending tasks also be in JobInProgress? pendingMaps() shouldn't be any different than runningMaps() or finishedMaps(), right?
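
For concreteness, the arithmetic above reduces to something like the following (plain ints here, not the actual JobInProgress fields):

  class PendingCount {
    // pending = total - running - finished - failed + speculative,
    // because speculative attempts are counted in "running" but not in "total"
    static int pendingTasks(int total, int running, int finished, int failed,
                            int speculative) {
      return total - running - finished - failed + speculative;
    }
  }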

          Vivek Ratan added a comment -

          New patch (3445.7.patch) submitted.

          Vivek Ratan made changes -
          Attachment 3445.7.patch [ 12389500 ]
          Vivek Ratan added a comment -

          New patch (3445.8.patch) which fixes a bad log statement (which I had forgotten to correct earlier).

          Vivek Ratan made changes -
          Attachment 3445.8.patch [ 12389504 ]
          Vivek Ratan added a comment -

          HADOOP-3698 introduced some changes that affected this patch. Attaching a new patch (3445.9.patch) that is compatible with the trunk.

          Vivek Ratan made changes -
          Attachment 3445.9.patch [ 12389567 ]
          Owen O'Malley added a comment -

          Is there a use case where the guarantee to a queue is different for map and reduce?

          I think there should be a configured default for the queues that don't have an attribute set.

          mapred.capacity-scheduler.default.foo = 12

          and any queue including the default will pick up that value, but the default queue can be changed separately.

          mapred.capacity-scheduler.queue.default.foo = 23

          <property>
            <name>mapred.capacity-scheduler.queue.default.minimum-user-limit-percent</name>
            <value>100</value>
            <description>The minimum percentage of the cluster which can 
              be utilized by a single user.
            </description>
          </property>
          

          This is a confusing variable name. We really need a better one or at the very least a better documentation string.
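
A minimal sketch of that lookup order, assuming a hypothetical helper on top of the Hadoop Configuration class (the key prefixes follow the examples above):

  import org.apache.hadoop.conf.Configuration;

  class QueueConfLookup {
    // Per-queue value if set, else the scheduler-wide default, else a hard-coded fallback.
    static int getQueueInt(Configuration conf, String queue, String prop, int hardDefault) {
      int schedulerDefault =
          conf.getInt("mapred.capacity-scheduler.default." + prop, hardDefault);
      return conf.getInt("mapred.capacity-scheduler.queue." + queue + "." + prop,
                         schedulerDefault);
    }
  }

For example, getQueueInt(conf, "default", "minimum-user-limit-percent", 100) would pick up a queue-level override when present and otherwise fall back to the scheduler-wide default.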

          Owen O'Malley added a comment -

          It also seems like WHEN_TO_KILL_TO_RECLAIM should be set to a multiple of JobTracker heartbeats, rather than a percentage of the expiry time. And it should be set to 3. A fraction of the expiry time is not necessarily enough time for the tasks to be killed.

          I think the configuration file should be changed to reflect that it is specific to this scheduler and it should be a template like hadoop-site.xml is. So the resource-manager-conf.xml would become capacity-scheduler.xml.template and the build.xml would copy the capacity-scheduler.xml.template over to .xml, if it doesn't exist.

          Vivek Ratan added a comment - - edited

          It also seems like WHEN_TO_KILL_TO_RECLAIM should be set to a multiple of JobTracker heartbeats, rather than a percentage of the expiry time. And it should be set to 3. A fraction of the expiry time is not necessarily enough time for the tasks to be killed.

Since the code to reclaim capacity runs in a separate thread, I think what you mean is that it should start reclaiming tasks when there is just enough time for 3 heartbeats before the queue's SLA kicks in. That's fine, but it requires the heartbeat interval to be exposed to the scheduler, which means that the TaskTrackerManager interface needs to expose getNextHeartbeatInterval(), which currently is a private method in JobTracker. Changing the TaskTrackerManager interface will cause some other projects to fail. The test classes for the Fair Share Scheduler, for example, would fail. Do we really want to do that at this stage? And if it's OK to do that, am I supposed to make the changes to the code in the Fairshare Scheduler in this patch as well?

          Hemanth Yamijala added a comment -

          And if it's OK to do that, am I supposed to make the changes to the code in the Fairshare Scheduler in this patch as well?

          Vivek, I did change the test classes that were committed as part of the Fairshare scheduler when I introduced the getQueueManager API. I think it is OK to modify test case classes of other projects to keep trunk compiling and tests running.

          Vivek Ratan added a comment -

          OK. Then I'll go ahead and change the TaskTrackerManager interface and update the affected files.

          Vivek Ratan added a comment -

          New patch (3445.10.patch) attached

          Vivek Ratan made changes -
          Attachment 3445.10.patch [ 12389750 ]
          Owen O'Malley added a comment -

          What size cluster has the scheduler been tested on?

          We need another run through findbugs.

          I think we should have a single configuration parameter that defines the queue capacity, rather than one for maps and one for reduces.

          You have a "new Integer(1)" that will likely cause a findbugs warning. It should either just use auto-boxing or use Integer.valueOf(1).

Things that we can address in follow-up bugs:
          1. The default values for queues should be configurable.
          2. Capacities should be floats, not integers.
3. Sanity check that all of the capacities are >= 0 and <= 100.
4. Sanity check that the sum of all the capacities is <= 100.

          Vivek Ratan added a comment -

          You have a "new Integer(1)" that will likely cause a findbugs warning. It should either just use auto-boxing or use Integer.valueOf(1).

          I didn't get any findbugs warning because of this line. There have been no findbugs warnings introduced by this patch.

          I think we should have a single configuration parameter that defines the queue capacity, rather than one for maps and one for reduces.

You can configure different numbers of slots for Maps and Reduces per TT. It then stands to reason that the cluster capacity, and hence a queue capacity, can be different for maps and reduces. The ClusterStatus class keeps different values for max map and reduce tasks.

          1. The default values for queues should be configurable.

          Which values are you referring to? They're all configurable, as far as I know, unless I've missed something.

          Capacities should be floats, not integers.

          Queue capacities are expressed as percentages of the grid capacities. Why would they be floats?

          Sanity checks are something we need to add more of.

          Sameer Paranjpye added a comment -

          You can configure different number of slots for Maps and Reduces per TT. It then stands to reason that the cluster capacity ...

          A single queue capacity expressed as a percentage will convert to different absolute numbers of map and reduce slots, no?

          Vivek Ratan added a comment -

          A single queue capacity expressed as a percentage will convert to different absolute numbers of map and reduce slots, no?

          Yes, but if you're configuring TTs to have different slots for maps and reduces, and hence allowing the cluster to have different numbers of map and reduce slots, then why take away that flexibility from queues? It makes no difference in the code, and I don't think it markedly simplifies configuration. We don't have enough use cases yet to see whether this flexibility will make a difference, but I don't see why we need to place that constraint.

          Sameer Paranjpye added a comment -

The purpose of allowing different numbers of map and reduce slots on TTs is to efficiently use the resources on a node. What is the purpose of allowing different numbers in queues? A queue gets a fraction of the cluster's resources. Having it get different fractions of the map and reduce slots is just odd.

          Having two different variables does not seem like flexibility to me. It feels like interface complexity whose merit is far from clear.

          Owen O'Malley added a comment -

          1. The default values for queues should be configurable.

          Which values are you referring to? They're all configurable, as far as I know, unless I've missed something.

          I'm referring to the default values:

          public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
          public static final int DEFAULT_GUARANTEED_CAPACITY_MAPS = 100;
          public static final int DEFAULT_GUARANTEED_CAPACITY_REDUCES = 100;
          public static final int DEFAULT_RECLAIM_TIME_LIMIT = 300;
          public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100;
          

          Queue capacities are expressed as percentages of the grid capacities. Why would they be floats?

          The capacities for some queues may be measured in 1/1000 of a cluster. Don't forget that 0.1% of 3000 machines is still 3 machines. If we can only assign capacity at 1% (30 machines) at a time, that is larger than many jobs need.

The sanity checks are particularly important because the default guaranteed capacity is 100%. If you define two queues and don't assign capacities, the sum will be over 100%.
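
A minimal sketch of both points (float capacities converted to slots, plus the sanity checks being asked for), with illustrative numbers:

  class CapacityCheck {
    // e.g. slotsFor(0.1f, 3000) == 3
    static int slotsFor(float percent, int clusterSlots) {
      return (int) (percent * clusterSlots / 100.0f);
    }

    static void validate(float[] queuePercents) {
      float sum = 0.0f;
      for (float p : queuePercents) {
        if (p < 0.0f || p > 100.0f) {
          throw new IllegalArgumentException("capacity must be between 0 and 100: " + p);
        }
        sum += p;
      }
      if (sum > 100.0f) {
        throw new IllegalArgumentException("capacities sum to more than 100%: " + sum);
      }
    }
  }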

          Sameer Paranjpye added a comment -

          I'm referring to the default values

          Do you mean introducing another batch of config variables? Something like:

mapred.capacity-scheduler.queue.{guaranteed-capacity, reclaim-time-limit, supports-priority, minimum-user-limit-percent}

which would apply to any queue for which the values weren't specified.

          Owen O'Malley added a comment -

          Do you mean introducing another batch of config variables?

          Yes, it is for the queues that don't specify values.

          Hemanth Yamijala made changes -
          Link This issue blocks HADOOP-4079 [ HADOOP-4079 ]
          Sameer Paranjpye added a comment -

          The sanity checks are pretty important. The defaults for queues are nice to have but not essential immediately. So yes they can be introduced in a subsequent JIRA, possibly even after 0.19, since it appears to be a back compatible change.

          Owen O'Malley added a comment -

I have a modification of the patch that:
1. makes a single capacity parameter
2. makes the capacity a float
3. requires each capacity to be between 0 and 100
4. requires the sum to be under 100
5. removes some unneeded imports

          It is working, but one of the unit tests was misconfigured. I'm trying to work through it. Otherwise, I'll get it tomorrow.

          Owen O'Malley added a comment -

          Here is the updated patch. I figured out how to fix the test case that was configuring the cluster with 110% of the available capacity.

          Owen O'Malley made changes -
          Attachment 3445.11.patch [ 12389908 ]
          Owen O'Malley made changes -
          Attachment 3445.11.patch [ 12389908 ]
          Owen O'Malley added a comment -

          I forgot to fix the config and README to reflect the changed configuration parameters.

          Owen O'Malley made changes -
          Attachment 3445.12.patch [ 12389909 ]
          Owen O'Malley made changes -
          Fix Version/s 0.19.0 [ 12313211 ]
          Status Patch Available [ 10002 ] Open [ 1 ]
          Owen O'Malley added a comment -

          Re-run through hudson.

          Owen O'Malley made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Vivek Ratan added a comment -

          Added a new patch (3445.13.patch).

          • Fixes Owen's modified description of the property 'mapred.capacity-scheduler.queue.default.minimum-user-limit-percent', which is incorrect. I have corrected the description in capacity-scheduler.xml.template and README.
• Fixed a bug in deleting jobs in JobQueuesManager.
          Vivek Ratan made changes -
          Attachment 3445.13.patch [ 12389934 ]
          Owen O'Malley added a comment -

          I just committed this.

          Owen O'Malley made changes -
          Resolution Fixed [ 1 ]
          Hadoop Flags [Reviewed]
          Status Patch Available [ 10002 ] Resolved [ 5 ]
          Robert Chansler made changes -
          Release Note This patch implements a scheduler for Map-Reduce jobs, called Capacity Task Scheduler (or just Capacity Scheduler), which provides a way to share large clusters. The scheduler provides a number of features which are described in its README file.
          Introduced Capacity Task Scheduler.
          Robert Chansler made changes -
          Component/s mapred [ 12310690 ]
          Owen O'Malley made changes -
          Component/s contrib/capacity-sched [ 12312466 ]
          Component/s mapred [ 12310690 ]
          Nigel Daley made changes -
          Status Resolved [ 5 ] Closed [ 6 ]
          Owen O'Malley made changes -
          Component/s contrib/capacity-sched [ 12312466 ]

            People

            • Assignee:
              Vivek Ratan
              Reporter:
              Vivek Ratan
• Votes:
  0
  Watchers:
  19
