Hadoop Map/Reduce
  MAPREDUCE-532

Allow admins of the Capacity Scheduler to set a hard-limit on the capacity of a queue

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.21.0
    • Component/s: capacity-sched
    • Labels: None
    • Hadoop Flags: Reviewed
    • Release Note:
      Provided ability in the capacity scheduler to limit the number of slots that can be concurrently used per queue at any given time.

      Description

      For jobs which call external services (e.g. distcp, crawlers), the user/admin should be able to control the maximum number of parallel tasks spawned. There should be a mechanism to cap the capacity available for a queue/job.

      Attachments

      1. MAPREDUCE-532-1.patch
        17 kB
        rahul k singh
      2. MAPREDUCE-532-2.patch
        16 kB
        rahul k singh
      3. MAPREDUCE-532-3.patch
        19 kB
        rahul k singh
      4. MAPREDUCE-532-4.patch
        33 kB
        rahul k singh
      5. MAPREDUCE-532-5.patch
        28 kB
        rahul k singh
      6. MAPREDUCE-532-6.patch
        28 kB
        Hemanth Yamijala
      7. MAPREDUCE-532-7.patch
        28 kB
        Hemanth Yamijala
      8. MAPREDUCE-532-20.patch
        28 kB
        Hemanth Yamijala

        Activity

        Devaraj Das added a comment -

        Does HADOOP-5170 address the requirements?

        Rajiv Chittajallu added a comment -

        For job limits, yes. Can we add similar limits per queue?

        mapred.max.maps.per.queue and mapred.max.reduces.per.queue

        rahul k singh added a comment -

        Problem statement: there should be an option to control the number
        of tasks run from a queue at any point of time. There should be a
        mechanism to cap the capacity available for a queue/job.

        There are two approaches:

        1. Define a configuration variable,
        "mapred.capacity-scheduler.queue.<queue-name>.max.map.slots", and
        likewise for reduces. This value is the maximum number of slots
        that can be used in a queue at any point of time. For example, if
        the value is 100, no more than 100 tasks would run in the queue at
        any point of time, assuming each task takes one slot. Typically the
        queue capacity should be equal to this limit. However, since the
        queue capacity is expressed as a percentage, it is likely to
        change, e.g. if nodes go down or new ones are added. We were
        thinking of handling this discrepancy by capping the queue capacity
        at the limit if required. So, if the queue capacity is more than
        this limit, the excess capacity will be used by the other queues.
        If the queue capacity is less than the limit, then the limit would
        be the queue capacity, as in the current implementation.

        2. Define a configuration variable,
        "mapred.capacity-scheduler.queue.<queue-name>.fixedCapacity". This
        is a boolean which, once set to true, makes the queue capacity a
        hard limit for the queue. For example: the cluster size is 200 and
        the queue capacity is 10%, so the hard limit is always 10% of the
        cluster capacity. The problem with this approach is that the limit
        becomes dynamic, that is, if extra nodes are added to the cluster
        the hard limit can actually increase. Given the use case, this
        might not be desirable.

        For the expressed use case, solution 1 seems more deterministic and
        controlled. Does this work?

        Note: all the calculations in the capacity scheduler are
        slot-based, so we have been using "task" and "slot"
        interchangeably. If the queue gets high-RAM jobs, it might hit the
        limit earlier with fewer tasks. But this keeps the implementation
        simple and easy to follow.
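
        A minimal sketch of how approach 1 might look in the capacity
        scheduler's configuration file. The map-side property name follows
        the proposal above; the queue name "crawler", the values, and the
        reduce-side property name are illustrative assumptions, not part of
        the proposal:

            <!-- Hypothetical sketch: cap the "crawler" queue at 100 map
                 and 100 reduce slots at any point of time. -->
            <property>
              <name>mapred.capacity-scheduler.queue.crawler.max.map.slots</name>
              <value>100</value>
            </property>
            <property>
              <name>mapred.capacity-scheduler.queue.crawler.max.reduce.slots</name>
              <value>100</value>
            </property>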

        Rajiv Chittajallu added a comment -

        I would prefer option 1. When we want to rate limit, absolute numbers are always good.

        rahul k singh added a comment -

        This patch still needs documentation-related changes.

        rahul k singh added a comment -

        Removing a System.out.println statement from the patch.

        rahul k singh added a comment -

        Adding a patch with doc changes and a new change which solves the user-limit problem.

        Hemanth Yamijala added a comment -

        This is looking OK. Some comments (mostly minor):

        • I would prefer the name LIMIT instead of 'CAP' everywhere.
        • The formatting in CapacityTaskScheduler.start() where the new QueueSchedulingInfo is being created is spread over too many indented lines. Can we fold them?
        • Methods introduced in TaskSchedulingInfo need not be public.
        • areTasksInQueueOverCap - the getTSI call is repeated often enough that it should be called once and cached.
        • Since capacity is in terms of slots, I think we should compare against numSlotsOccupied as opposed to numRunningTasks. This also counts reserved tasktrackers in case we are dealing with high-memory jobs. (A sketch of this check follows this comment.)
        • Just to be safe, I would recommend this check be for >=, rather than ==.
        • Documentation of the maxTaskCap variable in the capacity scheduler refers to 'map' slots, where it could be both map and reduce.
        • Currently we display the current # of slots in a queue in the UI. This could be less than the configured % of cluster capacity if the limit parameter is defined and is lower. I think that might be confusing to the user.
        • In the display, can we shorten the name, maybe "Map Tasks Limit" instead of "Maximum map tasks in a queue at a time :"? I also think it may be OK to not have a line separator for the limits, but club them with the Queue configuration section.

        I am still to look at the test cases.
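
        To make the points about occupied slots and the >= comparison
        concrete, here is a minimal, hypothetical sketch of the check; the
        class, field, and method names are illustrative, not the actual
        patch code:

            // Hypothetical sketch: compare occupied slots (not running
            // tasks) against the configured limit, using >= rather than ==.
            class TaskSchedulingInfo {
              int maxTaskLimit = -1;  // assumed convention: -1 means no limit
              int numSlotsOccupied;   // counts slots reserved for high-memory
                                      // jobs as well as running ones

              boolean isOverLimit() {
                // >= is safer than ==: a multi-slot high-memory task can push
                // the queue past the limit in one step, which == would miss.
                return maxTaskLimit >= 0 && numSlotsOccupied >= maxTaskLimit;
              }
            }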

        Hemanth Yamijala added a comment -

        Some comments on test cases:

        • Can we please include a short comment for each test case describing what it does?
        • We are using assert to check for some of the conditions. This does not work if assertions are not enabled. We should use assertNull instead (see the sketch after this list).
        • In all cases, I think it will be good to test the actual task returned, using the checkAssignment method. This keeps the logic of the test easy to understand.
        • We also need a test case for user limits in the face of queue limits, I suppose - to ensure the user limits are being computed based on the reduced queue limits.
        • Also, we need a test case in the face of high-RAM jobs - to make sure we count reservations towards queue limits as well.
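
        A minimal illustration of the assert point above, in the JUnit 3
        style of Hadoop tests of that era; the test class and the null
        stand-in are hypothetical:

            import junit.framework.TestCase;

            public class TestQueueLimitSketch extends TestCase {
              public void testNoAssignmentAtQueueLimit() {
                // Hypothetical stand-in for what the scheduler returns once
                // the queue limit is hit; a real test would call the scheduler.
                Object task = null;
                // assert task == null;  // silently skipped unless run with -ea
                assertNull("no task should be assigned at the queue limit", task);
              }
            }
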
        Arun C Murthy added a comment -

        After discussions, I'm changing the direction of this JIRA to simply allow for a configured hard limit on the capacity of queues.

        With this feature, the consumers of the 'external service' will have to submit to a special queue for the service/resource, thereby limiting the fan-in for the service.

        rahul k singh added a comment -

        Implemented Hemanth's changes and also added test cases for high-RAM jobs and user limits.

        Hemanth Yamijala added a comment -

        This is looking good. I have a few minor comments:

        • Documentation of TaskSchedulingInfo.setCapacity needs to be updated. It still refers to setting the minimum value.
        • TaskSchedulingInfo.toString() - the remaining count, in case the maximum task limit is less than capacity, should be computed as maxTaskLimit-capacity.
        • Also, we are not displaying the configured task limit anywhere.
        • TaskSchedulingInfo.areTasksInQueueOverLimit - documentation needs updating, as we check for >= and it is occupied slots, not running tasks.
        • In testHighMemoryBlockingWithMaxLimit, after the first map task is assigned, we check that no more map tasks are being assigned. This is correct. But just to be sure, we should also check that no reservations are created for the high-RAM job. One way to check that would be to make sure that checkOccupiedSlots does not change for map slots after the reduce of the second job is scheduled on tt1.
        • testUserLimitsWithMaxLimits has some indentation errors.
        • The 'delta' parameter introduced in checkOccupiedSlots is not very intuitive - can we do two things (sketched after this list):
          • define an overloaded method, leaving the first one intact, and use it for all the calls where 0,0 is passed as the last two parameters,
          • and also document the parameters' usage in the second method. This will keep the number of changes in the test class small.
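
        One possible shape for the overload suggested in the last item; the
        signatures and parameter names are assumptions based on the comment,
        not the patch:

            // Hypothetical sketch: existing call sites that pass 0,0 use the
            // simpler overload, which delegates with the deltas defaulted.
            class QueueChecks {
              void checkOccupiedSlots(String queue, int expMapSlots,
                                      int expReduceSlots) {
                checkOccupiedSlots(queue, expMapSlots, expReduceSlots, 0, 0);
              }

              /**
               * @param mapDelta    extra map slots expected beyond the base count
               * @param reduceDelta extra reduce slots expected beyond the base count
               */
              void checkOccupiedSlots(String queue, int expMapSlots,
                                      int expReduceSlots, int mapDelta,
                                      int reduceDelta) {
                // Assertions against scheduler state are elided in this sketch.
              }
            }
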
        rahul k singh added a comment -

        Applied the suggestions from Hemanth.

        Hemanth Yamijala added a comment -

        The attached patch (MAPREDUCE-532-6.patch) fixes a missed comment about not displaying the slot limit if configured. It also changes the display string about capacities a little bit.

        Hemanth Yamijala added a comment -

        Results of test-patch:

             [exec] -1 overall.
             [exec]
             [exec]     +1 @author.  The patch does not contain any @author tags.
             [exec]
             [exec]     +1 tests included.  The patch appears to include 3 new or modified tests.
             [exec]
             [exec]     +1 javadoc.  The javadoc tool did not generate any warning messages.
             [exec]
             [exec]     +1 javac.  The applied patch does not increase the total number of javac compiler warnings.
             [exec]
             [exec]     +1 findbugs.  The patch does not introduce any new Findbugs warnings.
             [exec]
             [exec]     -1 Eclipse classpath. The patch causes the Eclipse classpath to differ from the contents of the lib directories.
             [exec]
             [exec]     +1 release audit.  The applied patch does not increase the total number of release audit warnings.
        

        The -1 related to the Eclipse classpath is because of a mismatch in Ivy jars. I am told this does not need to be worried about.

        The patch changes only the capacity scheduler code base, and all unit tests of the capacity scheduler pass.

        Hemanth Yamijala added a comment -

        The attached patch (MAPREDUCE-532-7.patch) corrects a missed Forrest documentation tag which was causing the docs build to fail. Verified docs are generated properly with this patch.

        Hemanth Yamijala added a comment -

        I just committed this to trunk. Thanks, Rahul!

        Hemanth Yamijala added a comment -

        The attached patch (MAPREDUCE-532-20.patch) is meant to apply against the Yahoo! Hadoop distribution at version 20. This is NOT to be committed externally.

        Hudson added a comment -

        Integrated in Hadoop-Mapreduce-trunk #15 (See http://hudson.zones.apache.org/hudson/job/Hadoop-Mapreduce-trunk/15/)


          People

          • Assignee: rahul k singh
          • Reporter: Rajiv Chittajallu
          • Votes: 0
          • Watchers: 6
