Hadoop Map/Reduce
MAPREDUCE-961

ResourceAwareLoadManager to dynamically decide new tasks based on current CPU/memory load on TaskTracker(s)

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.22.0
    • Fix Version/s: None
    • Component/s: contrib/fair-share
    • Labels:
      None
    • Tags:
      fb

      Description

      Design and develop a ResouceAwareLoadManager for the FairShare scheduler that dynamically decides how many maps/reduces to run on a particular machine based on the CPU/Memory/diskIO/network usage in that machine. The amount of resources currently used on each task tracker is being fed into the ResourceAwareLoadManager in real-time via an entity that is external to Hadoop.

      Attachments

      1. ResourceScheduling.pdf
        338 kB
        Scott Chen
      2. MAPREDUCE-961-v4.patch
        27 kB
        Scott Chen
      3. MAPREDUCE-961-v3.patch
        51 kB
        Scott Chen
      4. MAPREDUCE-961-v2.patch
        139 kB
        Scott Chen
      5. HIVE-961.patch
        127 kB
        Scott Chen

        Issue Links

          Activity

          Scott Chen added a comment -

          @Arun: Here is the overall design document. I hope this provides a coherent big picture of the design. Let me know what you think.

          Our goal is to collect resource information for tasks and TaskTrackers and schedule tasks based on the overall information. But since MAPREDUCE-220 (per-task information) is currently blocked by MAPREDUCE-901, we may first finish MAPREDUCE-1218 (per-node information) and do some resource-based scheduling based only on the per-node resource information.

          Scott Chen added a comment -

          I created a sub-task of this one, MAPREDUCE-1218, for collecting TaskTracker resource information.
          I will factor out the TaskTracker resource-collection code there and leave the scheduling-related code here.

          Scott Chen added a comment -

          @Matei: I have implemented your pseudocode for the task-type ordering and posted it in MAPREDUCE-1198. Can you help me review it?

          Hadoop QA added a comment -

          +1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12424888/MAPREDUCE-961-v4.patch
          against trunk revision 835968.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 9 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/242/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/242/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/242/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/242/console

          This message is automatically generated.

          Scott Chen added a comment -

          I will write an overall design document and post it here soon.

          Scott Chen added a comment -

          Thanks for the comment, Arun. I have changed the patch a lot following the suggestions from Matei and Vinod. The latest patch is totally different from the first one. I am sorry about the confusion.

          The design is as follows:
          1. We obtain the available memory on the TT using MemoryCalculatorPlugin. Originally this class calculated only the total memory; we made a slight change so that it also computes the available memory.
          2. The information is reported back to the JT via TaskTrackerStatus.ResourceStatus.
          3. In MemBasedLoadManager, we look at the available memory on the TT, the maximum memory per task (from the jobConf), and a configured reserved memory on the TT. If (available memory - task memory > reserved memory), we return true, which allows the scheduler to launch the task.

          The initial idea also included using the memory usage of a job collected across the cluster. Right now we only use the value obtained from the jobConf. After MAPREDUCE-220 is done, we can use the task memory estimated from previous tasks.
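
          For concreteness, here is a minimal, self-contained sketch of the check in step 3. The class and member names are hypothetical stand-ins for the quantities described above, not the patch's actual code:

          // Hypothetical sketch of step 3; not the patch's MemBasedLoadManager.
          public class MemCheckSketch {

            // Reserved memory on the TT, from configuration (assumed name).
            private final long reservedMemory;

            public MemCheckSketch(long reservedMemory) {
              this.reservedMemory = reservedMemory;
            }

            // Launch only if enough memory remains after the task starts:
            // available memory - task memory > reserved memory.
            public boolean canLaunchTask(long availableMemory, long maxMemoryPerTask) {
              return availableMemory - maxMemoryPerTask > reservedMemory;
            }

            public static void main(String[] args) {
              MemCheckSketch check = new MemCheckSketch(2L << 30); // reserve 2 GB
              System.out.println(check.canLaunchTask(6L << 30, 1L << 30)); // true: 5 GB headroom
              System.out.println(check.canLaunchTask(5L << 29, 1L << 30)); // false: only 1.5 GB
            }
          }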

          Arun C Murthy added a comment -

          I'm looking at all the comments here and I cannot find a single, coherent design for this feature - a major one. Can you please put up a design?

          I'd first like to understand/debate the design before I look at the patch.

          Scott Chen added a comment -

          Fixed the failing test in TestTTMemoryReporting. This test runs on Linux only; previously I was running the tests on a Mac, so I did not catch this failure.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12424390/MAPREDUCE-961-v3.patch
          against trunk revision 835237.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 18 new or modified tests.

          -1 patch. The patch command could not apply the patch.

          Console output: http://hudson.zones.apache.org/hudson/job/Mapreduce-Patch-h6.grid.sp2.yahoo.net/240/console

          This message is automatically generated.

          Scott Chen added a comment -

          I have filed the task scheduling order issue in MAPREDUCE-1198. I am working on it.

          Scott Chen added a comment -

          Lots of things changed in this patch:
          1. MemoryCalculatorPlugin.java now also collects the available memory on the TT.
          2. The available-memory information is added to TaskTrackerStatus.ResourceStatus and transmitted by heartbeat.
          3. MemBasedLoadManager launches the task if [available memory - max memory per task > reserved memory],
          where the max memory per task comes from the existing configuration and the reserved memory needs to be configured.

          I have also fixed the problem Matei pointed out, so that we return super.canLaunchTask().

          @Matei: Regarding the other suggestion, alternately launching different types of tasks, I will open another JIRA and work on that, since it is not part of MemBasedLoadManager. Thanks.

          Scott Chen added a comment -

          I see. Your pseudocode is very clear. Thanks.
          I think this definitely makes more sense than first looking at all the map tasks and then looking at all the reduce tasks.

          Matei Zaharia added a comment -

          Actually, I had a minor bug in the second code snippet: instead of picking one task type and breaking, you should sort the task types by which has the fewest active tasks on the node, and try both task types. For example, if there are fewer maps on the node than reduces, first look for a map, then for a reduce. Only break if neither task type has a task to launch.
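
          A hypothetical Java sketch of this corrected loop follows. TaskType, the counters, and tryLaunch() are illustrative stand-ins (tryLaunch folds canLaunchTask and the pending-job lookup together); this is not actual fair-scheduler code:

          import java.util.Arrays;
          import java.util.Comparator;

          // Sketch of the corrected assignment loop described above.
          public class AlternatingAssignSketch {
            enum TaskType { MAP, REDUCE }

            int activeMaps = 1, activeReduces = 3;   // tasks running on this node
            int pendingMaps = 2, pendingReduces = 1; // pending work (stand-in for jobs)

            // Stand-in for canLaunchTask(...) plus the pending-job lookup.
            boolean tryLaunch(TaskType type) {
              if (type == TaskType.MAP && pendingMaps > 0) {
                pendingMaps--; activeMaps++; return true;
              }
              if (type == TaskType.REDUCE && pendingReduces > 0) {
                pendingReduces--; activeReduces++; return true;
              }
              return false;
            }

            void assignTasks() {
              while (true) {
                TaskType[] order = { TaskType.MAP, TaskType.REDUCE };
                // Try the type with fewer active tasks on the node first.
                Arrays.sort(order, Comparator.comparingInt(
                    (TaskType t) -> t == TaskType.MAP ? activeMaps : activeReduces));
                boolean launched = false;
                for (TaskType type : order) {
                  if (tryLaunch(type)) {
                    System.out.println("launched " + type);
                    launched = true;
                    break; // re-evaluate the ordering on the next pass
                  }
                }
                if (!launched) break; // neither type had a task to launch
              }
            }

            public static void main(String[] args) {
              new AlternatingAssignSketch().assignTasks(); // prints MAP, MAP, REDUCE
            }
          }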

          Matei Zaharia added a comment -

          > I see. So we may need to count the previously launched map tasks in canLaunchTask and return false if there are too many map tasks launched in a row. Is this correct?

          Yes, either that or to change the scheduler to alternate between looking for a map and looking for a reduce. Right now the logic in there is organized as:

          for taskType in {MAP, REDUCE}:
            while true:
              if canLaunchTask(..., taskType):
                try to find a job with pending task
                if found a job:
                  launch task
                else:
                  break
          

          It should become something like this:

          while true:
            pick taskType to try next (if node has fewer maps than reduces, choose map; else choose reduce)
            if canLaunchTask(..., taskType):
              try to find a job with pending task
              if found a job:
                launch task
              else:
                break
          
          Scott Chen added a comment -

          Hi Matei,

          Thanks for the comment.

          1. I agree. It is good to keep this option.
          2. You are right about MemBasedLoadManager.canLaunchTask. I will make the necessary change.
          3. I agree. There should still be a max-slots limit. But we can set it higher now that we have resource monitoring.
          4.1 There are some per-task memory-limit configurations (HADOOP-5881), but those are for virtual memory. We may need something similar for physical memory.
          4.2 I see. So we may need to count the previously launched map tasks in canLaunchTask and return false if there are too many map tasks launched in a row. Is this correct?

          Your comments are very useful. I will make the necessary changes and upload a new patch soon. Let me know if you have other suggestions. Thanks!

          Matei Zaharia added a comment -

          Hi Scott and Dhruba,

          I've looked at the patch a little bit and have a few comments:

          1. I agree with Dhruba that it would be good to have the option of running multiple Hadoop clusters in parallel. It's also good design to allow the metrics data to be consumed by multiple consumers.
          2. In MemBasedLoadManager.canLaunchTask, you are returning true in some cases and saying that this is "equivalent to the case of using only CapBasedLoadManager". How is that happening? I think you would need to return super.canLaunchTask(...), not true. The Fair Scheduler itself doesn't look at slot counts.
          3. It might be useful to use the max map slots / max reduce slots settings as upper bounds on the total number of tasks on each node, to limit the number of processes launched. In this case an administrator could configure the slots higher (e.g. 20 map slots and 10 reduce slots), and the node utilization would be used to determine when fewer than this number of tasks should be launched. Otherwise, a job with very low-utilization tasks could cause hundreds of processes to be launched on each node.
          4. Have you thought in detail about how the MemBasedLoadManager will work when the scheduler tries to launch multiple tasks per heartbeat (part of MAPREDUCE-706)? I think there are two questions:
            • First, you will need to cap the number of tasks launched per heartbeat based on free memory on the node, so that we don't end up launching too many tasks and overcommitting memory. One way to do this might be to count tasks we schedule against the free memory on the node, and conservatively estimate them to each use 2 GB or something (admin-configurable); see the sketch after this list.
            • Second, it's important to launch both reduces and maps if both types of tasks are available. The current multiple-task-per-heartbeat code in MAPREDUCE-706 (and in all the other schedulers, as far as I know) will first try to launch map tasks until canLaunchTask(TaskType.MAP) returns false (or until there are no pending map tasks), and will then look for pending reduce tasks. With the current MemBasedLoadManager, this would starve reduces whenever there are pending maps. It would be better to alternate between the two task types if both are available.
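
          On the first question, a tiny sketch of the conservative per-heartbeat cap might look like the following. The 2 GB estimate and the variable names are assumptions for illustration, not existing configuration keys:

          // Hypothetical sketch: cap launches per heartbeat by charging each
          // scheduled task a conservative, admin-configured memory estimate.
          public class HeartbeatCapSketch {
            public static void main(String[] args) {
              long freeMemory = 7L << 30;            // free memory reported by the TT
              final long estimatePerTask = 2L << 30; // conservative 2 GB per task
              int pendingTasks = 10;                 // tasks the scheduler could launch

              int launched = 0;
              // Launch while the conservative estimate still fits in free memory.
              while (launched < pendingTasks && freeMemory >= estimatePerTask) {
                freeMemory -= estimatePerTask;       // charge the estimate up front
                launched++;
              }
              System.out.println("launched this heartbeat: " + launched); // prints 3
            }
          }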
          Scott Chen added a comment -

          I have submitted a patch in MAPREDUCE-1167 to include RSS information in ProcfsBasedProcessTree.
          The final goal is to gather CPU and memory information from ProcfsBasedProcessTree and send the information through heartbeats, as Vinod suggested.

          Scott Chen added a comment -

          After deploying on our cluster, I have fixed several minor things. Here's the new patch.
          It seems this works well on our cluster. I will run more real-world tests on it.

          I haven't integrated this in JT and TT. I will do that once I have time.

          One more thought: we use ps and grep to find the job_id pattern and ppid = TaskTracker pid.
          This method has the benefit that it does not depend on the TT to find the relevant processes.
          With this approach, we were able to find some "orphan" jobs on our cluster (the TT does not track them, but they are still running).
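
          A rough Java illustration of that ps-based discovery is below. The ps column layout and the job-id regex are assumptions about the approach, not the patch's exact command:

          import java.io.BufferedReader;
          import java.io.IOException;
          import java.io.InputStreamReader;

          // Sketch: list all processes, keep those whose command line contains a
          // job-id pattern and whose parent pid is the TaskTracker's pid.
          public class PsBasedTaskFinderSketch {
            public static void main(String[] args) throws IOException {
              long taskTrackerPid = args.length > 0 ? Long.parseLong(args[0]) : 1234L; // example TT pid

              Process ps = new ProcessBuilder("ps", "-eo", "pid,ppid,args").start();
              try (BufferedReader in = new BufferedReader(
                  new InputStreamReader(ps.getInputStream()))) {
                String line;
                while ((line = in.readLine()) != null) {
                  String[] cols = line.trim().split("\\s+", 3);
                  if (cols.length < 3 || !cols[0].matches("\\d+")) continue; // skip header
                  long ppid = Long.parseLong(cols[1]);
                  // Keep children of the TaskTracker whose args mention a job id.
                  if (ppid == taskTrackerPid && cols[2].matches(".*job_\\d+_\\d+.*")) {
                    System.out.println("task process: pid=" + cols[0] + " cmd=" + cols[2]);
                  }
                }
              }
            }
          }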

          Vinod Kumar Vavilapalli added a comment -

          > We are planning to use this on MAPREDUCE-1044. The idea is to use the information to decide whether to move a TT from one cluster to another. I will also survey the possibility of using the metrics API. Thanks.

          I see. Metrics do make sense there. Please continue if it really works for you.

          Thank you both, @Dhruba and @Scott, for being accommodating! Thanks!

          Scott Chen added a comment -

          > How do you intend to use it?

          We are planning to use this on MAPREDUCE-1044. The idea is to use the information to decide whether to move a TT from one cluster to another. I will also survey the possibility of using the metrics API. Thanks.

          > Perhaps. But I think eventually we should move inside the framework.

          I agree; moving this inside the framework is a better design, and I will follow your suggestion to build it there. In the meantime, we will keep testing these independent daemons on our clusters.

          Vinod Kumar Vavilapalli added a comment -

          I think you got my point.

          > The reason why we built them as separate daemons is mainly because we want this to run on multiple map-reduce clusters, as Dhruba mentioned.
          > I will definitely study how to put these daemons inside the TT and JT. I think one possibility is that we build them inside the TT and JT but still provide the RPC interface in the Collector.
          > If we need information on multiple clusters, we can go to the corresponding Collectors and get it via RPC.

          That sounds like a slightly different use-case to me. The metrics API can be used for this.

          Just curious: how do you intend to use it? Currently a scheduler is very tightly coupled with a single cluster/JobTracker. Information exposed by multiple clusters is currently unusable by any single cluster unless you have some external components. If you do indeed have external components outside of mapred, the metrics API seems the correct tool. Thoughts?

          > Also, at this stage, it is easy to test these daemons without the dependency on the JT or TT. We can easily change/restart these daemons without affecting the map-reduce cluster.

          Perhaps. But I think eventually we should move inside the framework. In any case, for a clean design, we can still factor them out into well-defined classes and so on. Once that is done, if we ever want to move them out into separate daemons, it won't be infinitely complex.

          Scott Chen added a comment -

          @Vinod:

          Thank you for the suggestions. Combining the resource-monitoring daemon into the TaskTracker and the Collector into the JobTracker is a really good idea.
          Let me repeat your points to see if I got them:
          1. A lot of code/logic can be reused, such as the heartbeat mechanism.
          2. The information can be more cohesive (TaskTrackerStatus.ResourceStatus holds all the utilization information).
          3. The monitoring daemon can access the TaskTracker's information (taskid, jobid...).
          4. The Collector can access the JobTracker's information (jobid, user, #map tasks, #reduce tasks...).

          The reason why we built them as separate daemons is mainly because we want this to run on multiple map-reduce clusters, as Dhruba mentioned.
          Also, at this stage, it is easy to test these daemons without the dependency on the JT or TT. We can easily change/restart these daemons without affecting the map-reduce cluster.

          I will definitely study how to put these daemons inside the TT and JT. I think one possibility is that we build them inside the TT and JT but still provide the RPC interface in the Collector.
          If we need information on multiple clusters, we can go to the corresponding Collectors and get it via RPC.

          @Dhruba:

          Thanks. Reusing the code in ProcfsBasedProcessTree is a good idea, but that class does not provide CPU usage information.
          I will see how to reuse it to get both kinds of information.

          dhruba borthakur added a comment -

          > The TaskTracker knows well about jobs/tasks/sub-processes

          I like the idea of using ProcfsBasedProcessTree for finding the total memory used by the subtree. Scott: is this possible to do?

          I like the idea of integrating the Collector with the JobTracker in the future. Let's see if we can configure it so that the Collector can run inside the JobTracker or outside it, depending on the configuration specified by the administrator. I would seriously like to keep open the option of having one Collector for multiple JobTrackers; it helps when I have too many map-reduce clusters floating around. Is this ok with you?

          Vinod Kumar Vavilapalli added a comment -

          I took a quick look at the patch. Some overall comments:

          • We already have a complete reporting framework in the form of heartbeats, and I don't think we need a new set of daemons yet.
            • InterTrackerProtocol can substitute for CollectorProtocol.
            • Reporting can be done via TaskTrackerStatus.ResourceStatus at the TaskTracker, via heartbeat.
            • Aggregate collection can be done via the same TaskTrackerStatus.ResourceStatus at the JobTracker, via heartbeat.
            • Getters for the utilization stats can be abstracted so schedulers can use them directly from the JT.
          • The collecting framework of this patch can still be retained - in particular, aggregating stats over all the TaskTrackers and per job can be kept on the JobTracker side via a Collector entity embedded in the JobTracker.
          • The TaskTracker knows well about jobs/tasks/sub-processes. The current job-based aggregation of stats on the TaskTracker is weak because it identifies tasks corresponding to a job by grepping for job-ids in 'ps'. This is very error-prone. This complication arises from the fact that we are decoupling gauging from the TaskTrackers into separate daemons. It can be made concrete by doing it in the TaskTracker address space itself, similar to what TaskMemoryManager does. The TaskTracker already maintains the state we need and knows which tasks belong to a job and which processes belong to a task. One more reason strengthening the argument against a new framework.
          • MemBasedLoadManager will still work, but by obtaining information via TaskTrackerStatus and the Collector embedded in the JobTracker.

          I am just trying to see if we can leverage already existing (and well-tested) code.

          The good news is that we can retrofit the current patch to do most of the above while hardly losing any code. In particular, most of the gauging utilities are reusable without any changes. Thoughts?

          Scott Chen added a comment -

          1. This patch implements the class org.apache.hadoop.mapred.MemBasedLoadManager, which looks at the memory usage of the TaskTracker and of the task's job to determine whether to launch the task.
          2. This patch also contains a resource-utilization monitoring daemon, which runs on each of the TaskTrackers, and an information-collecting daemon called the Collector, which collects and aggregates the resource-utilization information submitted by each monitoring daemon.

          Note that this is somewhat similar to org.apache.hadoop.mapred.TaskMemoryManagerThread, but different: we collect information both per TaskTracker and per job, and TaskMemoryManagerThread does not know the memory usage per Hadoop job. Also, we report the information back to a master called the Collector and resolve memory issues at the JobTracker level, whereas TaskMemoryManagerThread resolves them at the TaskTracker level.

          dhruba borthakur added a comment -

          > 1. Are you planning to use an existing monitoring system or create your own?

          Most existing monitoring systems cannot be used as-is because they do not report statistics based on Hadoop jobs. The system has to report CPU/memory etc. per Hadoop job.

          > 2. Will this make mapred.tasktracker.{map|reduce}.tasks.maximum obsolete?

          This is very likely to happen. I imagine that the scheduler can continue to schedule tasks (even though it exceeds mapred.tasktracker.{map|reduce}.tasks.maximum) because it can find that there is plenty of CPU left on a particular slave machine.

          Jaideep Dhok added a comment -

          I have two questions:
          1. Are you planning to use an existing monitoring system or create your own?
          2. Will this make mapred.tasktracker.{map|reduce}.tasks.maximum obsolete? What I mean to ask is: since task assignment will be based on the state of the resources, will these settings still be necessary?

          Chris Douglas added a comment -

          Dhruba and Nigel request that I restore the tag.

          Chris Douglas added a comment -

          Let's not play this game.

          dhruba borthakur added a comment -

          Just a unique string to help me query the JIRAs I am very interested in.

          Chris Douglas added a comment -

          What does the "fb" tag denote?


            People

            • Assignee:
              Scott Chen
            • Reporter:
              dhruba borthakur
            • Votes:
              3
            • Watchers:
              22
