Hadoop Common: HADOOP-249

Improving Map -> Reduce performance and Task JVM reuse

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.3.0
    • Fix Version/s: 0.19.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Enabled task JVMs to be reused via the job config mapred.job.reuse.jvm.num.tasks.
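      For context, the release-note property is an ordinary job configuration knob. A hedged sketch of enabling unlimited reuse (the convention that -1 means unlimited and 1 is the default one-task-per-JVM comes from the Hadoop 0.19 documentation, not this page):

```xml
<!-- mapred-site.xml (or per-job configuration), Hadoop 0.19+.
     -1 = reuse a JVM for an unlimited number of the job's tasks;
      1 = default, one task per JVM. -->
<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>-1</value>
</property>
```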

      Description

      These patches are really just to make Hadoop start trotting. It is still at least an order of magnitude slower than it should be, but I think these patches are a good start.

      I've created two patches for clarity. They are not independent, but could easily be made so.

      The disk-zoom patch is a performance trifecta: less disk IO, less disk space, less CPU, and overall a tremendous improvement. The patch is based on the following observation: every piece of data from a map hits the disk once on the mapper, and three times (plus sorting) on the reducer. Further, the entire input for the reduce step is sorted together, maximizing the sort time. This patch causes:

      1) the mapper to sort the relatively small fragments at the mapper, which causes two hits to the disk, but with smaller files.
      2) the reducer copies the map output and may merge (if more than 100 outputs are present) with a couple of other outputs at copy time. No sorting is done since the map outputs are sorted.
      3) the reducer will merge the map outputs on the fly in memory at reduce time.
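      Steps 2 and 3 rely on the fact that merging k already-sorted runs is cheap. A minimal, self-contained sketch of such an on-the-fly k-way merge with a priority queue (illustrative only, not the actual Hadoop merge code):

```java
import java.util.*;

// Sketch: merge k sorted map-output runs on the fly, the way step 3
// describes, without re-sorting the whole reduce input.
public class KWayMerge {
    public static List<Integer> merge(List<int[]> sortedRuns) {
        // heap entries: {value, runIndex, positionInRun}
        PriorityQueue<int[]> heap =
            new PriorityQueue<>(Comparator.comparingInt(e -> e[0]));
        for (int r = 0; r < sortedRuns.size(); r++)
            if (sortedRuns.get(r).length > 0)
                heap.add(new int[]{sortedRuns.get(r)[0], r, 0});
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] e = heap.poll();          // smallest head among all runs
            out.add(e[0]);
            int[] run = sortedRuns.get(e[1]);
            if (e[2] + 1 < run.length)      // advance within the same run
                heap.add(new int[]{run[e[2] + 1], e[1], e[2] + 1});
        }
        return out;
    }
    public static void main(String[] args) {
        List<int[]> runs = Arrays.asList(
            new int[]{1, 4, 7}, new int[]{2, 5}, new int[]{3, 6, 8});
        System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

Each of the k runs is consumed sequentially, so the merge needs only one buffered element per run in memory, which is why step 3 can happen "on the fly" at reduce time.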

      I'm attaching the performance graph (with just the disk-zoom patch) to show the results. This benchmark uses a random input and null output to remove any DFS performance influences. The cluster of 49 machines I was running on had limited disk space, so I was only able to run to a certain size on unmodified Hadoop. With the patch we use 1/3 the amount of disk space.

      The second patch allows the task tracker to reuse processes to avoid the overhead of starting the JVM. While JVM startup is relatively fast, restarting a Task causes disk IO and DFS operations that have a negative impact on the rest of the system. When a Task finishes, rather than exiting, it reads the next task to run from stdin. We still isolate the Task runtime from the TaskTracker, but we only pay the startup penalty once.
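      The reuse loop described above (a finished task reads the next task from stdin) can be illustrated with a tiny, self-contained simulation; the class, method names, and line-per-task protocol here are invented for illustration, not taken from the patch:

```java
import java.util.Scanner;

// Sketch (hypothetical, simplified): a reusable child-process loop.
// Instead of exiting after one task, the child keeps reading the next
// task id from stdin until it sees an empty line or EOF.
public class ReusableChild {
    static void runTask(String taskId) {
        // placeholder for real task work
    }
    public static int runLoop(Scanner in) {
        int tasksRun = 0;
        while (in.hasNextLine()) {
            String line = in.nextLine();
            if (line.isEmpty()) break;   // tracker signals "no more tasks"
            runTask(line);
            tasksRun++;                  // JVM startup cost paid only once
        }
        return tasksRun;
    }
    public static void main(String[] args) {
        // simulate the tracker feeding three tasks to one child JVM
        Scanner in = new Scanner("task_0001\ntask_0002\ntask_0003\n");
        System.out.println(runLoop(in)); // 3
    }
}
```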

      This second patch also fixes two performance issues not related to JVM reuse. (The reuse just makes the problems glaring.) First, the JobTracker counts all jobs not just the running jobs to decide the load on a tracker. Second, the TaskTracker should really ask for a new Task as soon as one finishes rather than wait the 10 secs.

      I've been benchmarking the code a lot, but I don't have access to a really good cluster to try the code out on, so please treat it as experimental. I would love feedback.

      There is another obvious thing to change: ReduceTasks should start after the first batch of MapTasks complete, so that 1) they have something to do, and 2) they are running on the fastest machines.

      1. 249-final.patch
        86 kB
        Devaraj Das
      2. 249-after-review.patch
        83 kB
        Devaraj Das
      3. 249-with-jvmID.patch
        54 kB
        Devaraj Das
      4. 249-3.patch
        30 kB
        Devaraj Das
      5. 249.2.patch
        26 kB
        Devaraj Das
      6. 249.1.patch
        25 kB
        Devaraj Das
      7. disk_zoom.patch
        19 kB
        Benjamin Reed
      8. task_zoom.patch
        23 kB
        Benjamin Reed
      9. image001.png
        12 kB
        Benjamin Reed


          Activity

          Doug Cutting added a comment -

          I have not yet had a chance to look closely at your patches, but these are clearly optimizations that we badly need. Thanks!

          Johan Oskarsson added a comment -

          What happened to these patches?

          I'm especially interested in the JVM reuse, since we not only run large jobs but also a lot of small ones where the setup time really kills performance.

          Owen O'Malley added a comment -

          Most of these things except for the jvm reuse have been done in other patches. I've only left this bug open because of the jvm reuse issues, but the patch is currently useless because the code has changed so much in the last year. In fact, it would be relatively tricky to get right with the capturing of stdout/stderr from the tasks.

          Amar Kamat added a comment -

          Wondering if we can have one jvm per job and have tasks as threads spawned from the job jvm?

          Owen O'Malley added a comment -

          No, threads within a single JVM have no major advantage over multiple JVMs and have significant disadvantages. I feel that the best trade-off is having each JVM run a single task at a time, but reusing the JVM for the following task.

          Holden Robbins added a comment -

          In my use case, we'd like to load a significant amount of data into memory at the start of a task run: sharing 10 GB of read-only data in a single JVM running on an 8-16 processor machine. This would allow for lower cost and more throughput; otherwise we redundantly load this significant amount of data and are therefore limited by memory rather than CPUs.

          My suggestion would be to provide the option and let the user choose based on their needs. If you're running "untrusted" processes on the cluster, you can run each task in an independent JVM. If you're looking to maximize throughput and memory and you are running trusted processes, you should be able to specify # of threads per Task.

          In my current situation, I'm only able to make use of 4 CPUs on 8 CPU machines because each task requires 10Gb to run, the majority of that being read-only data loaded from distributed cache.

          Holden Robbins added a comment -

          To be more clear, my suggestion would be to add parameters "mapred.map.tasks.threads" and "mapred.reduce.tasks.threads" and let people set the number of threads to spawn per task, and let them default to 1.

          Sam Pullara added a comment -

          Reducing the number of JVMs spawned should be a high priority. It is a tremendous amount of overhead for low-cpu usage jobs.

          Sam Pullara added a comment -

          Another issue is that the JVM is continuously optimizing your code; when you restart the VM you lose that information and it has to be reconstructed in the next run. CPU-bound processes would definitely benefit from having the VM stay around for the length of the job.

          Christophe Taton added a comment -

          Would it make sense, under some conditions (e.g. the task code is trusted), to allow the tasks to run directly inside the task tracker JVM?
          I actually had a scenario where the amount of memory available was critically low!

          Devaraj Das added a comment -

          A good discussion on the JVM reuse issue happened on HADOOP-3675. The links http://issues.apache.org/jira/browse/HADOOP-3675?focusedCommentId=12619448#action_12619448 through http://issues.apache.org/jira/browse/HADOOP-3675?focusedCommentId=12619775#action_12619775 capture this.
          Mahadev konar added a comment -

          Sorry, it was a mistake...

          Devaraj Das added a comment -

          After a discussion with Owen on the logs issue, it seems the approach where we write everything to three files (stdout/err/log) with an index file containing offsets per task should work. Thoughts?

          Tom White added a comment -

          > the approach where we write everything to three files (stdout/err/log) with an index file containing offsets per task should work.

          Seems like a good idea. Would the Java process write the index file? If so, it would have to count bytes, but that should be doable.

          I'm not sure what caused the performance problem that led to the stream handling being done outside Java (HADOOP-1553), but is it worth revisiting to see if a Java approach is as fast? It would certainly be simpler.

          Vinod Kumar Vavilapalli added a comment -

          the approach where we write everything to three files (stdout/err/log) with an index file containing offsets per task should work.

          This has problems in some cases: processes (forked by streaming/pipes tasks) that live beyond the lifetime of the task process pollute the stdout/err/log files, in that they might continue to write to these files even after we mark the end of data from one task. In the current setup they inherit the parent task's file descriptors and continue to write to the same files, and hence pose no problem.

          I'm not sure what caused the performance problem that led to the stream handling being done outside Java (HADOOP-1553), but is it worth revisiting to see if a Java approach is as fast? It would certainly be simpler.

          +1

          Owen O'Malley added a comment -

          processes (forked by streaming/pipes tasks) that live beyond the life time of the task process pollute the stdout/err/log files

          This is a general problem with combining tasks into a single JVM, and I don't believe there are any solutions. Take, for example, a Mapper that launches a thread that is not joined in the close method. The output will be split across the tasks.

          I'm not sure what caused the performance problem that led to the stream handling being done outside Java (HADOOP-1553), but is it worth revisiting to see if a Java approach is as fast? It would certainly be simpler.

          I'm confused. Pre HADOOP-1553, the TaskTracker copied the task's stdout and stderr "by hand" to a file. Since you only get a new pair of streams when you start a new JVM, backing out HADOOP-1553 wouldn't help. You would still need to guess where the boundary was.

          Tom White added a comment -

          > You would still need to guess where the boundary was.

          I was thinking that you would call System.setOut() and System.setErr() for each new task, so that each stream was redirected to a new file. (Of course, if the task hasn't finished producing output, it will go into the wrong output file. But this is a general problem, as you point out.) But if the overhead for copying the process output to a file in Java is so high then we should do the stream handling outside Java and track offsets.
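          Tom's setOut/setErr idea can be sketched in plain Java; the class and method names here are illustrative, not Hadoop's. As the later comments point out, this captures only output written through the Java System.out object, not forked-process or JNI output:

```java
import java.io.*;

// Sketch of per-task stdout redirection: give each task its own
// stdout by calling System.setOut with a fresh stream per task,
// restoring the original stream when the task finishes.
public class PerTaskStdout {
    static void runTask(String taskId) {
        System.out.println("hello from " + taskId); // the task's "output"
    }
    public static String capture(String taskId) {
        PrintStream saved = System.out;
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        System.setOut(new PrintStream(buf));   // redirect for this task
        try {
            runTask(taskId);
        } finally {
            System.setOut(saved);              // restore afterwards
        }
        return buf.toString();
    }
    public static void main(String[] args) {
        System.out.print(capture("attempt_1")); // prints: hello from attempt_1
    }
}
```

In a real tracker the ByteArrayOutputStream would be a per-attempt log file; the mechanism (swap the stream, run, restore) is the same.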

          Craig Macdonald added a comment -

          I was thinking that you could use an OutputStream with ThreadLocal OutputStream within it, and use these with System.setOut() and System.setErr(). This would allow multiple threads to have their own output and error streams. Not sure how this would work with pipes though.
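          Craig's suggestion, sketched generically (a hypothetical class, not Hadoop code): one shared stream object whose bytes are routed to a per-thread target via ThreadLocal. As the next comments note, this still cannot capture forked-process or JNI output:

```java
import java.io.*;

// A single OutputStream that delegates every write to a per-thread
// target stream, so multiple task threads sharing System.out each
// get their own output file/buffer.
public class ThreadLocalOut extends OutputStream {
    private final ThreadLocal<OutputStream> target =
        ThreadLocal.withInitial(ByteArrayOutputStream::new);
    public void setTarget(OutputStream os) { target.set(os); }
    @Override public void write(int b) {
        try {
            target.get().write(b);           // route to this thread's stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
    public static void main(String[] args) throws Exception {
        ThreadLocalOut out = new ThreadLocalOut();
        ByteArrayOutputStream a = new ByteArrayOutputStream();
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        Thread t1 = new Thread(() -> { out.setTarget(a); out.write('A'); });
        Thread t2 = new Thread(() -> { out.setTarget(b); out.write('B'); });
        t1.start(); t2.start(); t1.join(); t2.join();
        System.out.println(a + " " + b); // A B  (each thread's output separate)
    }
}
```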

          Devaraj Das added a comment -

          Will the System.setOut/setErr approach work for the case where the parent Java task spawns child processes (that do printf(), etc.) ? Will the spawned children also see those as their stdout/err?

          Owen O'Malley added a comment -

          No, it doesn't work. It also doesn't work if you use JNI code in your application that calls printf. When I was doing HADOOP-1553, I did consider that possibility, but there were too many cases that it does not handle. In the case of this patch, it would still not work with threads that weren't finished when the task switch was done. They would be counted as output from the new task instead of the old one.

          Runping Qi added a comment -

          If we implement HADOOP-2560, then JVM reuse becomes a less urgent issue.

          Devaraj Das added a comment -

          I agree with Runping that HADOOP-2560 has more far reaching effects.

          Owen O'Malley added a comment -

          Sure, but re-using the jvm is the first step for HADOOP-2560. There is a lot of work to implement grouping of maps. In particular, you need to deal with task failures, changes to the shuffle and task event log, interactions with speculative execution and so on. On the other hand, to group mappers you need to reuse the jvm, otherwise you can't join their outputs together.

          Runping Qi added a comment -

          HADOOP-2560 does not need JVM reuse.
          It just needs mapper tasks to work with multiple splits.
          Everything else remains the same.

          Devaraj Das added a comment -

          Here is an early untested patch (the major missing part is the handling of the logs, and some cleanup needs to be done). This is up for an early review.

          The major change here is the addition of a new class called JvmManager that does the bookkeeping of the JVMs spawned by the tasktracker.

          Devaraj Das added a comment -

          This has some fixes. Logging not yet done though.

          Devaraj Das added a comment -

          The data-structures have been simplified in this patch. Also, this has been tested a fair amount.

          Devaraj Das added a comment -

          The loadgen benchmark ran in the following way:

          bin/hadoop jar hadoop-0.19.0-dev/hadoop-0.19.0-dev-test.jar loadgen \
             -D test.randomtextwrite.bytes_per_map=$((100)) \
             -D test.randomtextwrite.total_bytes=$((100*100000)) \
             -D mapred.compress.map.output=false \
             -r 1 \
             -outKey org.apache.hadoop.io.Text \
             -outValue org.apache.hadoop.io.Text \
             -outFormat org.apache.hadoop.mapred.lib.NullOutputFormat \
             -outdir fakeout
          

          (That is, 100K mappers each generating 100 bytes of data.) Run on 100 nodes with 4 mapper slots each, this showed a 50% improvement in the map phase.

          Runping Qi added a comment -

          Generating only 100 bytes, each mapper is essentially doing nothing, so you are basically testing the costs of JVM launches and task assignment. Thus, I would expect much better improvement (say 10x or more). A 2x improvement implies that the cost of task assignment is significant and, in your case, was a bottleneck.

          How long did the shuffling take?

          Devaraj Das added a comment -

          Yes, the aim of this benchmark was to see how large jobs with small tasks would be affected. But as discussed in this jira already, there are other cases where jvm reuse could prove useful. The shuffling in both cases took ~30 minutes.

          Devaraj Das added a comment -

          Here is a patch that addresses the task logs issue. In short the patch has the following:
          1) Addition of queues in the tasktracker for maps and reduces, where a new task is queued up. When a slot becomes free, a task is pulled from one of the relevant queues and initialized/launched.
          2) When a tasktracker reports with tasks in COMMIT_PENDING status, it could be given a new task. This new task would be queued up at the tasktracker and launched only when the task doing the commit finishes.
          3) JvmManager is the class that keeps track of running JVMs and the tasks running on them. When a JVM exits, the appropriate task (if any was being run by that JVM) is failed.
          4) A new task state has been added to signify INITIALIZED and ready to run. Until a task is initialized (task localization, distributed cache init, etc.), a JVM is not given that task.
          5) The log handling is done this way:

          • I have a small indirection file, which I call index.log, in each attempt dir. Let's say that the JVM for attempt_1_1
            also runs attempt_foo_bar. The index.log file in the directory attempt_foo_bar would look like:
            LOG_DIR: attempt_1_1
            STDOUT: <start of stdout for attempt_foo_bar in the stdout file in the attempt_1_1 directory> <length>
            SYSLOG: <similar to above>
            STDERR: <similar to above>
          • I create this index.log file at task startup, and have a thread continuously update the lengths of the log files
            so that one can see the logs of a running task.
          • I modified tools like TaskLog.Reader to go through this indirection when someone asks for the log files.

          6) A new class JVMId has been created to create/identify JVM IDs instead of plain integers. (Arun had requested this offline.)

          I have tested a large sort with this patch.

          The only thing remaining is the handling of the workDir for tasks (that has things like Distributed Cache symlinks). I am going to address that in the next patch very shortly.

          Would really appreciate a review on this one.
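          The index.log scheme described in point 5 boils down to recording (offset, length) pairs into a shared per-JVM log file, so a reader can extract one attempt's slice. A simplified, hypothetical sketch of the reader side (not the actual TaskLog.Reader code):

```java
import java.io.*;
import java.nio.file.Files;

// Sketch: all attempts that shared a JVM append to one stdout file;
// the index records where each attempt's output starts and how long
// it is, and the reader seeks to that slice.
public class LogSliceReader {
    public static String readSlice(File sharedLog, long offset, int length) {
        byte[] buf = new byte[length];
        try (RandomAccessFile raf = new RandomAccessFile(sharedLog, "r")) {
            raf.seek(offset);       // jump to this attempt's start offset
            raf.readFully(buf);     // read exactly `length` bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buf);
    }
    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("stdout", ".log");
        f.deleteOnExit();
        Files.write(f.toPath(),
            "out of attempt_1_1out of attempt_foo_bar".getBytes());
        // the index would record: attempt_foo_bar -> offset 18, length 22
        System.out.println(readSlice(f, 18, 22)); // out of attempt_foo_bar
    }
}
```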

          Arun C Murthy added a comment -

          I've spent a fair bit of time on this; it's looking good. Some comments:

          1. JvmManager.getJvmManagerInstance isn't thread-safe.
          2. JvmManagerForType.{taskFinished|taskKilled|killJvm}: errors are silently ignored, i.e. all the != null checks; at least log them?
          3. JvmManagerForType.reapJvm:
            • Minor: the iteration could be cleaner.
            • What if numJvmsSpawned >= maxJvms and all are busy and belong to different jobs? Should we at least log that?
          4. JVMId should probably have a static counter for the 'id' rather than use hashCode().
          5. TaskRunner.exitCode should probably be initialized to -1, definitely not 0.
          6. TaskTracker:
            • Can we make {map|reduce}TasksToLaunch internal to {Map|Reduce}Launcher? Ditto for numFree{Map|Reduce}Slots? Basically, make addToTaskQueue and addFreeSlot methods on the TaskLauncher.
            • Sanity checks in addFree{Map|Reduce}Slot?
            • TaskTracker.initialize should reset numFree{Map|Reduce}Slots to handle the case when the TaskTracker bounces?
          7. Minor: TaskTracker.Child - good time to move it to a different file?

          As an aside: how about removing the {map|reduce}Launcher threads and getting TaskTracker.getTask to pull the task off the queue, initialize it, and then return it to the Child? Thoughts?
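          The first review point (getJvmManagerInstance isn't thread-safe) is commonly addressed with the initialization-on-demand holder idiom. A generic sketch of that idiom, not the actual JvmManager code:

```java
// Lazily-initialized, thread-safe singleton via the
// initialization-on-demand holder idiom: the JVM's class-loading
// guarantees make the first initialization race-free without locks.
public class SafeSingleton {
    private SafeSingleton() { }
    private static class Holder {
        // initialized exactly once, when Holder is first loaded
        static final SafeSingleton INSTANCE = new SafeSingleton();
    }
    public static SafeSingleton getInstance() {
        return Holder.INSTANCE;
    }
    public static void main(String[] args) {
        // same instance from any thread, no race on first call
        System.out.println(getInstance() == getInstance()); // true
    }
}
```

A synchronized getter or an eagerly-constructed static field would work too; the holder idiom just avoids both locking and eager construction.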

          Devaraj Das added a comment -

          Thanks Arun for the review. Most of the comments can be addressed without much debate. However, I am not sure about the last one, i.e., having the getTask RPC do the task initialization. Currently, most of the initialization is done in a separate TaskRunner thread and TaskTracker.getTask just returns an initialized task. If we move the initialization into the getTask RPC, I am worried that if task initialization takes a long time (since there is communication with the DFS involved as well), the RPC handlers will get blocked (depending on how many JVMs ask for tasks). There is also the issue of task cleanup, which in my current patch is done in the same TaskRunner thread. That would have to be done inline too (or in a separate thread launched when the task finishes) if we remove the TaskRunner thread.
          Also, in the current patch, the JVM is launched (if needed) in the same TaskRunner thread. We'd also need to think about where to launch the JVM in the first place so that it starts making the getTask RPCs.
          For the above reasons, I propose to keep the existing model, but I am happy to discuss this further. Also, I think having launcher threads sets us up logically for the case where the TT queues up many more tasks than there are slots. What do others think?

          Riccardo Boscolo added a comment -

          Guys, just a quick comment. Unless it has been modified recently, S3FileSystem's store will cache the blocks read from S3 to $hadoop.tmp.dir/s3 and remove them only at JVM shutdown. This is not a major issue if each map task runs under a new JVM, but it might become one if the JVM is reused. We often experience disk-usage problems when running a Hadoop client that reads a lot of data from S3, and we had to patch it to work around them.

          Arun C Murthy added a comment -

          Another corner case:

          TaskTracker.getTask should validate that the JVM identified by JVMId was really launched by the current JVMManager, currently it just checks the JobId. This probably means that the JVMId.id should be a function of the current timestamp to ensure it's unique across TaskTracker restarts (and hence it invalidates my previous comment about using a static counter in JVMId...).

          "For the above reasons, I propose to keep the existing model but I am happy to discuss this out. Also, I think having launcher threads sets us up logically for the case where the TT queues up many more tasks than there are slots. What do others think?"

          Actually to be clear, I'm not proposing to get rid of TaskRunner thread, just the TaskLauncher threads; this will ensure we do task-initialization on demand... but I'm happy to just throw this up and get feedback from others! Owen?
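
          The restart-uniqueness idea above can be sketched roughly as follows (hypothetical class and field names, not the actual JVMId code): prefix a per-process counter with the TaskTracker's start timestamp, so that ids generated before and after a restart cannot collide even though the counter resets.

          ```java
          import java.util.concurrent.atomic.AtomicInteger;

          public class JvmIdSketch {
              // Captured once per TaskTracker start; distinguishes restarts.
              private static final long START_TIME = System.currentTimeMillis();
              private static final AtomicInteger COUNTER = new AtomicInteger(0);

              private final String id;

              public JvmIdSketch() {
                  // A plain static counter would repeat its values after a restart;
                  // prefixing the start time keeps ids unique across restarts.
                  this.id = START_TIME + "_" + COUNTER.incrementAndGet();
              }

              public String getId() { return id; }

              public static void main(String[] args) {
                  JvmIdSketch a = new JvmIdSketch();
                  JvmIdSketch b = new JvmIdSketch();
                  System.out.println(a.getId() + " " + b.getId()); // two distinct ids
              }
          }
          ```

          TaskTracker.getTask could then reject any JVMId whose timestamp prefix does not match the current JVMManager's start time.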

          Devaraj Das added a comment -

          Good point. I hadn't considered the tasktracker restart case.
          There are some other corner cases:
          1) I need to make sure that the JVM really gets to execute the task it was spawned for. This is required so that things like keep.failed.task.files work. Although I do disable JVM reuse for such cases, it might still be possible that the spawned JVM gets to run some other queued task. To handle this, I need to pass the firstTaskId in the getTask RPC, which will return that task if it finds it. This guarantees that when JVM reuse is disabled, the JVM executes that and only that task for which it was spawned.
          2) A JVM is spawned, but the task it was spawned for happens to be executed by some other JVM just before the new JVM is fully initialized. This new JVM would just stay in memory, since the busy flag is set to true in the JvmRunner initialization. To handle this, I need to introduce a JVM_INITIALIZING state to distinguish between busy and initializing JVMs (initializing JVMs are not killed when the TT looks for a JVM to purge).

          Regarding the task initialization, it happens on demand even in the patch. It's just that the RPCs are kept away from task init/cleanup. Owen?
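
          A minimal sketch of the firstTaskId guarantee in (1), with hypothetical names (the real logic lives in TaskTracker.getTask): when JVM reuse is disabled, the JVM gets the task it was spawned for or nothing at all, never some other queued task.

          ```java
          import java.util.ArrayDeque;
          import java.util.Deque;

          public class GetTaskSketch {
              private final Deque<String> queuedTasks = new ArrayDeque<>();
              private final boolean jvmReuseEnabled;

              public GetTaskSketch(boolean jvmReuseEnabled) {
                  this.jvmReuseEnabled = jvmReuseEnabled;
              }

              public void queueTask(String taskId) { queuedTasks.add(taskId); }

              // firstTaskId is the task this JVM was spawned for.
              public String getTask(String firstTaskId) {
                  if (!jvmReuseEnabled) {
                      // Return only the spawn task; if another JVM already took it,
                      // hand back nothing rather than a different queued task.
                      return queuedTasks.remove(firstTaskId) ? firstTaskId : null;
                  }
                  return queuedTasks.poll(); // reuse enabled: any queued task will do
              }
          }
          ```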

          Devaraj Das added a comment -

          Ignore my second comment on jvm initialization. I handled it a bit differently with the busy flag itself.

          Devaraj Das added a comment -

          Attached is a patch with all the review comments incorporated (Arun's and Amareshwari's). This patch might have some findbugs warnings (they showed up when test-patch was run locally), but they seem unrelated to my patch.

          Devaraj Das added a comment -

          Pushing it through Hudson.

          Arun C Murthy added a comment -

          This is looking good. +1.

          If we can get the output of 'ant test-patch', this is good to go! :)

          Devaraj Das added a comment -

          Attached is the output of test-patch:

          [exec] -1 overall.
          [exec] +1 @author. The patch does not contain any @author tags.
          [exec] +1 tests included. The patch appears to include 3 new or modified tests.
          [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
          [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
          [exec] -1 findbugs. The patch appears to introduce 1 new Findbugs warnings.

          The findbugs warning is due to the System.exit call I make in Child.java; the calls are required.
          The attached patch fixes a javadoc warning and failures in two test cases. It passes all core/contrib tests on my local machine.

          Arun C Murthy added a comment -

          I just committed this. Thanks, Devaraj!

          Hudson added a comment -

          Integrated in Hadoop-trunk #611 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/611/)
          Robert Chansler added a comment -

          If mapred.job.reuse.jvm.num.tasks is 1 (the default), then JVMs are not reused (1 task per JVM). If it is -1, there is no limit to the number of tasks a JVM can run (of the same job). One can also specify a value greater than 1. A JobConf API, setNumTasksToExecutePerJvm, has also been added.
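
          For illustration, the setting from the release note would look like this in standard Hadoop XML configuration syntax (the same effect is available programmatically via JobConf.setNumTasksToExecutePerJvm(-1)):

          ```xml
          <!-- Allow a task JVM to run an unlimited number of tasks of the same job. -->
          <property>
            <name>mapred.job.reuse.jvm.num.tasks</name>
            <value>-1</value>
          </property>
          ```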


            People

            • Assignee: Devaraj Das
            • Reporter: Benjamin Reed
            • Votes: 9
            • Watchers: 14