Hadoop Common / HADOOP-4035

Modify the capacity scheduler (HADOOP-3445) to schedule tasks based on memory requirements and task trackers' free memory

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.19.0
    • Fix Version/s: 0.20.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Incompatible change, Reviewed
    • Release Note:
      Changed capacity scheduler policy to take note of task memory requirements and task tracker memory availability.

      Description

      HADOOP-3759 introduced configuration variables that can be used to specify memory requirements for jobs, and also modified the tasktrackers to report their free memory. The capacity scheduler in HADOOP-3445 should schedule tasks based on these parameters. A task that is scheduled on a TT that uses more than the default amount of memory per slot can be viewed as effectively using more than one slot, as it would decrease the amount of free memory on the TT by more than the default amount while it runs. The scheduler should make the used capacity account for this additional usage while enforcing limits, etc.

      1. HADOOP-4035-20081202.2.txt
        135 kB
        Hemanth Yamijala
      2. HADOOP-4035-20081202.1.txt
        135 kB
        Hemanth Yamijala
      3. HADOOP-4035-20081202.txt
        135 kB
        Vinod Kumar Vavilapalli
      4. HADOOP-4035-20081128-4.txt
        128 kB
        Vinod Kumar Vavilapalli
      5. HADOOP-4035-20081126.1.txt
        101 kB
        Vinod Kumar Vavilapalli
      6. HADOOP-4035-20081121.txt
        83 kB
        Vinod Kumar Vavilapalli
      7. HADOOP-4035-20081008.txt
        98 kB
        Vinod Kumar Vavilapalli
      8. HADOOP-4035-20081006.1.txt
        89 kB
        Vinod Kumar Vavilapalli
      9. HADOOP-4035-20081006.txt
        75 kB
        Vinod Kumar Vavilapalli
      10. HADOOP-4035-20080918.1.txt
        56 kB
        Vinod Kumar Vavilapalli
      11. 4035.1.patch
        31 kB
        Vinod Kumar Vavilapalli

        Issue Links

          Activity

          Hemanth Yamijala added a comment -

          A reasonable assumption to make while computing used capacity is to assume that for all TTs in a cluster, the amount of memory per slot is configured to be the same value. Note that this can be done even if the TTs themselves have different hardware configurations (RAM, CPU slots, etc). This assumption is also reasonable from the perspective that it helps users to easily verify if they need to specify a higher limit for their jobs.

          Based on this assumption, the total number of slots a job is virtually taking is (number of running tasks for the job * the number of slots each task of the job is taking). The latter is something like (amount of per-task memory the job has requested / the memory per slot for the cluster).

          Does this make sense?

          Hemanth Yamijala added a comment -

          Oops. Sorry, I messed up the description - I meant HADOOP-3759 instead of HADOOP-3749.

          Hemanth Yamijala added a comment -

          Corrected typo in description on JIRA number.

          Runping Qi added a comment -

          This issue is related to what is discussed in HADOOP-2776, in the sense that the task tracker should decide how many tasks to run based on the availability of its resources.

          Vinod Kumar Vavilapalli added a comment -

          Attaching patch that Vivek has written for this issue. I am working on it to take it to completion.

          Vinod Kumar Vavilapalli added a comment -

          Putting up a patch for overall review, though it is not completely polished, doesn't have all possible testcases, and documentation is coarse/incomplete. Am working on these.

          Hemanth Yamijala added a comment -

          Here's a summary of the approach that is implemented in the patch, so we can have a discussion around it.

          There are 2 requirements we are trying to address:

          • When a task is assigned to run on a tasktracker, the scheduler must ensure that the memory requirement of the task's job is matched by the free memory on the tasktracker.
          • A user whose job requests more resources than usual would decrease the free memory on the tasktracker more than other jobs would. Therefore the user must be 'charged' for the additional usage so that he would hit his limits and capacities sooner.

          To handle the first requirement, we have to consider what happens when the job at the front of the scheduler's list does not match the tasktracker's memory availability. We considered the following choices:

          1. Move on to consider the next job (this can cause starvation?)
          2. Block, and do not look at any other job, subject of course to other limits etc.
          3. Some variant of the first option where we move on to consider the next job some configured number of times, and then block.

          Since it is not very clear what would be the right approach, we decided in favor of option 2, which had the advantage that the system behavior is very predictable. Of course, this is open for discussion.

          In order to solve the second requirement, we need to match tasks to slots in the scheduler. One simplifying assumption made to achieve this is mentioned in the comments above. We assume that, cluster wide, the default memory per slot is a fixed value specified in configuration. Note that this does not preclude heterogeneity of cluster nodes. By tweaking the maximum memory on the tasktracker and the number of slots, it is possible to have the same value for all nodes in a cluster.

          Using this configuration variable, it is possible to map tasks to slots as follows:
          slots for a job = tasks for a job * ceil(memory requested by job / configured memory per slot)
          All computation of limits and capacities uses this formula for computing slots used, slots required, etc.
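
          As an illustration only (this is not code from the patch), the slot formula above could be sketched in Java as follows. The class and method names are hypothetical, memory is assumed to be expressed in MB, and charging one slot to a job that requests no memory is an assumption made for the example.

```java
/** Illustrative helper; names are hypothetical and not taken from the patch. */
final class SlotAccounting {
    private SlotAccounting() {}

    /** Slots one task occupies: ceil(taskMemoryMb / memoryPerSlotMb). */
    static long slotsPerTask(long taskMemoryMb, long memoryPerSlotMb) {
        if (memoryPerSlotMb <= 0) {
            throw new IllegalArgumentException("memoryPerSlotMb must be positive");
        }
        if (taskMemoryMb <= 0) {
            return 1; // assumption: a job with no memory request is charged one slot
        }
        // Integer ceiling division without going through floating point.
        return (taskMemoryMb + memoryPerSlotMb - 1) / memoryPerSlotMb;
    }

    /** Slots charged to a job: running tasks * slots per task. */
    static long slotsForJob(long runningTasks, long taskMemoryMb, long memoryPerSlotMb) {
        return runningTasks * slotsPerTask(taskMemoryMb, memoryPerSlotMb);
    }

    public static void main(String[] args) {
        // 10 tasks, each asking for 2560 MB, with 1024 MB per slot: 10 * ceil(2.5) = 30
        System.out.println(slotsForJob(10, 2560, 1024));
    }
}
```

          With 1024 MB per slot, a job whose tasks each request 2560 MB is charged three slots per running task, so it reaches its limits three times faster than a default job.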

          Clearly, this issue requires discussion and consensus. It likely will not make 0.19. But we are hoping we can reach a consensus on the approach.

          Hemanth Yamijala added a comment -

          We need this for 0.19. The existing HOD setup allows users to submit high RAM jobs. Thus, the capacity scheduler (a HOD replacement) would actually regress this functionality.

          Owen O'Malley added a comment -

          I think that leaving the slot empty is probably correct. I'd suggest two additions:
          1. If the task requires more memory than any TT, fail it immediately.
          2. If the task requires more memory than the current TT, go to the next job.

          I don't think that forcing all of the slots across the cluster to have the same amount of memory makes sense. I think the slot size should be computed per TT: physical RAM for the TT / # slots on that TT = size of slots on that TT. That would let you calculate the number of slots that a task would require on that TT.
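
          A minimal sketch of this per-TT calculation, for illustration only: the names below are hypothetical, memory is assumed to be in MB, and rounding the slot count up is an assumption, since the comment does not say how fractions are handled.

```java
/** Illustrative only; names are hypothetical and not from any patch. */
final class PerTrackerSlots {
    private PerTrackerSlots() {}

    /** Size of one slot on a given TT: physical RAM / number of slots. */
    static long slotSizeMb(long trackerMemoryMb, int slots) {
        if (slots <= 0 || trackerMemoryMb < slots) {
            throw new IllegalArgumentException("need at least 1 MB per slot");
        }
        return trackerMemoryMb / slots;
    }

    /** Slots a task would need on that TT, rounded up (assumed rounding). */
    static long slotsNeeded(long taskMemoryMb, long trackerMemoryMb, int slots) {
        long size = slotSizeMb(trackerMemoryMb, slots);
        return (taskMemoryMb + size - 1) / size;
    }

    public static void main(String[] args) {
        // The same 3072 MB task on two TTs with 4 slots each.
        System.out.println(slotsNeeded(3072, 8192, 4));  // 2048 MB slots -> 2
        System.out.println(slotsNeeded(3072, 16384, 4)); // 4096 MB slots -> 1
    }
}
```

          The same task costs a different number of slots depending on the TT it lands on, which is the main difference from the cluster-wide fixed slot size.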

          Owen O'Malley added a comment -

          How do you deal with map slots versus reduce slots? If you have 8GB of ram, can you run a 6GB map task? Or is 4GB the limit because the other 4GB is for reduces?

          Owen O'Malley added a comment -

          Actually, I think it makes a lot more sense to make this act like HADOOP-657 did with disk space, where we just don't assign a task if there isn't enough memory (or disk space) left for it to run.

          Hemanth Yamijala added a comment -

          Following an offline discussion with Owen, his proposal was the following:

          • The scheduler assigns a task to a TT only if the amount of free memory reported is greater than the task's requirements.
          • If it doesn't match, we don't move to the next job. That is, we block, thus removing any possible starvation of this job.
          • We don't bother about making this job account for more usage at this point, and handle that problem later, most likely after 0.19.

          Thinking about this, I think the only disadvantage with this approach is that a user who submits a job with high memory requirements could essentially block other users, at least until his limit is hit.

          So, I would suggest we change the above proposal to not block, but instead move over to the next job. This way, a user with high RAM requirements cannot block other users, and cannot game the system in that way.

          Note that:

          • This is exactly what we do in HADOOP-657 for disk space usage.
          • When we introduce accounting, we can also change the behavior of blocking.

          Can we agree on this ?

          Owen O'Malley added a comment -

          I think it is important that we not starve jobs, so I think we should not take the next job's task, if the current job's task doesn't fit. I've also filed HADOOP-4306 to change the disk space monitoring the same way.

          It probably makes sense in the future to have a special case where, if the job's task will never fit on this TaskTracker, other low priority jobs are allowed to use it. But it should be a different jira.

          Hemanth Yamijala added a comment -

          The consensus on the implementation is as follows:

          • Scheduler will assign tasks to a TT only if the amount of free memory is greater than the task's requirements.
          • If the memory requirements don't match, we do NOT move to the next job.
          • We are not mapping tasks to slots at this point.

          We understand point 3 means that the system is less fair than it should be (because users of high RAM jobs can cause more slots to go free than other jobs), but in the interest of keeping things simple, we will follow this approach for Hadoop 0.19, and take it up in the future.

          Note that if HADOOP-4306 is addressed, we will be handling scheduling with respect to disk and memory uniformly.
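
          The agreed behavior can be sketched as below. This is an illustration, not code from the patch: the class, the Job type and the method name are hypothetical, memory is assumed to be in MB, and the strict "greater than" comparison simply mirrors the wording of the first point.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Illustrative only; names are hypothetical and not from the patch. */
final class BlockingMemoryCheck {
    private BlockingMemoryCheck() {}

    /** Minimal stand-in for a queued job and its per-task memory request. */
    static final class Job {
        final String id;
        final long taskMemoryMb;

        Job(String id, long taskMemoryMb) {
            this.id = id;
            this.taskMemoryMb = taskMemoryMb;
        }
    }

    /**
     * Returns the job to schedule on a TT reporting freeMemoryMb, or empty.
     * Only the head of the queue is considered: if its task does not fit,
     * the slot is left empty rather than given to a later job.
     */
    static Optional<Job> pickJob(List<Job> queue, long freeMemoryMb) {
        if (queue.isEmpty()) {
            return Optional.empty();
        }
        Job head = queue.get(0);
        if (freeMemoryMb > head.taskMemoryMb) {
            return Optional.of(head);
        }
        return Optional.empty(); // block: do NOT move on to the next job
    }

    public static void main(String[] args) {
        List<Job> queue = Arrays.asList(new Job("high-ram", 4096), new Job("small", 512));
        System.out.println(pickJob(queue, 1024).isPresent()); // false: "small" is not tried
        System.out.println(pickJob(queue, 8192).get().id);    // high-ram
    }
}
```

          Note that the "small" job would fit in 1024 MB but is deliberately not considered, which is what prevents starvation of the high RAM job at the head of the queue.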

          dhruba borthakur added a comment -

          >A reasonable assumption to make while computing used capacity is to assume that for all TTs in a cluster, the amount of memory per slot is configured to be the same value

          I am a little confused about the above statement. It is possible to have two different types of machine in the same cluster, the only difference being the amount of memory on these types. Since the CPU capacity is the same, I would ideally configure both types of machines to have the same number of slots. However, the memory capacity per slot on one type of machine would be larger than the memory capacity per slot of the other type of machine. It would be nice if the JT/TT can compute the memory capacity per slot and then schedule tasks accordingly.

          Also, the JT scheduler can generate more affinity of reduce tasks to slots with larger memory-capacity-per-slot because reduce tasks possibly take more memory than map tasks.

          Hemanth Yamijala added a comment -

          >It is possible to have two different types of machine in the same cluster, the only difference being the amount of memory on these types. Since the CPU capacity is the same, I would ideally configure both types of machines to have the same number of slots.

          Dhruba, agreed. This was also indicated in Owen's comments above. So, this assumption is no longer valid. We had this assumption to help us map tasks to slots easily. This in turn was to meet the 2nd requirement I'd put up above:

          >A user whose job requests for higher resources than usual would decrease the free memory on the tasktracker more than other jobs would. Therefore the user must be 'charged' the additional usage so that he would hit his limits and capacities sooner.

          But as I described above, we would like to keep things simple, and not do this mapping for now. It is a little less fair, but we can try out how it works.

          >It would be nice if the JT/TT can compute the memory capacity per slot and then schedule tasks accordingly.

          HADOOP-3759 laid down the framework for the TT to do this. This JIRA will address the scheduling aspect.

          >Also, the JT scheduler can generate more affinity of reduce tasks to slots with larger memory-capacity-per-slot because reduce tasks possibly take more memory than map tasks.

          We do not currently allow memory requirements to be specified separately for map and reduce tasks. Does this seem vital to have?

          Vinod Kumar Vavilapalli added a comment -

          Attaching a patch.

          • Changed CapacityTaskScheduler and the default scheduler to accept highRAM jobs. The cluster will be blocked till the job at the head of the queue is served. This is according to the above proposal.
          • Modified TaskTrackerStatus to report free memory for map tasks and for reduce tasks separately, and the total memory available to the TT is distributed between map tasks and reduce tasks in the ratio of their slots. This is needed because we don't want map tasks to use memory allocated to reduce tasks and vice versa.
          • Jobs that cannot be run on any TT will be killed on the first heartbeat of any TT.
            • A job cannot run on any TT for two reasons: 1) no TT in the cluster can serve the job's tasks because of the job's very high memory requirements, or 2) there were TTs in the cluster that could run the job, but after the job started, ALL of these TTs have gone down. We need to kill these jobs because they would otherwise block the whole cluster forever.
            • The maximum size of job that is allowed in the cluster is determined by going through the list of all live TTs and finding the biggest job size that can be supported. Note that this is done every time assignTasks is called (i.e. there's a free slot on a TT), but this can be improved if we have a TaskTrackerListener interface which will tell us precisely when new TTs get added to the cluster or when old TTs expire.
          • Added test-cases for both schedulers testing high-RAM job requirements.
          • Changed CapacityTaskScheduler.TaskSchedulingMgr.Type to an enum instead of a string. It's more convenient this way.
          • Changed a couple of log statements so that they are more helpful in debugging.

          TestHighRAMJobs needs a major rewrite because of the separate free memory values for map and reduce tasks. It doesn't even compile now; will work on that. Negative values for memory on the TT or in the job may adversely affect the working of this patch. Will investigate that and see if it needs a fix. This patch can be reviewed irrespective of these two (side) issues; they will add code but won't change what is already done by the patch.
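
          For illustration, the split of a TT's memory between map and reduce tasks "in the ratio of their slots" could look like the sketch below. The names are hypothetical, memory is assumed to be in MB, and giving the integer-division remainder to reduces is an assumption; the patch may do this differently.

```java
/** Illustrative only; names are hypothetical and not from the patch. */
final class TrackerMemorySplit {
    private TrackerMemorySplit() {}

    /** Memory reserved for map tasks: total * mapSlots / (mapSlots + reduceSlots). */
    static long mapShareMb(long totalMb, int mapSlots, int reduceSlots) {
        int slots = mapSlots + reduceSlots;
        if (mapSlots < 0 || reduceSlots < 0 || slots == 0) {
            throw new IllegalArgumentException("need at least one slot");
        }
        return totalMb * mapSlots / slots;
    }

    /** Memory reserved for reduce tasks: whatever the map share leaves over. */
    static long reduceShareMb(long totalMb, int mapSlots, int reduceSlots) {
        return totalMb - mapShareMb(totalMb, mapSlots, reduceSlots);
    }

    public static void main(String[] args) {
        // 8 GB TT with 2 map slots and 2 reduce slots: 4096 MB for each side.
        System.out.println(mapShareMb(8192, 2, 2));    // 4096
        System.out.println(reduceShareMb(8192, 2, 2)); // 4096
    }
}
```

          Under this split, the 6GB map task from the earlier question would not fit on an 8GB TT with equal map and reduce slots, because only 4GB is available to maps.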

          Vinod Kumar Vavilapalli added a comment -

          The separate treatment of free memory for maps and for reduces makes it easy for us to extend the job configuration to be different for maps and reduces. It is an easy extension from here. But I don't know for sure whether this is a requirement or not, or if it is a requirement, whether we wish to do it in this patch or not.

          Vinod Kumar Vavilapalli added a comment -

          Attaching a new patch. This

          • Fixes TestHighRAMJobs
          • Fixes TestLimitTasksPerJobTaskScheduler. Disables memory related tests for LimitTasksPerJobTaskScheduler as we are not supporting high RAM jobs for that.
          • Fixes visibility of a few methods.
          Vinod Kumar Vavilapalli added a comment -

          Running it through Hudson..

          Vinod Kumar Vavilapalli added a comment -

          To kill an invalid job, the patch uses JobInProgress.kill(), which just initiates the killing but doesn't finalize jobStatus as killed. It waits for a cleanUp task to be scheduled and run to completion (successfully or not) before marking the jobStatus as killed.

          So, this issue is blocked by HADOOP-4236: waiting jobs with invalid (too large) memory requirements are marked for killing, but they never get killed, because they are in PREP state and so don't have cleanUp tasks.

          Vinod Kumar Vavilapalli added a comment -

          I am also blocking this by HADOOP-4287 as we depend on the correctness of pending tasks' calculation.

          Vinod Kumar Vavilapalli added a comment -

          New patch.

          • Introducing boolean configuration property mapred.jobtracker.taskScheduler.memory-based-scheduling.enabled to enable/disable scheduling based on a job's memory requirements. Defaults to false (feature turned off).
          • Added test-cases to test the above.
          • HADOOP-4261 made JobInProgress.terminateJob package private. Using it to kill jobs that are not yet initialized. HADOOP-4236 will eventually fix this to use a single kill job API.
          Vinod Kumar Vavilapalli added a comment -

          Running it through Hudson. It didn't even pick up the patch the first time around.

          Vinod Kumar Vavilapalli added a comment -

          >HADOOP-4236 will eventually fix this to use a single kill job api.

          Blocking this JIRA by HADOOP-4236.

          Hemanth Yamijala added a comment -

          Some comments:

          JobConf:

          • I think it is OK to expose whether memory based scheduling is enabled as an API.

          CapacityTaskScheduler:

          • jobFitsOnTT: if a job has not requested any memory, we promise it at least defaultMemoryPerSlot on the TT. So, I think this method should still check for that part.
          • Since we already have a map/reduce based TaskSchedulingMgr, can we implement jobFitsOnTT to not have checks based on whether it's map or reduce task ? One way to do that would be to define an abstract getFreeVirtualMemoryForTask() in TaskSchedulingMgr and implement it in the MapSchedulingMgr to return resourceStatus.getFreeVirtualMemoryForMaps() and so on.
          • InAdequateResourcesException should be InadequateResourcesException. Does it need to extend IOException ?
          • updateResourcesInformation: If for any one TT there is DISABLED_VIRTUAL_MEMORY_LIMIT, we don't need to proceed in the loop - a small optimization ?
          • Also, this need not be done if memory management is disabled.
          • jip.isKillInProgress() – I think this is going to be changed. Will this trigger jobCompleted events ? This should be checked with the solution of HADOOP-4053.
          • Can we somehow avoid duplicating the following code between CapacityTaskScheduler and JobQueueTaskScheduler:
            • jobFitsOnTT
            • updateResourcesInformation()
            • killing of jobs
              It is significant logic and avoiding code duplication might help.

          I need to review the changes to the testcases still.
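
The suggestion above about moving the map/reduce distinction into TaskSchedulingMgr can be sketched roughly as follows. This is only an illustration, not the code in the patch: the class names carry a "Sketch" suffix because they are stand-ins, getFreeVirtualMemoryForReduces() is an assumed counterpart of the getFreeVirtualMemoryForMaps() named in the comment, and the jobFitsOnTT signature is hypothetical.

```java
// Hypothetical stand-in for the resource report a TaskTracker sends.
final class ResourceStatusSketch {
    private final long freeVmForMaps;
    private final long freeVmForReduces;

    ResourceStatusSketch(long freeVmForMaps, long freeVmForReduces) {
        this.freeVmForMaps = freeVmForMaps;
        this.freeVmForReduces = freeVmForReduces;
    }

    long getFreeVirtualMemoryForMaps() { return freeVmForMaps; }

    // Assumed name, mirroring the map-side getter.
    long getFreeVirtualMemoryForReduces() { return freeVmForReduces; }
}

abstract class TaskSchedulingMgrSketch {
    // Each subclass returns the free-memory figure for its own task type.
    abstract long getFreeVirtualMemoryForTask(ResourceStatusSketch status);

    // No map/reduce branching here: the subclass supplies the right figure.
    // A job that requested no memory (<= 0) is still promised the default per slot.
    boolean jobFitsOnTT(long requestedMemory, long defaultMemoryPerSlot,
                        ResourceStatusSketch status) {
        long needed = requestedMemory > 0 ? requestedMemory : defaultMemoryPerSlot;
        return needed <= getFreeVirtualMemoryForTask(status);
    }
}

final class MapSchedulingMgrSketch extends TaskSchedulingMgrSketch {
    @Override
    long getFreeVirtualMemoryForTask(ResourceStatusSketch status) {
        return status.getFreeVirtualMemoryForMaps();
    }
}

final class ReduceSchedulingMgrSketch extends TaskSchedulingMgrSketch {
    @Override
    long getFreeVirtualMemoryForTask(ResourceStatusSketch status) {
        return status.getFreeVirtualMemoryForReduces();
    }
}
```

With this shape, the fit check is written once in the base class, which also speaks to the point about avoiding duplicated logic.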

          Owen O'Malley added a comment -

          This patch is a big, risky patch that isn't a bug fix, and thus I don't think it should go into 0.19.

          Hemanth Yamijala added a comment -

          The test cases look good. Just minor nits:

          • The StringUtils import is never used in TestCapacityScheduler
          • Changes to testUserLimits4 are unrelated to this change. I think if it should be improved, it should be done in a separate JIRA.
          Amareshwari Sriramadasu added a comment -

          There is code such as the following in the patch:
          + if (!jip.isKillInProgress()) {
          +   jip.kill();
          + }

          Here, you need not check whether a kill is already in progress for the job, because a kill issued while one is in progress will be a no-op. Also, if the user has to see the failure as the framework killing the job, it should be jip.fail() (so that the job state is FAILED).

          Hemanth Yamijala added a comment -

          This patch is a big, risky patch that isn't a bug fix, and thus I don't think it should go into 0.19.

          While working on this patch, we found out that we need to report free memory per number of map slots and reduce slots as two separate variables from the task tracker as part of the ResourceStatus object in TaskTrackerStatus. I think at least this must get into Hadoop 0.19, as otherwise, there may be backwards compatibility issues that come up later. Also, some documentation describing the feature should be removed, in order to not confuse users about the availability of the feature.

          Does this make sense ?

          Hemanth Yamijala added a comment -

          After a detailed offline conversation with Owen, we decided to not do this for Hadoop 0.19. However, we also reviewed the current solution, and where we want to go, and filed HADOOP-4439 to help us get there easily in future.

          Vivek Ratan added a comment -

          Here's a summary of what this patch should be doing.

          TTs report the amount of free memory available on their node (as described in HADOOP-3759 and HADOOP-4439), which is equal to the total VM assigned for Hadoop tasks on that node (mapred.tasktracker.tasks.maxmemory) minus the VM guaranteed to the tasks that are already running. The CapacityScheduler looks at this free memory to decide if it can run a task, which has its own memory needs. If the task requires more memory than is available on the TT, the Scheduler returns nothing to the TT (thus forcing it to finish up what it is running and eventually having enough free memory).

          In order to make sure that no job asks for memory that is more than what a TT has available, we should have a cluster-wide limit on the amount of VM a job can ask for its tasks. If this limit is set, and a job asks for too much, the job should not be accepted by the JT in submitJob(). If the limit is not set, jobs cannot be rejected based on their memory requirements.
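
The two rules in this summary (the free-memory check at scheduling time and the optional cluster-wide limit at submission time) can be sketched as below. This is an illustrative sketch only: the class, method names, and the LIMIT_NOT_SET sentinel are hypothetical, and memory values are plain numbers in whatever unit the cluster uses.

```java
final class FreeMemorySketch {
    // Hypothetical sentinel meaning "no cluster-wide limit configured".
    static final long LIMIT_NOT_SET = -1L;

    // Free memory = total VM assigned to Hadoop tasks on the node
    // minus the VM guaranteed to tasks that are already running.
    static long freeMemory(long maxMemoryForTasks, long[] guaranteedToRunningTasks) {
        long guaranteed = 0L;
        for (long g : guaranteedToRunningTasks) {
            guaranteed += g;
        }
        return maxMemoryForTasks - guaranteed;
    }

    // The scheduler hands out a task only if it fits; otherwise it returns nothing.
    static boolean canAssign(long taskMemory, long freeMemory) {
        return taskMemory <= freeMemory;
    }

    // If no limit is set, jobs cannot be rejected on memory grounds.
    static boolean acceptJob(long requestedPerTask, long clusterLimit) {
        return clusterLimit == LIMIT_NOT_SET || requestedPerTask <= clusterLimit;
    }
}
```

For instance, a TT with 8192 units for tasks and two running tasks guaranteed 2048 each reports 4096 free, so a task asking for 4096 can be assigned while one asking for 4097 has to wait.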

          Vinod Kumar Vavilapalli added a comment -

          [....] the job should not be accepted by the JT in submitJob(). If the limit is not set, jobs cannot be rejected based on their memory requirements.

          Irrespective of the method to specify the maximum limits on job requirements to prevent gaming of the system, either via a configuration or by calculating it dynamically, the job rejection should not be done by JT, but by our scheduler itself. Clearly, this maximum limit is owned by our scheduler.

          Owen O'Malley added a comment -

          Now that we are doing this for 0.20, the memory configuration should be changed to be a fraction of physical RAM, i.e. mapred.tasktracker.memory.max-fraction, which takes a float and computes the maximum. Operations teams don't want to specify the memory on each machine. They want to have a global configuration for all of the slaves.

          It is important to report this total to the JT, so that it can be displayed and/or used by the scheduler. If you want to include the redundant information about the allocated memory, that is ok. But the max available should be sent too.

          Defining a cluster configurable limit for the max memory for any task is fine.

          dhruba borthakur added a comment -

          Hi Owen, If the cluster has machines with different memory sizes (e.g. some TTs have 8GB memory while other TTs have 16GB), then specifying a memory percentage might not work well.

          Owen O'Malley added a comment -

          That is exactly the case that I'm worried about. It will be a pain to have some configs that say 9G and some that say 17G. It would be easier to configure 1.1 and let the TT scale it to the appropriate amount of ram.

          Matei Zaharia added a comment -

          Maybe we can specify an amount of memory to leave free (e.g. 512 MB), rather than a percentage?

          Owen O'Malley added a comment -

          I guess that would work, but in general it works better if we have ratios instead since they automatically scale as hardware improves. It would also need to support negative values, if you want to allocate more than ram (obviously into swap or assuming that the tasks won't actually use it). I suspect that in most cases, it would be negative, which seems less user-friendly.

          Vivek Ratan added a comment -

          In HADOOP-4523, in my first comment, I've suggested the following, based on feedback from some of our Ops folks:

          Another improvement is to let mapred.tasktracker.tasks.maxmemory be set by an external script, which lets Ops control what this value should be. A slightly less desirable option, as indicated in some offline discussions with Allen W, is to set this value to be an absolute number ("hadoop may use X amount") or an offset of the total amount of memory on the machine ("hadoop may use all but 4g").

          Note that this value is suggested to be linked to total VM on the machine, not RAM.

          Allen Wittenauer added a comment -

          > I guess that would work, but in general it works better if we have ratios instead since they automatically
          > scale as hardware improves.

          I disagree. Matei is right on.

          This value needs to be an offset of the total amount of memory on the machine ("hadoop may use all but 4g"). Percentages don't really work well here because any ops team worth its salt knows exactly how much they need to reserve for its own stuff, the OS, monitoring probes, etc., all the background processes that sort of run in the background as noise. That size is almost guaranteed to be a constant on similar gear with the same OS. [.. and if one is doing a radically heterogeneous cluster, they've got other problems besides this one!]

          Setting this to a percentage is actually going to leave memory on the table. In our real-world grids, every rack has a node with 16g phys ram in it with the rest of the nodes being 8g phys ram. All nodes have 24g of swap. So our numbers are 32g and 40g. Setting this to 87% (I think?--the fact that I'm not sure I have this value right should be another hint!) to reserve 4g of VM means that we lose 2g of mem that could be available to Hadoop on the 16g RAM nodes!
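
The offset-versus-percentage comparison can be checked with a little arithmetic. The sketch below uses the node sizes given above (8g or 16g of RAM plus 24g of swap, so 32g and 40g in total); the class and method names are hypothetical, and the fraction passed in is an assumption. With a fraction of 0.875, which reserves exactly 4g on a 32g node, a 40g node holds back 5g, so it gives up 1g more than a fixed 4g offset would. The exact gap depends on the percentage chosen.

```java
final class ReserveComparisonSketch {
    // Memory left for Hadoop when a fixed amount is held back.
    static double usableWithOffset(double totalGb, double reservedGb) {
        return totalGb - reservedGb;
    }

    // Memory left for Hadoop when a fraction of the total is handed over.
    static double usableWithFraction(double totalGb, double fraction) {
        return totalGb * fraction;
    }

    // How much more the fraction-based setting holds back than the fixed offset.
    static double extraLoss(double totalGb, double reservedGb, double fraction) {
        return usableWithOffset(totalGb, reservedGb) - usableWithFraction(totalGb, fraction);
    }
}
```

Calling extraLoss(32.0, 4.0, 0.875) and extraLoss(40.0, 4.0, 0.875) shows the two settings agreeing on the smaller nodes and diverging on the larger ones.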

          dhruba borthakur added a comment -

          > dhruba borthakur - 27/Oct/08 10:02 AM Hi Owen, If the cluster has machines with different memory sizes (e.g. some TTs have 8GB
          > memory while other TTs have 16GB), then specifying a memory percentage might not work well.

          I too agree with Allen. If machines have different memory sizes, then specifying a percentage of memory might not work very well.

          Owen O'Malley added a comment -

          I guess I'm ok with it as a delta from total virtual memory, although how to detect the virtual memory in a generic manner is an interesting question. Maybe as I proposed over in HADOOP-4523, we need a plugin that could provide OS-specific/site functionality.

          Note that if we are using virtual memory, then we absolutely need a different configuration for the amount of virtual memory that we'd like to schedule to. We do not want the scheduler to put 4 10G tasks on a machine with 8G ram and 32G swap. That number should be based on RAM. So, I'd propose that we extend the plugin interface as:

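          import org.apache.hadoop.conf.Configuration;

          public abstract class MemoryPlugin {
            // Total virtual memory on the node, before any reserved offset is subtracted.
            public abstract long getVirtualMemorySize(Configuration conf);
            // Total physical RAM on the node, before any reserved offset is subtracted.
            public abstract long getRamSize(Configuration conf);
          }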
          

          I'd propose that these values be the real values and that we have a configured offset for both values.

          mapred.tasktracker.virtualmemory.reserved (subtracted off of virtual memory)
          mapred.tasktracker.memory.reserved (subtracted off of physical ram, before reporting to JT)

          Jobs should then define a soft and hard limit for their memory usage. If a task goes over the hard limit, it should be killed immediately.

          The scheduler should only allocate tasks if
          sum(soft limits of tasks) <= TT ram
          sum(hard limits of tasks) <= TT virtual memory

          Thoughts?
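
The two allocation conditions above can be sketched as a single admission check. This is a rough illustration under stated assumptions: the class and method names are hypothetical, "TT ram" and "TT virtual memory" are taken to mean the real values minus the two reserved offsets, and the candidate task is counted in both sums.

```java
import java.util.List;

final class SoftHardAdmissionSketch {
    // Per-task limits as declared by the job (hypothetical holder type).
    static final class TaskLimits {
        final long softLimit;
        final long hardLimit;

        TaskLimits(long softLimit, long hardLimit) {
            this.softLimit = softLimit;
            this.hardLimit = hardLimit;
        }
    }

    // Real size reported by the plugin minus the configured reserved offset.
    static long schedulable(long realSize, long reserved) {
        return realSize - reserved;
    }

    // Allocate only if sum(soft limits) <= TT ram and sum(hard limits) <= TT virtual memory,
    // with the candidate task included in both sums.
    static boolean canAllocate(List<TaskLimits> running, TaskLimits candidate,
                               long ttRam, long ttVirtualMemory) {
        long softSum = candidate.softLimit;
        long hardSum = candidate.hardLimit;
        for (TaskLimits t : running) {
            softSum += t.softLimit;
            hardSum += t.hardLimit;
        }
        return softSum <= ttRam && hardSum <= ttVirtualMemory;
    }
}
```

Take the machine mentioned above, with 8G of RAM and 32G of swap (40G of virtual memory), and assume no reserved offsets. Four tasks with a 10G hard limit pass the virtual-memory test, but if each declares a 4G soft limit, the third one already fails the RAM test, which is the behaviour the second condition is meant to produce.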

          Vivek Ratan added a comment -

          Since the issue of dealing with memory-intensive and badly behaved jobs has spanned more than one Jira, here's the latest summary on the overall proposal (following some offline discussions).

          The problem, as stated originally in HADOOP-3581, is that certain badly-behaved jobs end up using too much memory on a node and can bring down that node. We need to prevent this. A related requirement, as described in HADOOP-3759, is that the system respects different, and legitimate, memory requirements of different jobs.

          There are two independent parts to solving this problem: monitoring and scheduling. Let's look at monitoring first.

          Monitoring
          ----------------

          We want to ensure that the sum total of virtual memory (VM) usage by all tasks does not go over a limit (call this the max-VM-per-node limit). That's really what brings down a machine. To detect badly behaved jobs, we want to associate a limit with each task (call this the max-VM-per-task limit) such that a task is considered badly behaved if its VM usage goes over this limit. Think of the max-VM-per-task limit as a kill limit. A TT monitors each task for its memory usage (this includes the memory used by the task's descendants). If a task's memory usage goes over its max-VM-per-task limit, that task is killed. This monitoring has been implemented in HADOOP-3581. In addition, a TT monitors the total memory usage of all tasks spawned by the TT. If this value goes over the max-VM-per-node limit, the TT needs to kill one or more tasks. As a simple solution, the TT can kill one or more tasks that started most recently. This approach has been suggested in HADOOP-4523. Tasks that are killed because they went over their memory limit should be treated as failed, since they violated their contract. Tasks that are killed because the sum total of memory usage was over a limit should be treated as killed, since it's not really their fault.
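
The monitoring rules in this paragraph can be sketched as one pass over the running tasks. This is a minimal sketch, not the HADOOP-3581 or HADOOP-4523 implementation: all type and method names are hypothetical, usage figures are assumed to already include each task's descendants, and tasks that fail their own limit are assumed not to count towards the node total afterwards.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MemoryMonitorSketch {
    enum Outcome { FAILED, KILLED }

    static final class TaskUsage {
        final String id;
        final long startTime;
        final long vmUsed;
        final long maxVmPerTask;

        TaskUsage(String id, long startTime, long vmUsed, long maxVmPerTask) {
            this.id = id;
            this.startTime = startTime;
            this.vmUsed = vmUsed;
            this.maxVmPerTask = maxVmPerTask;
        }
    }

    static final class Decision {
        final String taskId;
        final Outcome outcome;

        Decision(String taskId, Outcome outcome) {
            this.taskId = taskId;
            this.outcome = outcome;
        }
    }

    static List<Decision> check(List<TaskUsage> tasks, long maxVmPerNode) {
        List<Decision> decisions = new ArrayList<>();
        List<TaskUsage> survivors = new ArrayList<>();
        long total = 0L;

        // Step 1: a task over its own limit violated its contract, so it is FAILED.
        for (TaskUsage t : tasks) {
            if (t.vmUsed > t.maxVmPerTask) {
                decisions.add(new Decision(t.id, Outcome.FAILED));
            } else {
                survivors.add(t);
                total += t.vmUsed;
            }
        }

        // Step 2: if the node is still over its limit, kill the most recently
        // started tasks first; these are KILLED, since it is not really their fault.
        survivors.sort(Comparator.comparingLong((TaskUsage t) -> t.startTime).reversed());
        for (TaskUsage t : survivors) {
            if (total <= maxVmPerNode) {
                break;
            }
            decisions.add(new Decision(t.id, Outcome.KILLED));
            total -= t.vmUsed;
        }
        return decisions;
    }
}
```

Keeping the two outcomes separate matters because a FAILED task counts against the job, while a KILLED one does not.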

          How do we specify these limits?

          • for max-VM-per-node: HADOOP-3581 provides a config option, mapred.tasktracker.tasks.maxmemory , which acts as the max-VM-per-node limit. As per discussions in this Jira, and in HADOOP-4523, this needs to be enhanced. mapred.tasktracker.tasks.maxmemory should be replaced by mapred.tasktracker.virtualmemory.reserved, which indicates an offset (in MB?). max-VM-per-node is then the total VM on the machine, minus this offset. How do we get the total VM on the machine? This can be done by the plugin interface that Owen proposed earlier.
          • for max-VM-per-task: HADOOP-3759 and HADOOP-4439 define a cluster-wide configuration, mapred.task.default.maxmemory, that describes the default maximum VM associated per task. Rename it to mapred.task.default.maxvm for consistency. This is the default max-VM-per-task limit associated with a task. To support jobs that need higher or lower limits, this value can be overridden by individual jobs. A job can set a config value, mapred.task.maxvm, which overrides mapred.task.default.maxvm for all tasks for that job.
          • Furthermore, as described earlier in this Jira, we want to prevent users from setting mapred.task.maxvm to an arbitrarily high number and thus gaming the system. To do this, there should be a cluster-wide setting, mapred.task.limit.maxvm, that limits the value of mapred.task.maxvm. If mapred.task.maxvm is set to a value higher than mapred.task.limit.maxvm, the job should not run. Either this check can be done in the JT when a job is submitted, or a scheduler can fail the job if it detects this situation.

          Note that the monitoring process can be disabled if mapred.tasktracker.virtualmemory.reserved is not present, or has some default negative value.
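
How these settings combine can be sketched as follows. This is only an illustration: the property names are the ones proposed above, but the class, its methods, and the use of a plain Map in place of the Hadoop configuration are assumptions, as is treating all values as MB.

```java
import java.util.Map;

final class VmLimitsSketch {
    static final String RESERVED = "mapred.tasktracker.virtualmemory.reserved";
    static final String DEFAULT_MAXVM = "mapred.task.default.maxvm";
    static final String JOB_MAXVM = "mapred.task.maxvm";
    static final String LIMIT_MAXVM = "mapred.task.limit.maxvm";

    // Monitoring is off when the reserved offset is absent or negative.
    static boolean monitoringEnabled(Map<String, Long> clusterConf) {
        Long reserved = clusterConf.get(RESERVED);
        return reserved != null && reserved >= 0;
    }

    // max-VM-per-node = total VM on the machine minus the reserved offset.
    static long maxVmPerNode(Map<String, Long> clusterConf, long totalVmMb) {
        if (!monitoringEnabled(clusterConf)) {
            throw new IllegalStateException("monitoring is disabled");
        }
        return totalVmMb - clusterConf.get(RESERVED);
    }

    // The job-level value, when present, overrides the cluster-wide default.
    static long maxVmPerTask(Map<String, Long> clusterConf, Map<String, Long> jobConf) {
        Long jobValue = jobConf.get(JOB_MAXVM);
        if (jobValue != null) {
            return jobValue;
        }
        Long defaultValue = clusterConf.get(DEFAULT_MAXVM);
        if (defaultValue == null) {
            throw new IllegalStateException(DEFAULT_MAXVM + " is not set");
        }
        return defaultValue;
    }

    // A job asking for more than the cluster-wide cap should not run.
    // With no cap configured, nothing is rejected.
    static boolean jobAllowed(Map<String, Long> clusterConf, Map<String, Long> jobConf) {
        Long limit = clusterConf.get(LIMIT_MAXVM);
        Long jobValue = jobConf.get(JOB_MAXVM);
        return limit == null || jobValue == null || jobValue <= limit;
    }
}
```

Whether jobAllowed is called by the JT at submission or by a scheduler is left open here, as it is in the text.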

          Scheduling
          -----------------

          In order to prevent tasks using too much memory, a scheduler can ensure that it limits the number of tasks running on a node based on how much free memory is available and how much a task needs. The Capacity Scheduler will do this, though we cannot enforce all schedulers to support this feature. As per HADOOP-3759, TTs report, in each heartbeat, how much free VM they have (which is equal to max-VM-per-node minus the sum of max-VM-per-task for each running task). The Capacity Scheduler needs to ensure that:

          1. there is enough VM for a new task to run. This it does by comparing the task's requirement (its max-VM-per-task limit) to the free VM available in the TT.
          2. there is enough RAM available for a task so that there is not a lot of page swapping and thrashing when tasks run. This is much harder to figure out and it's not even clear what it means to have 'enough RAM available' for a task. A simple proposal, to get us started, is to assume a fraction of the max-VM-per-task limit as the 'RAM limit' for a task. Call this the max-RAM-per-task limit, and think of it as a scheduling limit. For a task to be scheduled, its max-RAM-per-task limit should be less than the total RAM on a TT minus the sum of max-RAM-per-task limits of tasks running on the TT. This also implies that a TT should report its free RAM (the total RAM on the node minus the sum of the max-RAM-per-task limits for each running task).

          Just as with the handling of VM, we may want to use a part of the RAM for scheduling TT tasks, and not all of it. If so, we can introduce a config value, mapred.tasktracker.ram.reserved, which indicates an offset (in MB?). The amount of RAM available to the TT tasks is then the total RAM on the machine, minus this offset. How do we get the total RAM on the machine? By the same plugin interface through which we obtain total VM.

          How do we specify a task's max-RAM-per-task limit? There is a system-wide default value, mapred.capacity-scheduler.default.ramlimit, expressed as a percentage. A task's default max-RAM-per-task limit is equal to the task's max-VM-per-task limit times this value. We may start by setting mapred.capacity-scheduler.default.ramlimit to 50% or 33%. In order to let individual jobs override this default, a job can set a config value, mapred.task.maxram, expressed in MB, which then becomes the task's max-RAM-per-task limit. Furthermore, as with VM settings, we want to prevent users from setting mapred.task.maxram to an arbitrarily high number and thus gaming the system. To do this, there should be a cluster-wide setting, mapred.task.limit.maxram, that limits the value of mapred.task.maxram. If mapred.task.maxram is set to a value higher than mapred.task.limit.maxram, the job should not run. Either this check can be done in the JT when a job is submitted, or a scheduler can fail the job if it detects this situation.

          The Capacity Scheduler, when it picks a task to run, will check if both the task's RAM limit and VM limit can be satisfied. If so, the task is given to the TT. If not, nothing is given to the TT (i.e., the cluster blocks till at least one TT has enough memory). We will not block forever because we limit what the task can ask for, and these limits should be set lower than the RAM and VM on each TT. In order to tax users on their job's requirements, we may charge them for the value they set per task, but for now, there is no penalty associated with the value set for mapred.task.maxmemory by a user for a job.
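
The scheduling check described in this section can be sketched as below. It is a rough illustration under stated assumptions: the class and method names are hypothetical, a negative job value stands for "mapred.task.maxram not set", all figures are taken to be MB, and an exact fit is treated as acceptable.

```java
final class RamVmSchedulingSketch {
    // Default RAM limit = the task's VM limit times the configured percentage
    // (mapred.capacity-scheduler.default.ramlimit), unless the job overrides it.
    static long maxRamPerTask(long maxVmPerTaskMb, int defaultRamLimitPercent,
                              long jobMaxRamMb) {
        if (jobMaxRamMb >= 0) {
            return jobMaxRamMb;
        }
        return maxVmPerTaskMb * defaultRamLimitPercent / 100;
    }

    // A task goes to the TT only if both its VM limit and its RAM limit fit
    // into what the TT reports as free; otherwise nothing is handed out.
    static boolean canSchedule(long taskVmMb, long taskRamMb,
                               long freeVmMb, long freeRamMb) {
        return taskVmMb <= freeVmMb && taskRamMb <= freeRamMb;
    }
}
```

For example, a task with a 2048 MB VM limit and the default percentage set to 50 gets a 1024 MB RAM limit, so it can run on a TT reporting 4096 MB of free VM only if that TT also reports at least 1024 MB of free RAM.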

          Open issues
          -------------------

          Based on the writeup above, I'm summarizing a few of the open issues (mostly minor):

          1. Should the memory-related config values be expressed in MB or GB or KB or just bytes? MB sounds good to me.
          2. If a job's specified VM or RAM task limit is higher than the max limit, that job shouldn't be allowed to run. Should the JT reject the job when it is submitted, or should the scheduler do it, by failing the job? The argument for the former is that these limits apply to all schedulers, but then again, they are scheduling-based limits, so maybe they should be done in each of the schedulers. In the latter case, if a scheduler does not support scheduling based on memory limits, it can just ignore these settings and run the job. So the latter option seems better.
          3. Should the Capacity Scheduler use the entire RAM of a TT when making a scheduling decision, or an offset? Given that the RAM fractions are not very precise (they're based on fractions of the VM), an offset doesn't make much of a difference (you could tweak mapred.capacity-scheduler.default.ramlimit to achieve what the offset would), and adds an extra config value. At the same time, part of the RAM is blocked for non-Hadoop stuff, and an offset does make things symmetrical.
In order to let individual jobs override this default, a job can set a config value, mapred.task.maxram , expressed in MB, which then becomes the task's max-RAM-per-task limit. Furthermore, as with VM settings, we want to prevent users from setting mapred.task.maxram to an arbitrarily high number and thus gaming the system. To do this, there should be a cluster-wide setting, mapred.task.limit.maxram , that limits the value of mapred.task.maxram . If mapred.task.maxram is set to a value higher than mapred.task.limit.maxram , the job should not run. Either this check can be done in the JT when a job is submitted, or a scheduler can fail the job if it detects this situation. The Capacity Scheduler, when it picks a task to run, will check if both the task's RAM limit and VM limit can be satisfied. If so, the task is given to the TT. If not, nothing is given to the TT (i.e., the cluster blocks till at least one TT has enough memory). We will not block forever because we limit what the task can ask for, and these limits should be set lower than the RAM and VM on each TT. In order to tax users on their job's requirements, we may charge them for what the value they set per task, but for now, there is no penalty associated with the value set for mapred.task.maxmemory by a user for a job. Open issues ------------------- Based on the writeup above, I'm summarizing a few of the open issues (mostly minor): Should the memory-related config values be expressed in MB or GB or KB or just bytes? MB sounds good to me. If a job's specified VM or RAM task limit is higher than the max limit, that job shouldn't be allowed to run. Should the JT reject the job when it is submitted, or should the scheduler do it, by failing the job? The argument for the former is that these limits apply to all schedulers, but then again, they are scheduling-based limits, so they maybe they should be done in each of the schedulers. 
In the latter case, if a scheduler does not support scheduling based on memory limits, it can just ignore these settings and run the job. So the latter option seems better. Should the Capacity Scheduler use the entire RAM of a TT when making a scheduling decision, or an offset? Given that the RAM fractions are not very precise (they're based on fractions of the VM), an offset doesn't make much of a difference (you could tweak mapred.capacity-scheduler.default.ramlimit to achieve what the offset would), and adds an extra config value. At the same time, part of the RAM is blocked for non-Hadoop stuff, and an offset does make things symmetrical.
          Hide
          Devaraj Das added a comment -

          On the scheduling, wouldn't it be nice if all schedulers could use this feature? One option there is to implement the policy in all the schedulers. But given this issue is targeted for 19.1, the other option is to do the check within the JobInProgress just like we do the check for blacklisted TTs (where we don't give a task to a blacklisted TT). Specifically, couldn't the JobInProgress.shouldRunOnTaskTracker method do this check and not assign the TT a task, taking into account the memory parameters (all the information related to memory parameters is available at this point via TaskTrackerStatus and the jobconf of the job)?
          One more line of argument could be that we are actually just doing greedy scheduling w.r.t. the memory-related parameters. So this base-level greedy scheduling should be in a place that is in the code path of all schedulers, i.e., in JobInProgress.shouldRunOnTaskTracker. If some scheduler tries to do something better than that, it always can do so, since control is given to the scheduler code first (assignTasks).
          Thoughts?

          Matei Zaharia added a comment -

          +1 on putting it in JobInProgress.shouldRunOnTaskTracker, this sounds like the right place for it if there aren't any technical obstacles.

          Vivek Ratan added a comment -

          While I agree that you want to make it easier for various schedulers to share common functionality, keep in mind that different schedulers may choose to behave differently if a TT does not have enough memory for a task. The Capacity Scheduler, for example, chooses to block, i.e., it prefers returning nothing to the TT, so that it doesn't starve a job with high-mem requirements. However, another Scheduler might choose to look at the next job, or find some other task to give the TT.

          For this reason, you don't want to put the memory checks in shouldRunOnTaskTracker(). Schedulers will behave differently if a TT is blacklisted, versus if the TT doesn't have enough free memory.

          Now, you could argue that we can add a new method to JobInProgress, something like isMemoryAvailable(), that decides if the TT has enough free memory. You could call it from obtainNewMapTask() or obtainNewReduceTask(), but again, you'd have to modify what these methods return to the schedulers. If obtainNewMapTask, for example, returns no task, the Scheduler needs to know why. It can behave differently if there was no task to run, or if the TT was blacklisted, or if the TT didn't have any free memory. This will make things messy.

          I think there is a lot of scheduling code in the JobInProgress object that needs to be moved into the schedulers. IMO, JobInProgress, and other objects, should expose their data structures, and maybe simpler methods that decide whether a TT is blacklisted or if a TT has enough free memory (methods that return the same response irrespective of schedulers). The various Schedulers should compose these methods as they see fit. One may check for user quotas before it checks for memory fit (maybe the former is a faster check) while another may do something else. This does imply a fair bit of refactoring, and could be a longer term effort. This will also help share common code across Schedulers.

          Short term, I think it's better that each scheduler decide if it wants to support memory checks when scheduling, what it does if the TT does not have enough free mem, and implement that individually. To ease this, maybe you can put the logic of deciding whether a TT has enough free memory for a task in a separate method in JobInProgress, but call that from each Scheduler, not from another JobInProgress method.
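
          The idea of a scheduler-independent memory check that each scheduler calls and then interprets on its own could be sketched roughly as below. This is only an illustration: the class MemoryFit, its Result enum, the -1 "unknown" sentinel, and the use of "less than or equal" for the comparison are all assumptions, not existing Hadoop code.

```java
/** Hypothetical helper for illustration; not an existing Hadoop class. */
final class MemoryFit {
    /** Assumed sentinel meaning "value not configured or not reported". */
    static final long UNKNOWN = -1L;

    /** The caller (a scheduler) decides what to do with each outcome. */
    enum Result { FITS, INSUFFICIENT_MEMORY, NOT_APPLICABLE }

    /**
     * Returns the same answer regardless of which scheduler calls it.
     * freeVmOnTracker: free VM reported by the TT; taskMaxVm: the task's max-VM-per-task limit.
     */
    static Result check(long freeVmOnTracker, long taskMaxVm) {
        if (freeVmOnTracker == UNKNOWN || taskMaxVm == UNKNOWN) {
            // Memory information is missing, so the check cannot be made.
            return Result.NOT_APPLICABLE;
        }
        return taskMaxVm <= freeVmOnTracker ? Result.FITS : Result.INSUFFICIENT_MEMORY;
    }
}
```

          A blocking scheduler might return nothing to the TT on INSUFFICIENT_MEMORY, while another scheduler might move on to the next job; the helper itself stays policy-free.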

          Matei Zaharia added a comment -

          That makes sense, Vivek. I'd be very glad to see more scheduling moved out of JobInProgress (and hopefully into some utility class that different people can use). Right now it's hard to implement some scheduling strategies, such as changing the speculative execution policy or the locality policy, without rewriting a lot of the code in JobInProgress. Also, as more scheduling functionality gets added, JobInProgress just grows larger. It would be nice if it were more of a data object than logic + data.

          Hemanth Yamijala added a comment -

          > Should the memory-related config values be expressed in MB or GB or KB or just bytes? MB sounds good to me.

          The other parameter we have in hadoop related to memory is mapred.child.ulimit which is specified in KB. I think expressing these values in KB would keep things consistent.

          > If a job's specified VM or RAM task limit is higher than the max limit, that job shouldn't be allowed to run. Should the JT reject the job when it is submitted, or should the scheduler do it, by failing the job?

          I think scheduler failing the job is more consistent if the scheduling decisions are being made in the scheduler.

          > Should the Capacity Scheduler use the entire RAM of a TT when making a scheduling decision, or an offset?

          I am not really sure either way. Given earlier discussions we've had that virtual memory is what really matters, I am guessing we don't need it.

          Regarding the config variable names, a few concerns/suggestions:

          • mapred.tasktracker.virtualmemory.reserved: This seems like specifying the amount of memory reserved for Hadoop, whereas it means the opposite. Can we call it mapred.tasktracker.vmem.excluded?
          • We are using both 'virtualmemory' and 'vm' to represent virtual memory. Should we consistently name it 'vmem' everywhere?
          • Similar to excluded, rename variables to mapred.task.maxvmem.default and mapred.task.maxvmem.limit?

          Does this make sense?

          Vinod Kumar Vavilapalli added a comment -

          > Should the Capacity Scheduler use the entire RAM of a TT when making a scheduling decision, or an offset?
          > I am not really sure either way. Given earlier discussions we've had that virtual memory is what really matters, I am guessing we don't need it.

          Having the offset value also for RAM will make configuration as well as scheduling logic symmetric w.r.t both vmem and ram, IMO.

          > Similar to excluded, rename variables to mapred.task.maxvmem.default and mapred.task.maxvmem.limit ?

          Or as Owen/Sameer proposed, can we call them mapred.task.memory.hardlimit.default and mapred.task.memory.hardlimit? And substituting softlimit for RAM?

          Allen Wittenauer added a comment -

          > I think expressing these values in KB would keep things consistent.

          I think the system should be smart enough that if I specify "4g" or "4m" or "4k" it knows what I'm talking about. Are we really going to make ops people write massively long integers in 2008? It annoys me when apps aren't "smart" about values. [He says, having just had to set some memory related values in another application that ended up looking like 1073741824 . Nice and readable.... ]

          steve_l added a comment -

          >[He says, having just had to set some memory related values in another application that ended up looking like 1073741824 . Nice and readable.... ]

          There's a lot to be said for configuration languages with basic math operators, though then you end up worrying about order of precedence and so end up bracketing everything just to be sure:

          dfs.namenode.decommission.interval (5 * 60);

          [He says, editing a .spec file where the ${} ant properties get expanded during build time but the RPM may be rebuilt on a separate machine with the %{} properties expanded either at rpmbuild time or on the target machine, and which trial and error seems the only way to be sure what happens]

          Doug Cutting added a comment -

          Allen> I think the system should be smart enough that if I specify "4g" or "4m" or "4k" it knows what I'm talking about.

          Steve> There's a lot to be said for configuration languages with basic math operators [ ... ]

          These sound like great enhancements to the Configuration API. The getInt() and getLong() methods could implement these. But that should probably be in a separate issue. And, if it's to be implemented, the parameter in this issue should then be raw bytes, no?
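
          For illustration, the suffix handling suggested above ("4g", "4m", "4k") could look roughly like the sketch below. SizeParser and parseBytes are hypothetical names, not part of the Configuration API, and the choice of binary multipliers (1k = 1024 bytes) is an assumption.

```java
import java.util.Locale;

/** Hypothetical helper for illustration; not part of Hadoop's Configuration class. */
final class SizeParser {
    /**
     * Parses a size such as "4g", "4m", "4k" or a plain number into raw bytes.
     * Assumes binary multipliers: k = 2^10, m = 2^20, g = 2^30.
     */
    static long parseBytes(String value) {
        String s = value.trim().toLowerCase(Locale.ROOT);
        if (s.isEmpty()) {
            throw new IllegalArgumentException("empty size value");
        }
        long multiplier = 1L;
        switch (s.charAt(s.length() - 1)) {
            case 'k': multiplier = 1L << 10; break;
            case 'm': multiplier = 1L << 20; break;
            case 'g': multiplier = 1L << 30; break;
            default: break; // no suffix: the value is already in bytes
        }
        String digits = (multiplier == 1L) ? s : s.substring(0, s.length() - 1).trim();
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(Long.parseLong(digits), multiplier);
    }
}
```

          With a parser like this, the stored parameter can stay in raw bytes while operators write "1g" rather than 1073741824.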

          Vinod Kumar Vavilapalli added a comment -

          Some details about configuration:

          • pmem denotes the limits of job and TT w.r.t physical memory
          • vmem denotes the limits of job and TT w.r.t virtual memory

          Job Configuration:

          Already addressed cases:

          • If pmem and vmem are both specified, or both not specified (disabled/job doesn't care), we use them as they are.
          • If pmem is not specified but vmem is specified, the above proposal is to adjust pmem to be a percentage (say P) of vmem.

          Cases not addressed:

          • The proposal doesn't address the edge case when pmem is specified but vmem is not specified. I propose that, in a similar vein, we adjust vmem to be (100/P) times pmem.
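
          The four job-level cases above can be written down as a small sketch. The class JobMemoryLimits, the UNSET sentinel of -1, and integer rounding are assumptions made for illustration only.

```java
/** Hypothetical illustration of the job-level vmem/pmem defaults; not Hadoop code. */
final class JobMemoryLimits {
    /** Assumed sentinel for "not specified by the job". */
    static final long UNSET = -1L;

    final long vmem;
    final long pmem;

    private JobMemoryLimits(long vmem, long pmem) {
        this.vmem = vmem;
        this.pmem = pmem;
    }

    /**
     * percentP is the cluster-wide percentage P relating pmem to vmem.
     * Both set or both unset: values are used as they are.
     */
    static JobMemoryLimits resolve(long vmem, long pmem, int percentP) {
        if (percentP <= 0 || percentP > 100) {
            throw new IllegalArgumentException("P must be in (0, 100]: " + percentP);
        }
        if (vmem != UNSET && pmem == UNSET) {
            pmem = vmem * percentP / 100;      // pmem = P% of vmem
        } else if (vmem == UNSET && pmem != UNSET) {
            vmem = pmem * 100 / percentP;      // vmem = (100/P) times pmem
        }
        return new JobMemoryLimits(vmem, pmem);
    }
}
```

          For example, with P = 50, a job that sets only vmem = 2048 gets pmem = 1024, and a job that sets only pmem = 1024 gets vmem = 2048.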

          TT Configuration:

          Already addressed cases:

          • If offsets for vmem and pmem are both specified,
            • TT takes care of overflowing tasks itself by doing virtual memory management,
            • Scheduler uses both vmem and pmem for scheduling and, using the latter, controls thrashing.
          • If offsets for both vmem and pmem are not specified,
            • TT doesn't care about overflowing tasks and disables virtual memory management,
            • Scheduler neglects scheduling based on vmem and pmem and cannot attempt avoiding thrashing/task overflow.

          Cases not addressed:

          • If the offset for vmem is specified but not for pmem. We have two alternative approaches here.
            • We already calculate the total pmem for reporting. So, take the pmem offset to be zero and use the total pmem available and (vmem - vmemoffset) for scheduling. vmemoffset will be zero by default. So, we just need another configuration value and a field in TaskTrackerStatus to specify whether vmem is disabled/enabled. Note that today we overload total vmem, and by extension vmemoffset, to also specify enabling/disabling of task memory management.
            • Only use vmem for scheduling and don't care about pmem, and thus make no attempt to avoid any possible thrashing.
          • If the offset for pmem is specified but not for vmem. We have two alternative approaches here also.
            • We already calculate the total vmem for reporting. So, take the vmem offset to be zero and use the total vmem available and (pmem - pmemoffset) for scheduling.
            • Only use pmem for scheduling to avoid any possible thrashing, but don't care about vmem, and thus make no attempt to avoid any possible overflowing of tasks.

          I prefer the first solution to the latter in both of the above cases, as those solutions make our best effort to prevent thrashing and task overflow. Thoughts?
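
          A rough sketch of the preferred (first) alternative, where a missing offset is treated as zero and scheduling is disabled only when both offsets are missing. TrackerSchedulableMemory and its fields are hypothetical names, and -1 as the "not specified" marker is an assumption.

```java
/** Hypothetical illustration of the TT-side offset handling; not Hadoop code. */
final class TrackerSchedulableMemory {
    /** Assumed sentinel for "offset not specified". */
    static final long UNSET = -1L;

    final boolean memoryAware; // false when neither offset is configured
    final long vmem;           // VM usable for scheduling
    final long pmem;           // RAM usable for scheduling

    private TrackerSchedulableMemory(boolean memoryAware, long vmem, long pmem) {
        this.memoryAware = memoryAware;
        this.vmem = vmem;
        this.pmem = pmem;
    }

    static TrackerSchedulableMemory of(long totalVmem, long totalPmem,
                                       long vmemOffset, long pmemOffset) {
        if (vmemOffset == UNSET && pmemOffset == UNSET) {
            // Neither offset given: no memory-based scheduling at all.
            return new TrackerSchedulableMemory(false, UNSET, UNSET);
        }
        // A single missing offset is taken to be zero (the "first alternative").
        long v = totalVmem - (vmemOffset == UNSET ? 0L : vmemOffset);
        long p = totalPmem - (pmemOffset == UNSET ? 0L : pmemOffset);
        return new TrackerSchedulableMemory(true, v, p);
    }
}
```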

          Vivek Ratan added a comment -

          As Vinod has brought up, there are some edge cases and details missing in the summary that we need to cover.

          We want monitoring to work independent of scheduler support, i.e., even if the scheduler you're using does not support memory-based scheduling, you may still want to make sure the TTs monitor memory usage on their machines and kill tasks if too much memory is used. Based on what we've described in the summary, the following three configuration settings are required for the TT to do monitoring: mapred.tasktracker.virtualmemory.reserved (the offset of total VM on the machine), mapred.task.default.maxvm (the default for maximum VM per task), and mapred.task.limit.maxvm (the upper limit on the max VM per task). It is proposed that:

          • If one or more of these three values is missing in the configuration, the TT disables monitoring and logs an appropriate message.
          • At startup, the TT should also make sure that mapred.task.default.maxvm is not greater than mapred.task.limit.maxvm. If it is, the TT logs a message and disables monitoring.
          • If all three are present, the TT has enough information to compute the max-VM-per-task limit for each task it runs and can successfully monitor memory usage.
          • Without scheduler support, the TT can get a task whose max-VM-per-task limit is higher than mapred.task.limit.maxvm (i.e., the user-set value for a job's mapred.task.maxvm can be higher than mapred.task.limit.maxvm). In such a case, the TT can choose to fail the task, or it may still run the task while logging the problem. IMO, the former seems too harsh and not something that the TT should possibly decide just based on its settings for monitoring. In the latter case, the TT can still continue monitoring, but may end up killing the wrong task if the sum of VMs used is over the max-VM-per-node limit. I propose we do the latter.
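
          The startup rules in this list might be sketched as follows. MonitoringConfig and its method names are hypothetical, and representing a missing setting as -1 is an assumption; the three parameters stand for mapred.tasktracker.virtualmemory.reserved, mapred.task.default.maxvm and mapred.task.limit.maxvm.

```java
/** Hypothetical illustration of the TT monitoring checks; not Hadoop code. */
final class MonitoringConfig {
    /** Assumed sentinel for "setting missing from the configuration". */
    static final long UNSET = -1L;

    /** Monitoring is on only if all three values exist and default <= limit. */
    static boolean monitoringEnabled(long reservedVm, long defaultMaxVm, long limitMaxVm) {
        if (reservedVm == UNSET || defaultMaxVm == UNSET || limitMaxVm == UNSET) {
            return false; // the TT would log a message and disable monitoring
        }
        return defaultMaxVm <= limitMaxVm;
    }

    /** A job's own mapred.task.maxvm, if set, overrides the cluster default. */
    static long maxVmPerTask(long jobMaxVm, long defaultMaxVm) {
        return jobMaxVm == UNSET ? defaultMaxVm : jobMaxVm;
    }

    /** Under the proposal, an over-limit task still runs but the problem is logged. */
    static boolean exceedsClusterLimit(long taskMaxVm, long limitMaxVm) {
        return taskMaxVm > limitMaxVm;
    }
}
```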

          The TT also needs to report memory information to the schedulers. As per HADOOP-3759, TTs currently report, in each heartbeat, how much free VM they have (which is equal to max-VM-per-node minus the sum of max-VM-per-task for each running task). This makes sense if monitoring is on, and the three necessary VM config values are defined. If they're not, and the TT cannot determine its free VM, what should it report?

          • It can report -1, or some such value, indicating that it cannot compute free VM.
          • If we let schedulers decide how they want to behave in the absence of monitoring, or rather in the absence of the necessary VM config values being defined, a TT should always report how much total VM (as well as RAM) it has, as well as its value for mapred.tasktracker.virtualmemory.reserved.

          I propose the latter. TTs always report how much VM&RAM they have on their system, and what offset settings they have. They're the only ones who have this information, and this approach gives a lot of flexibility to the schedulers in terms of how to use that information.
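
          As a worked example of the arithmetic a scheduler could do once the TT reports its total VM and its offset: the helper below is a hypothetical sketch (FreeVm is not a Hadoop class), with all values assumed to be in the same unit.

```java
/** Hypothetical illustration of the free-VM arithmetic; not Hadoop code. */
final class FreeVm {
    /** max-VM-per-node = total VM on the machine minus the reserved offset. */
    static long maxVmPerNode(long totalVm, long reservedVm) {
        return totalVm - reservedVm;
    }

    /** Free VM = max-VM-per-node minus the sum of max-VM-per-task of running tasks. */
    static long freeVm(long totalVm, long reservedVm, long[] runningTaskMaxVm) {
        long free = maxVmPerNode(totalVm, reservedVm);
        for (long taskMaxVm : runningTaskMaxVm) {
            free -= taskMaxVm;
        }
        return free;
    }
}
```

          For instance, a TT with 16384 MB of total VM, a 2048 MB offset, and two running tasks with limits of 4096 MB and 2048 MB has 16384 - 2048 - 4096 - 2048 = 8192 MB free.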

          What about schedulers? The Capacity Scheduler should do the following:

          • If any of the three mandatory VM settings are not set, it should not schedule based on VM or RAM. The value of mapred.tasktracker.virtualmemory.reserved comes from the TT while the other two can be read by the scheduler from its own config file.
          • If the mandatory VM values are set, as well as the mandatory RAM values (mapred.capacity-scheduler.default.ramlimit, mapred.task.limit.maxram), the scheduler uses both VM and RAM settings to schedule, as defined in the earlier summary.
          • If the mandatory VM values are set, but one or more of the mandatory RAM values are not, the scheduler only uses VM values for scheduling.

          It's possible that other schedulers may choose a different algorithm. What's important is that they have all the available information, which they should as per this proposal.
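
          The three rules for the Capacity Scheduler can be sketched as a single decision function. This is only an illustration under stated assumptions: the class, enum, and parameter names are hypothetical, and an unset configuration value is assumed to be represented as -1.

```java
/** Hypothetical sketch of the proposed decision; not the actual scheduler code. */
final class MemorySchedulingModeSketch {
    enum Mode { NONE, VM_ONLY, VM_AND_RAM }

    /** Assumed marker for a configuration value that is not set. */
    static final long UNSET = -1L;

    static Mode decide(long reservedVm, long defaultMaxVm, long limitMaxVm,
                       long defaultRamLimit, long limitMaxRam) {
        // Rule 1: any missing mandatory VM value disables memory-based scheduling.
        boolean vmSet = reservedVm != UNSET && defaultMaxVm != UNSET && limitMaxVm != UNSET;
        if (!vmSet) {
            return Mode.NONE;
        }
        // Rules 2 and 3: RAM is used only if both mandatory RAM values are set.
        boolean ramSet = defaultRamLimit != UNSET && limitMaxRam != UNSET;
        return ramSet ? Mode.VM_AND_RAM : Mode.VM_ONLY;
    }
}
```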

          Vinod Kumar Vavilapalli added a comment -

          Initial patch for overall code review. This needs only a bit of cleanup, but the unit tests need to be fixed; they don't even compile now. Testing on a real cluster is pending.

          Vinod Kumar Vavilapalli added a comment -

          Attaching a new patch.

          • Fixed the testcase TestCapacityScheduler and added tests to verify memory based scheduling.
          • Free memory computation is moved to the scheduler from the TT, so removed TestHighRAMJobs and included TestTTMemoryReporting that verifies that memory values are reported correctly by the TT.
          • While writing test-cases found and fixed a bug in JobQueuesManager.jobCompleted (+146).
              if (qi.runningJobs.remove(oldInfo) != null) {
                qi.waitingJobs.remove(oldInfo);
              }
              

          Should be

              if (qi.runningJobs.remove(oldInfo) == null) {
                qi.waitingJobs.remove(oldInfo);
              }
              

          This definitely needs to be fixed in previous versions too.

          • Added killJob(JobID) and getJob(JobID) methods to TaskTrackerManager. The scheduler needs access to these methods. This is an interface change, so I had to modify TestJobQueueTaskScheduler and TestFairScheduler too. Should we just change TaskTrackerManager to be an abstract class, maybe in another JIRA?
          • Minor documentation fixes and code-refactoring over the previous patch.
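
          To see why the null comparison matters in the jobCompleted fix above, here is a small stand-alone sketch. It assumes the two collections behave like java.util.Map, whose remove returns null when the key is absent; the class and the String keys and values are stand-ins, not the real JobQueuesManager types.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the queue bookkeeping; not the real JobQueuesManager. */
final class JobCompletedSketch {
    final Map<String, String> runningJobs = new HashMap<>();
    final Map<String, String> waitingJobs = new HashMap<>();

    void jobCompleted(String jobKey) {
        // Map.remove returns the previous value, or null if the key was not present.
        // Only when the job was NOT running do we need to look in the waiting list.
        if (runningJobs.remove(jobKey) == null) {
            waitingJobs.remove(jobKey);
        }
    }
}
```

          With the original != null check, a job that completed while still waiting would never be removed from waitingJobs.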
          Hemanth Yamijala added a comment -

          While writing test-cases found and fixed a bug in JobQueuesManager.jobCompleted (+146).

          Vinod, good catch. Thanks for finding this out. I have filed HADOOP-4731 to address this. I think this should be committed first.

          Hemanth Yamijala added a comment -

          I've started looking at this patch. Here are a few initial comments. I am still to look at scheduling and test cases:

          Configuration:

          • We can now introduce the config variables back into hadoop-default.xml
          • I think the variables should be in bytes. As Doug mentioned in comments above, we should move to supporting formats mentioning units like 'KB' in a separate JIRA. When we do that, it makes more sense to say that if no unit is specified, it is the lowest possible value which will be bytes. Hence treating it as bytes here will support backwards compatibility easily.
          • As I've mentioned above, I still recommend changing the term 'reserved' to 'excluded'. Also, I would recommend consistent names for the variables; e.g., we can use pmem and vmem everywhere to indicate physical and virtual memory.
          • In the javadoc for the JobConf variables we should have a note asking readers to refer to the documentation of the scheduler being used to see how it does memory based scheduling.

          Monitoring:

          • Can we change the TODO in TaskMemoryManagerThread to remove the "I'm not comfortable..." part. We should still explain the alternative that you've mentioned in the comment, though.
          • We should also do a sanity check that the reserved limit is < the total memory, and turn off monitoring if it's not.
          • MemoryCalculatorPlugin requires the ASF license header; likewise LinuxMemoryCalculatorPlugin.
          Hemanth Yamijala added a comment -

          Comments on the scheduling portion:

          • The config value for % of VMEM for RAM should be in capacity scheduler's conf - no ? I am OK leaving it in JobConf as well. But in comments above, it seemed like we were going to define this in the CapacityScheduler config.
          • killJobsWithInvalidRequirements is looking at every single job in the waiting list of jobs in a queue. This would be a very costly operation. I think instead, just before a scheduler considers a job for scheduling, it should check for this, maybe by looking at the count of running maps / reduces to decide if it's looking at it the first time.
          • In getTaskFromJob, we are using a switch on Type being Map/Reduce. We should instead use the pattern of defining an abstract method in TaskSchedulingMgr, and implementing it appropriately for Map/Reduce in the specific type of TaskSchedulingMgr. So, pendingTasks would be an API in TaskSchedulingMgr.
          • If scheduling based on memory is disabled, we are logging at INFO level a statement regarding this. This might get to be too verbose since most jobs aren't going to specify any memory requirements. Switch to a debug level.
          • Should we cache the value for the jobId -> value for running jobs. It will be low on memory and will be more performant maybe if we use a HashMap. Maybe we should consider this after benchmarking the capacity scheduler with and without this patch.
          • We should definitely cache the value of the JobInProgress across computation of reservedPmem and reservedVmem. Actually the lookup is done thrice - twice in getting reservedPmem.
          • Rename usedUpVmem to getVmemReservedForTasks(taskTracker), likewise for usedUpPmem.
          • isSchedulingBasedOnMemoryEnabled - for consistency, rename to isSchedulingBasedOnVmemEnabled
          • It seems a little complicated to have the decision to lookup a job in the queue, or another queue to be in TaskLookupResult. I think a simpler model would be for the TaskLookupResult to only state the reason why it did not find a task, and then let callers decide what to do next. This will make it simpler to change scheduling decisions if required later.
          • For better readability, I wonder whether all the memory-related scheduling should be moved to a separate class used by the CapacityTaskScheduler, maybe a class like MemoryMatcher. This can have a package-private method boolean matchesMemoryRequirements(JobInProgress jip, TaskTrackerStatus ttStatus). Essentially, move over everything starting from TTHasEnoughMemoryForJob to this class. In the future, if these algorithms seem reusable, this class can move out of the capacity scheduler into mapred itself.
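
          A minimal sketch of what such a MemoryMatcher might look like follows. It is an illustration only: JobMemory and TrackerMemory are hypothetical stand-ins for the memory-related fields of JobInProgress and TaskTrackerStatus, and the matching rule (a task fits if its limits do not exceed the tracker's free memory) is an assumption rather than the patch's actual algorithm.

```java
/** Hypothetical sketch of a MemoryMatcher-style class; not the committed code. */
final class MemoryMatcherSketch {

    /** Stand-in for the per-task memory limits carried by a job (in bytes). */
    static final class JobMemory {
        final long maxVmemPerTask;
        final long maxPmemPerTask;

        JobMemory(long maxVmemPerTask, long maxPmemPerTask) {
            this.maxVmemPerTask = maxVmemPerTask;
            this.maxPmemPerTask = maxPmemPerTask;
        }
    }

    /** Stand-in for the free memory the scheduler computed for a tracker (in bytes). */
    static final class TrackerMemory {
        final long freeVmem;
        final long freePmem;

        TrackerMemory(long freeVmem, long freePmem) {
            this.freeVmem = freeVmem;
            this.freePmem = freePmem;
        }
    }

    /** Assumed rule: the task fits only if both its vmem and pmem limits fit. */
    static boolean matchesMemoryRequirements(JobMemory job, TrackerMemory tracker) {
        return job.maxVmemPerTask <= tracker.freeVmem
            && job.maxPmemPerTask <= tracker.freePmem;
    }
}
```

          Keeping the check behind one method means the scheduler only asks a yes/no question, which is what would make it easy to move or reuse later.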
          Hemanth Yamijala added a comment -

          Test cases seem fine. Only one enhancement request is that in testClusterBlockingForLackOfMemory(), you can submit a normal job after submitting the high ram jobs, and then verify that no task assignment is done. This way we can be sure that even if there are jobs that fit in memory that can run, we don't schedule anything.

          Vinod Kumar Vavilapalli added a comment -

          Attaching a new patch.

          • killJobsWithInvalidRequirements is now done in scheduler.jobAdded() itself. It is O(1) now. Invalid jobs get rejected right away: jobAdded throws an IOException, and this exception gets propagated to the client with a message.
          • Documented the configuration properties in hadoop-default.xml and capacity-scheduler-conf.xml.
          • Leaving the config names as "reserved" instead of "excluded", as they truly specify what is reserved by the TT for its own and the system's usage.
          • Incorporated the rest of the comments.
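
          The admission check described in the first bullet could look roughly like the following. This is a sketch under assumptions: the class and method names are hypothetical, only the vmem limit is shown, and -1 is assumed to mean "not configured".

```java
import java.io.IOException;

/** Hypothetical sketch of rejecting an invalid job at submission time. */
final class JobAdmissionSketch {
    /** Assumed marker for a value that is not configured. */
    static final long UNSET = -1L;

    /**
     * Constant-time check done once when the job is added: reject the job if its
     * per-task vmem request exceeds the cluster-wide upper limit.
     */
    static void checkMemoryRequirements(String jobId, long jobMaxVmem, long limitMaxVmem)
            throws IOException {
        if (jobMaxVmem != UNSET && limitMaxVmem != UNSET && jobMaxVmem > limitMaxVmem) {
            throw new IOException("Job " + jobId + " requests " + jobMaxVmem
                + " bytes of vmem per task, above the cluster limit of " + limitMaxVmem);
        }
    }
}
```

          Because the exception is thrown from the add path, the message can travel back to the submitting client instead of the job being killed later.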

          ant test-patch gave all +1 except a -1 for findBugs. This is regarding unsynchronized access to TaskTrackerStatus object. This patch does the right thing, but there are other earlier unsynchronized accesses. The changes in this patch triggered an old findBugs warning.

          I've run core and contrib tests. They both built successfully.

          Things to be done later in other jiras

          • Caching the value for the jobId -> value for running jobs.
          • Information on UI stating the reason why a job is killed when it specifies invalid requirement.
          • TT reports total and reserved memory values. These values don't change in general, so if possible, they can be reported only once.
          • Resource information should be visible on TTs UI pages.
          • Configuration should support specifying values in KB, MB, GB etc.
          • In case of no scheduler support, the TaskTrackerMemoryManager thread should fail jobs that specify memory requirements higher than the cluster-wide upper limits. It currently just logs that this happened and otherwise ignores it (see the TODO in the TaskTrackerMemoryManager class).
          • Need forrest documentation of how capacity scheduler deals with memory based scheduling.
          Hemanth Yamijala added a comment -

          Looking good. Few comments:

          • ProcfsBasedProcessTree.getCumulativeVmem is still dividing the values by 1024, and hence treating the values as kB.
          • Job recovery which is calling jobAdded will fail if any job has an invalid memory specification due to the new IOException being thrown. Though this is an unlikely case, in general, I think job recovery shouldn't fail due to one faulty job.
          • InterTrackerProtocol's version should change
          • Some log statements at info level would be too verbose. Recommend moving them to debug level. Including:
            • initializing jobid statement in JobInProgress
            • info statement in matchesMemoryRequirements
          • The memory related APIs in JobInProgress need not be public, no ?
          • For the MemoryPlugin, I think we should use the ReflectionUtils mechanism. By default, the config value for this could be null, and this would mean that we would use the operating system specific class. If it is not null, then an object of the appropriate class would be used. Then the class that you are using for testing purposes can move to the test code and need not be in the production code.
          • Properties in the capacity scheduler conf should be defined in capacity-scheduler.xml.template and should be read from the capacity scheduler conf object and not from JT's configuration object.
          • I would suggest a few changes to the names in TaskLookupStatus, such as TASK_FOUND, NO_TASK_IN_JOB, NO_TASK_IN_QUEUE, NO_TASK_MATCHING_MEMORY_REQUIREMENTS
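
          To illustrate how the suggested status names let callers, rather than the lookup result, decide what to do next, here is a small sketch. The four status names come from the comment above; the NextStep enum and the mapping are assumptions, in particular that a memory mismatch stops assignment on that tracker.

```java
/** Hypothetical sketch; only the TaskLookupStatus constant names come from the review. */
final class TaskLookupSketch {
    enum TaskLookupStatus {
        TASK_FOUND,
        NO_TASK_IN_JOB,
        NO_TASK_IN_QUEUE,
        NO_TASK_MATCHING_MEMORY_REQUIREMENTS
    }

    /** Assumed caller-side actions; not part of the patch. */
    enum NextStep { ASSIGN_TASK, TRY_NEXT_JOB, TRY_NEXT_QUEUE, STOP }

    /** The lookup result only states what happened; the caller chooses the policy. */
    static NextStep decide(TaskLookupStatus status) {
        switch (status) {
            case TASK_FOUND:
                return NextStep.ASSIGN_TASK;
            case NO_TASK_IN_JOB:
                return NextStep.TRY_NEXT_JOB;
            case NO_TASK_IN_QUEUE:
                return NextStep.TRY_NEXT_QUEUE;
            case NO_TASK_MATCHING_MEMORY_REQUIREMENTS:
                // Assumption: block the tracker rather than skip to a smaller job.
                return NextStep.STOP;
            default:
                throw new IllegalArgumentException("Unknown status: " + status);
        }
    }
}
```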
          Vinod Kumar Vavilapalli added a comment -

          Attaching new patch.

          I had done changes to capacity-scheduler.xml.template earlier itself, somehow that got missed in the patch, adding it now.

          The testcase TestCapacityScheduler.testHighMemoryJobWithInvalidRequirements was failing intermittently because of timing issues with JobInitializationPoller. Fixed that now by using the ControlledJobIntializationPoller.

          Incorporated the rest of the review comments too.

          Hemanth Yamijala added a comment -

          New patch which updates documentation a bit. Also moved DummyMemoryCalculatorPlugin to test code. Otherwise patch is looking good. +1

          I am running some sanity tests to make sure things are fine.

          Hemanth Yamijala added a comment -

          There was a mistake in the patch related to the configuration in hadoop-default.xml. Cancelling. Will upload a new patch after fixing that.

          Hemanth Yamijala added a comment -

          This patch fixes the following:

          • Fixes the problem mentioned with hadoop-default.xml
          • Fixes a findbugs warning that was coming with test-patch.

          test-patch output with this patch is:
          [exec] +1 overall.

          [exec] +1 @author. The patch does not contain any @author tags.

          [exec] +1 tests included. The patch appears to include 24 new or modified tests.

          [exec] +1 javadoc. The javadoc tool did not generate any warning messages.

          [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.

          [exec] +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

          Running core and contrib tests. I will commit this patch if they pass.

          Hemanth Yamijala added a comment -

          All core and contrib tests passed on my machine.

          So, I committed this. Thanks, Vinod !

          Hudson added a comment -

          Integrated in Hadoop-trunk #677 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/677/). Support memory based scheduling in capacity scheduler. Contributed by Vinod Kumar Vavilapalli.

          Vinod Kumar Vavilapalli added a comment -

          The list of tests that I ran on a real cluster:

          • Disable TT memory management and memory based scheduling. Run a sleep job. Job should run successfully without any tasks failing (irrespective of its virtual memory usage).
          • Disable TT memory management and memory based scheduling. Run a sleep job after configuring its mapred.task.maxvmem and mapred.task.maxpmem to large values. Job should run successfully without any tasks failing.

          Enable both TT memory management and scheduling based on memory for the remaining tests.

          • Verifying invalid jobs: Configure a sleep job such that job's mapred.task.maxvmem oversteps cluster-wide mapred.task.limit.maxvmem. Job should be rejected by the scheduler.
          • Verifying invalid jobs: Configure a sleep job such that job's mapred.task.maxpmem oversteps cluster-wide mapred.task.limit.maxpmem. Job should be rejected by the scheduler.
          • Verifying default values: Submit a sleep job without any memory related configuration. It should take up the cluster-wide values specified via mapred.task.default.maxvmem and mapred.task.default-pmem-percentage-in-vmem. This should be verified by looking at the job.xml from JT UI.

          The following currently work only on Linux. mapred.task.limit.maxvmem and mapred.task.limit.maxpmem should be considerably high so as to not interfere with jobs.

          • Verifying vmem based scheduling: Start cluster with 1 map and 1 reduce slots on all TTs. First submit a sleep job job1 with map tasks occupying the whole cluster and only one reduce task. Job's mapred.task.maxvmem should be such that it is less than (TT total vmem - mapred.tasktracker.vmem.reserved). TT total vmem can be calculated by running "cat /proc/meminfo" and adding up MemTotal and SwapTotal. While the tasks of this job are still running, submit another sleep job job2 with one map task and reduce tasks occupying the whole cluster with the same mapred.task.maxvmem. This job should be blocked by the cluster till tasks of the first job start finishing. Now submit job3 identical to job1. This should be blocked till tasks of job2 start finishing.
          • Verifying pmem based scheduling: Start cluster with 1 map and 1 reduce slots on all TTs. First submit a sleep job job1 with map tasks occupying the whole cluster and only one reduce task. Job's mapred.task.maxpmem should be such that it is less than (TT total pmem - mapred.tasktracker.pmem.reserved). TT total pmem can be obtained from the MemTotal field of the output of "cat /proc/meminfo". While the tasks of this job are still running, submit another sleep job job2 with one map task and reduce tasks occupying the whole cluster with the same mapred.task.maxpmem. This job should be blocked by the cluster till tasks of the first job start finishing. Now submit job3 identical to job1. This should be blocked till tasks of job2 start finishing.
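
          For anyone repeating these tests, the totals mentioned above can be computed from the contents of /proc/meminfo as in the sketch below. The class and method names are hypothetical, and it assumes the usual "Field:   value kB" line layout of that file.

```java
import java.util.List;

/** Hypothetical helper for reading totals out of /proc/meminfo-style lines. */
final class MemInfoSketch {

    /** Returns the value of the named field in kB, or -1 if the field is missing. */
    static long fieldKb(List<String> lines, String field) {
        String prefix = field + ":";
        for (String line : lines) {
            if (line.startsWith(prefix)) {
                // Example line: "MemTotal:       16384256 kB"
                String[] parts = line.substring(prefix.length()).trim().split("\\s+");
                return Long.parseLong(parts[0]);
            }
        }
        return -1L;
    }

    /** Total pmem in bytes: the MemTotal field. */
    static long totalPmemBytes(List<String> lines) {
        long mem = fieldKb(lines, "MemTotal");
        return mem < 0 ? -1L : mem * 1024L;
    }

    /** Total vmem in bytes: MemTotal plus SwapTotal. */
    static long totalVmemBytes(List<String> lines) {
        long mem = fieldKb(lines, "MemTotal");
        long swap = fieldKb(lines, "SwapTotal");
        return (mem < 0 || swap < 0) ? -1L : (mem + swap) * 1024L;
    }
}
```

          On a Linux node the input lines could be obtained with java.nio.file.Files.readAllLines(java.nio.file.Paths.get("/proc/meminfo")).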
          Robert Chansler added a comment -

          Edit release note for publication.


            People

            • Assignee:
              Vinod Kumar Vavilapalli
              Reporter:
              Hemanth Yamijala
            • Votes:
              0
              Watchers:
              14
