
Key: HADOOP-249
Type: Improvement
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Devaraj Das
Reporter: Benjamin Reed
Votes: 9
Watchers: 13
Hadoop Common

Improving Map -> Reduce performance and Task JVM reuse

Created: 24/May/06 07:50 AM   Updated: 08/Jul/09 04:51 PM
Component/s: None
Affects Version/s: 0.3.0
Fix Version/s: 0.19.0

Time Tracking:
Not Specified

File Attachments (each licensed for inclusion in ASF works):
  249-3.patch               2008-08-27 02:16 PM  Devaraj Das    30 kB
  249-after-review.patch    2008-09-18 07:29 PM  Devaraj Das    83 kB
  249-final.patch           2008-09-19 07:08 AM  Devaraj Das    86 kB
  249-with-jvmID.patch      2008-09-14 02:40 PM  Devaraj Das    54 kB
  249.1.patch               2008-08-17 07:52 PM  Devaraj Das    25 kB
  249.2.patch               2008-08-19 01:23 PM  Devaraj Das    26 kB
  disk_zoom.patch           2006-05-24 07:52 AM  Benjamin Reed  19 kB
  task_zoom.patch           2006-05-24 07:52 AM  Benjamin Reed  23 kB
Image Attachments: image001.png (12 kB)
Issue Links:
Blocker
 
Incorporates
 
Reference

Hadoop Flags: Reviewed
Release Note: Enabled task JVMs to be reused via the job config mapred.job.reuse.jvm.num.tasks.
Resolution Date: 19/Sep/08 07:32 AM


Description
These patches are really just to make Hadoop start trotting. It is still at least an order of magnitude slower than it should be, but I think these patches are a good start.

I've created two patches for clarity. They are not independent, but could easily be made so.

The disk-zoom patch is a performance trifecta: less disk IO, less disk space, less CPU, and overall a tremendous improvement. The patch is based on the following observation: every piece of data from a map hits the disk once on the mapper, and three times (plus sorting) on the reducer. Further, the entire input for the reduce step is sorted together, maximizing the sort time. This patch causes:

1) the mapper to sort the relatively small fragments at the mapper, which causes two hits to the disk, but on smaller files.
2) the reducer to copy the map output and possibly merge it (if more than 100 outputs are present) with a couple of other outputs at copy time. No sorting is done, since the map outputs are already sorted.
3) the reducer to merge the map outputs on the fly in memory at reduce time.
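To make step 3 concrete, here is a minimal Java sketch of merging already-sorted map outputs on the fly with a min-heap, so the reducer never re-sorts its whole input. It is a generic k-way merge, not code from disk_zoom.patch; the class and method names are hypothetical, and plain string keys stand in for real map output records.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch: k-way merge of map outputs that are each already sorted. */
final class SortedRunMerger {
    /** Heap entry: the current smallest key of one run plus the rest of that run. */
    private static final class Head {
        final String key;
        final Iterator<String> rest;

        Head(String key, Iterator<String> rest) {
            this.key = key;
            this.rest = rest;
        }
    }

    static List<String> merge(List<List<String>> sortedRuns) {
        PriorityQueue<Head> heap = new PriorityQueue<Head>((a, b) -> a.key.compareTo(b.key));
        for (List<String> run : sortedRuns) {
            Iterator<String> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), it));
            }
        }
        // Collected into a list only so the result is easy to inspect; a reducer would
        // instead hand each key to reduce() as soon as it leaves the heap.
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();
            merged.add(smallest.key);
            // Refill from the run just consumed; the input is never re-sorted as a whole.
            if (smallest.rest.hasNext()) {
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }
}
```

Each step costs O(log k) for k map outputs, which is why pre-sorting on the mapper side pays off at reduce time.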

I'm attaching the performance graph (with just the disk-zoom patch) to show the results. This benchmark uses a random input and null output to remove any DFS performance influences. The cluster of 49 machines I was running on had limited disk space, so I was only able to run to a certain size on unmodified Hadoop. With the patch we use 1/3 the amount of disk space.

The second patch allows the task tracker to reuse processes to avoid the overhead of starting the JVM. While JVM startup is relatively fast, restarting a Task causes disk IO and DFS operations that have a negative impact on the rest of the system. When a Task finishes, rather than exiting, it reads the next task to run from stdin. We still isolate the Task runtime from TaskTracker, but we only pay the startup penalty once.
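A minimal sketch of the reuse loop described above, assuming a hypothetical one-task-per-line protocol on the child's input (the patch's actual wire format is not shown here). The child keeps running tasks until the tracker closes its input, so the JVM startup cost is paid once.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

/** Hypothetical sketch of a child JVM that runs many tasks instead of exiting after one. */
final class ReusableChild {
    /** Runs every task description read from {@code in}; returns how many tasks ran. */
    static int runTasks(Reader in, Consumer<String> runTask) {
        try (BufferedReader reader = new BufferedReader(in)) {
            int executed = 0;
            String taskSpec;
            // End of input (the tracker closing the stream) is the signal to exit.
            while ((taskSpec = reader.readLine()) != null) {
                if (taskSpec.isEmpty()) {
                    continue;                 // ignore blank lines
                }
                runTask.accept(taskSpec);     // same JVM, no new startup cost
                executed++;
            }
            return executed;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real child, {@code in} would wrap System.in and {@code runTask} would deserialize and execute the task.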

This second patch also fixes two performance issues not related to JVM reuse. (The reuse just makes the problems glaring.) First, the JobTracker counts all jobs, not just the running jobs, to decide the load on a tracker. Second, the TaskTracker should really ask for a new Task as soon as one finishes rather than wait the 10 seconds.
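The second fix (asking for a new Task as soon as one finishes instead of waiting out the 10-second interval) can be sketched with a plain wait/notify signal. This illustrates one way such an early wake-up could work, not the patch's code; HeartbeatSignal and its methods are hypothetical names.

```java
/** Hypothetical sketch: a heartbeat wait that ends early when a task finishes. */
final class HeartbeatSignal {
    private boolean taskFinished = false;

    /** Called by a task runner when its task completes. */
    synchronized void notifyTaskFinished() {
        taskFinished = true;
        notifyAll();
    }

    /**
     * Blocks until a task finishes or intervalMillis elapses, whichever comes first.
     * Returns true if it returned early because a task finished.
     */
    synchronized boolean awaitNextHeartbeat(long intervalMillis) {
        long deadline = System.currentTimeMillis() + intervalMillis;
        long remaining = intervalMillis;
        try {
            // The loop guards against spurious wakeups; remaining > 0 avoids wait(0), which waits forever.
            while (!taskFinished && remaining > 0) {
                wait(remaining);
                remaining = deadline - System.currentTimeMillis();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
        }
        boolean early = taskFinished;
        taskFinished = false;                     // consume the signal
        return early;
    }
}
```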

I've been benchmarking the code a lot, but I don't have access to a really good cluster to try the code out on, so please treat it as experimental. I would love to get feedback.

There is another obvious thing to change: ReduceTasks should start after the first batch of MapTasks complete, so that 1) they have something to do, and 2) they are running on the fastest machines.



Doug Cutting added a comment - 25/May/06 11:58 PM
I have not yet had a chance to look closely at your patches, but these are clearly optimizations that we badly need. Thanks!

Johan Oskarsson added a comment - 25/May/07 05:01 PM
What happened to these patches?

I'm especially interested in the JVM reuse, since we not only run large jobs but also a lot of small ones where the setup time really kills performance.


Owen O'Malley added a comment - 25/May/07 05:07 PM
Most of these things except for the jvm reuse have been done in other patches. I've only left this bug open because of the jvm reuse issues, but the patch is currently useless because the code has changed so much in the last year. In fact, it would be relatively tricky to get right with the capturing of stdout/stderr from the tasks.

Amar Kamat added a comment - 02/Mar/08 06:04 AM
Wondering if we can have one jvm per job and have tasks as threads spawned from the job jvm?

Owen O'Malley added a comment - 02/Mar/08 08:28 AM
No, threads within a single JVM have no major advantage over multiple JVMs and have significant disadvantages. I feel that the best trade-off is having each JVM run a single task at a time, but reusing the JVM for the following task.

Holden Robbins added a comment - 02/Mar/08 09:28 PM
In my use case, we'd like to load a significant amount of data into memory at the start of a task run: sharing 10GB of read-only data in a single JVM running on an 8-16 processor machine. This would allow for lower cost and more throughput; otherwise we redundantly load this significant amount of data and are therefore limited by memory rather than CPUs.

My suggestion would be to provide the option and let the user choose based on their needs. If you're running "untrusted" processes on the cluster, you can run each task in an independent JVM. If you're looking to maximize throughput and memory and you are running trusted processes, you should be able to specify # of threads per Task.

In my current situation, I'm only able to make use of 4 CPUs on 8-CPU machines because each task requires 10GB to run, the majority of that being read-only data loaded from the distributed cache.


Holden Robbins added a comment - 02/Mar/08 09:35 PM
To be more clear, my suggestion would be to add parameters "mapred.map.tasks.threads" and "mapred.reduce.tasks.threads" and let people set the number of threads to spawn per task, and let them default to 1.
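The proposed mapred.map.tasks.threads and mapred.reduce.tasks.threads parameters are only a suggestion in this comment. As a rough sketch of the idea, the code below processes records on a fixed-size thread pool that shares one read-only lookup map, so the large data set is loaded once per JVM rather than once per task process. ThreadedTaskRunner is hypothetical and ignores the output collection and failure semantics a real task would need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

/** Hypothetical sketch: N worker threads inside one task JVM share one read-only data set. */
final class ThreadedTaskRunner {
    static <R> List<R> runWithThreads(int threads, List<String> records,
                                      Map<String, String> sharedReadOnly,
                                      BiFunction<String, Map<String, String>, R> processRecord) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<R>> pending = new ArrayList<>();
            for (String record : records) {
                // All workers read the same map, which must not be modified while they run.
                pending.add(pool.submit(() -> processRecord.apply(record, sharedReadOnly)));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> f : pending) {   // results are collected in input order
                try {
                    results.add(f.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for a record", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("record failed", e.getCause());
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```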

Sam Pullara added a comment - 21/Apr/08 05:24 PM
Reducing the number of JVMs spawned should be a high priority. It is a tremendous amount of overhead for low-cpu usage jobs.

Sam Pullara added a comment - 21/Apr/08 07:41 PM
Another issue is that the JVM is continuously optimizing your code, when you restart the VM you lose that information and it has to be reconstructed in the next run. CPU bound processes would definitely benefit from having the VM stay around for the length of the job.

Christophe Taton added a comment - 21/Apr/08 08:06 PM
Would this make sense, under some conditions (e.g. tasks code is trusted), to allow the tasks to run directly inside the task tracker jvm?
I actually had some scenario where the amount of memory available was critically low!

Devaraj Das added a comment - 05/Aug/08 06:24 AM

Mahadev konar added a comment - 05/Aug/08 04:46 PM
Sorry, it was a mistake...

Devaraj Das added a comment - 06/Aug/08 05:57 PM
After a discussion with Owen on the logs issue, it seems the approach where we write everything to three files (stdout/err/log) with an index file containing offsets per task should work. Thoughts?

Tom White added a comment - 08/Aug/08 03:08 PM
> the approach where we write everything to three files (stdout/err/log) with an index file containing offsets per task should work.

Seems like a good idea. Would the Java process write the index file? If so, it would have to count bytes, but that should be doable.
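A minimal sketch of the byte counting Tom mentions: every task in the reused JVM writes through one stream into the shared file, and each task boundary records a start offset and a length. OffsetTrackingLog is a hypothetical name. It only counts bytes written through this Java stream; output that bypasses it (for example from processes forked by a streaming task, as raised later in this thread) would not be attributed correctly.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: one shared log stream plus a per-task {start, length} index. */
final class OffsetTrackingLog extends OutputStream {
    private final OutputStream shared;
    private final Map<String, long[]> index = new LinkedHashMap<>();  // taskId -> {start, length}
    private long bytesWritten = 0;
    private String currentTask;
    private long currentStart;

    OffsetTrackingLog(OutputStream shared) {
        this.shared = shared;
    }

    /** Marks a boundary: output from now on is attributed to {@code taskId}. */
    void startTask(String taskId) {
        finishCurrentTask();
        currentTask = taskId;
        currentStart = bytesWritten;
    }

    /** Closes the current task's entry in the index, if a task is active. */
    void finishCurrentTask() {
        if (currentTask != null) {
            index.put(currentTask, new long[] {currentStart, bytesWritten - currentStart});
            currentTask = null;
        }
    }

    @Override
    public void write(int b) throws IOException {
        shared.write(b);
        bytesWritten++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        shared.write(b, off, len);
        bytesWritten += len;   // counting bytes is all the index needs
    }

    Map<String, long[]> index() {
        return index;
    }
}
```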

I'm not sure what the cause of the performance problem that led to the stream handling being done outside Java (HADOOP-1553) was, but is it worth revisiting to see if a Java approach is as fast? It would certainly be simpler.


Vinod K V added a comment - 11/Aug/08 11:32 AM

> the approach where we write everything to three files (stdout/err/log) with an index file containing offsets per task should work.

This has problems in some cases: processes (forked by streaming/pipes tasks) that live beyond the lifetime of the task process pollute the stdout/err/log files, in that they might continue to write to these files even after we mark the end of data from one task. In the current setup they inherit the parent task's file descriptors and continue to write to the same files, and hence pose no problem.

> I'm not sure what the cause of the performance problem that led to the stream handling being done outside Java (HADOOP-1553), but is it worth revisiting to see if a Java approach is as fast? It would certainly be simpler.

+1


Owen O'Malley added a comment - 11/Aug/08 03:56 PM

> processes (forked by streaming/pipes tasks) that live beyond the life time of the task process pollute the stdout/err/log files

This is a general problem with combining tasks into a single JVM, and I don't believe there are any solutions. Take, for example, a Mapper that launches a thread that is not joined in the close method. The output will be split across the tasks.

> I'm not sure what the cause of the performance problem that led to the stream handling being done outside Java (HADOOP-1553), but is it worth revisiting to see if a Java approach is as fast? It would certainly be simpler.

I'm confused. Pre HADOOP-1553, the TaskTracker copied the task's stdout and stderr "by hand" to a file. Since you only get a new pair of streams when you start a new JVM, backing out HADOOP-1553 wouldn't help. You would still need to guess where the boundary was.


Tom White added a comment - 13/Aug/08 02:48 PM
> You would still need to guess where the boundary was.

I was thinking that you would call System.setOut() and System.setErr() for each new task, so that each stream was redirected to a new file. (Of course, if the task hasn't finished producing output, it will go into the wrong output file. But this is a general problem, as you point out.) But if the overhead for copying the process output to a file in Java is so high then we should do the stream handling outside Java and track offsets.
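Tom's per-task redirection can be sketched as follows. PerTaskStdout is a hypothetical helper; it captures into memory for brevity, where a real tracker would open a new file per task. As Owen points out later in this thread, System.setOut only changes what Java code sees as System.out: JNI code and child processes still write to the JVM's original stdout file descriptor, and threads that outlive a task keep writing to whatever stream is current.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

/** Hypothetical sketch: redirect System.out for the duration of one task. */
final class PerTaskStdout {
    /** Runs {@code task} with System.out redirected and returns what it printed. */
    static String runCapturing(Runnable task) {
        PrintStream original = System.out;
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        System.setOut(new PrintStream(captured, true));
        try {
            task.run();
        } finally {
            System.out.flush();
            System.setOut(original);   // always restore, even if the task throws
        }
        return captured.toString();
    }
}
```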


Craig Macdonald added a comment - 14/Aug/08 10:01 AM
I was thinking that you could use an OutputStream with ThreadLocal OutputStream within it, and use these with System.setOut() and System.setErr(). This would allow multiple threads to have their own output and error streams. Not sure how this would work with pipes though.
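A minimal sketch of Craig's idea, with hypothetical names: a single OutputStream, installed once via System.setOut, forwards each write to the calling thread's own target. One caveat: a plain ThreadLocal is not inherited by threads the task spawns (InheritableThreadLocal would be), and like any System.setOut approach it does nothing for pipes or native code.

```java
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical sketch: one installed stream that dispatches writes per thread. */
final class ThreadLocalOutputStream extends OutputStream {
    private final OutputStream fallback;
    private final ThreadLocal<OutputStream> target = new ThreadLocal<>();

    ThreadLocalOutputStream(OutputStream fallback) {
        this.fallback = fallback;
    }

    /** Routes the calling thread's output to {@code out}; null restores the fallback. */
    void setForCurrentThread(OutputStream out) {
        if (out == null) {
            target.remove();
        } else {
            target.set(out);
        }
    }

    private OutputStream current() {
        OutputStream out = target.get();
        return out != null ? out : fallback;
    }

    @Override
    public void write(int b) throws IOException {
        current().write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        current().write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        current().flush();
    }
}
```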

Devaraj Das added a comment - 14/Aug/08 02:48 PM
Will the System.setOut/setErr approach work for the case where the parent Java task spawns child processes (that do printf(), etc.) ? Will the spawned children also see those as their stdout/err?

Owen O'Malley added a comment - 14/Aug/08 03:33 PM
No it doesn't work. It also doesn't work if you use jni code in your application that calls printf. When I was doing HADOOP-1553, I did consider that possibility, but there were too many cases that it does not handle. In the case of this patch, it would still not work with threads that weren't finished when the task switch was done. They would be counted as output from the new task instead of the old one.

Runping Qi added a comment - 14/Aug/08 08:54 PM

If we implement HADOOP-2560, then JVM reuse becomes a less urgent issue.


Devaraj Das added a comment - 15/Aug/08 06:11 AM
I agree with Runping that HADOOP-2560 has more far reaching effects.

Owen O'Malley added a comment - 15/Aug/08 02:46 PM
Sure, but re-using the jvm is the first step for HADOOP-2560. There is a lot of work to implement grouping of maps. In particular, you need to deal with task failures, changes to the shuffle and task event log, interactions with speculative execution and so on. On the other hand, to group mappers you need to reuse the jvm, otherwise you can't join their outputs together.

Runping Qi added a comment - 15/Aug/08 04:59 PM

HADOOP-2560 does not need JVM reuse.
It just needs mapper tasks that can work with multiple splits.
Everything else remains the same.


Devaraj Das added a comment - 17/Aug/08 07:52 PM
Here is an early untested patch (the major missing part is the handling of the logs, and some cleanup needs to be done). This is up for an early review.

The major change here is the addition of a new class called JvmManager that does the bookkeeping of the JVMs spawned by the tasktracker.


Devaraj Das added a comment - 19/Aug/08 01:23 PM
This has some fixes. Logging not yet done though.

Devaraj Das added a comment - 27/Aug/08 02:16 PM
The data-structures have been simplified in this patch. Also, this has been tested a fair amount.

Devaraj Das added a comment - 27/Aug/08 03:14 PM
The loadgen benchmark was run as follows:
bin/hadoop jar hadoop-0.19.0-dev/hadoop-0.19.0-dev-test.jar loadgen \
   -D test.randomtextwrite.bytes_per_map=$((100)) \
   -D test.randomtextwrite.total_bytes=$((100*100000)) \
   -D mapred.compress.map.output=false \
   -r 1 \
   -outKey org.apache.hadoop.io.Text \
   -outValue org.apache.hadoop.io.Text \
   -outFormat org.apache.hadoop.mapred.lib.NullOutputFormat \
   -outdir fakeout

That is 100K mappers, each generating 100 bytes of data. Run on 100 nodes with 4 mapper slots each, it showed a 50% improvement for the map phase.
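The job shape follows directly from the two -D settings in the command. The second quantity is derived from the stated cluster size rather than reported in the comment, and assumes tasks spread evenly: with 100 nodes and 4 map slots each, at most 400 maps run at once, so each slot runs about 250 tiny tasks in sequence, and without reuse each of those launches a fresh JVM.

```latex
N_{\text{maps}} = \frac{\text{total\_bytes}}{\text{bytes\_per\_map}} = \frac{100 \times 100000}{100} = 10^{5},
\qquad
\frac{N_{\text{maps}}}{100 \times 4} = \frac{10^{5}}{400} = 250 \ \text{tasks per slot}
```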


Runping Qi added a comment - 27/Aug/08 05:52 PM

Generating only 100 bytes, each mapper is essentially doing nothing.
So you are basically testing the costs of JVM launches and task assignment.
Thus, I would expect a much bigger improvement (say 10x or more).
A 2x improvement implies that the cost of task assignment is significant and, in your case, was a bottleneck.

How long did the shuffling take?


Devaraj Das added a comment - 28/Aug/08 05:03 AM
Yes, the aim of this benchmark was to see how large jobs with small tasks would be affected. But as discussed in this jira already, there are other cases where jvm reuse could prove useful. The shuffling in both cases took ~30 minutes.

Devaraj Das added a comment - 14/Sep/08 02:40 PM
Here is a patch that addresses the task logs issue. In short the patch has the following:
1) Addition of queues in the tasktracker for maps and reduces, where a new task is queued up. When a slot becomes free, a task is pulled from the relevant queue and initialized/launched.
2) When a tasktracker reports with tasks in COMMIT_PENDING status, it could be given a new task. This new task would be queued up at the tasktracker and launched only when the task doing the commit finishes.
3) JvmManager is the class that keeps track of running JVMs and the tasks running on them. When a JVM exits, the task it was running (if any) is failed.
4) A new task state, INITIALIZED, has been added to signify that a task is ready to run. Until a task is initialized (task localization, distributed cache initialization, etc.), it is not given to a JVM.
5) The log handling is done this way:
  • I have a small indirection file, which I call log.index, in each attempt dir. Let's say that the JVM for attempt_1_1 also runs attempt_foo_bar. The log.index file in the directory attempt_foo_bar would look like:
    LOG_DIR: attempt_1_1
    STDOUT: <start of stdout for attempt_foo_bar in the stdout file in attempt_1_1 directory> <length>
    SYSLOG: <similar to above>
    STDERR: <similar to above>
  • I create this log.index file at task startup, and have a thread continuously update the lengths of the log files so that one can see the logs of a running task.
  • I modified tools like TaskLog.Reader to go through this indirection when someone asks for the log files.
6) A new class JVMId has been created to create/identify JVM IDs instead of plain integers. (Arun had requested this offline.)
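A minimal sketch of the log.index indirection described in item 5, writing and parsing the four-line layout shown there. The "KEY: value" layout and the key names come from the comment; treating <start> and <length> as decimal byte counts, and the LogIndex class itself, are assumptions for illustration.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical sketch of the per-attempt log.index indirection file. */
final class LogIndex {
    enum LogName { STDOUT, SYSLOG, STDERR }

    final String logDir;   // attempt dir whose log files actually hold the bytes
    final Map<LogName, long[]> extents = new EnumMap<>(LogName.class);  // {start, length}

    LogIndex(String logDir) {
        this.logDir = logDir;
    }

    /** Renders the index in the "KEY: value" layout from the comment. */
    String render() {
        StringBuilder sb = new StringBuilder("LOG_DIR: ").append(logDir).append('\n');
        for (Map.Entry<LogName, long[]> e : extents.entrySet()) {
            sb.append(e.getKey()).append(": ")
              .append(e.getValue()[0]).append(' ').append(e.getValue()[1]).append('\n');
        }
        return sb.toString();
    }

    /** Parses text produced by render(); returns null if no LOG_DIR line is found. */
    static LogIndex parse(String text) {
        LogIndex idx = null;
        for (String line : text.split("\n")) {
            if (line.isEmpty()) {
                continue;
            }
            String[] kv = line.split(": ", 2);
            if (kv[0].equals("LOG_DIR")) {
                idx = new LogIndex(kv[1].trim());
            } else if (idx != null) {
                String[] nums = kv[1].trim().split(" ");
                idx.extents.put(LogName.valueOf(kv[0]),
                        new long[] {Long.parseLong(nums[0]), Long.parseLong(nums[1])});
            }
        }
        return idx;
    }
}
```

A log reader would parse the file in the requested attempt's directory, then read `length` bytes starting at `start` from the named file under `logDir`.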

I have tested a large sort with this patch.

The only thing remaining is the handling of the workDir for tasks (that has things like Distributed Cache symlinks). I am going to address that in the next patch very shortly.

Would really appreciate a review on this one.


Arun C Murthy added a comment - 15/Sep/08 08:47 AM
I've spent a fair bit of time on this; it's looking good. Some comments:
  1. JvmManager.getJvmManagerInstance isn't thread-safe.
  2. JvmManagerForType.{taskFinished|taskKilled|killJvm} - errors are silently ignored, i.e. all the != null checks; at least log them?
  3. JvmManagerForType.reapJvm
    • Minor: Iteration could be cleaner
    • What if numJvmsSpawned >= maxJvms and all are busy and belong to different jobs? Should we at least log that?
  4. JVMId should probably have a static counter for the 'id' rather than use hashCode().
  5. TaskRunner.exitCode should probably be initialized to -1, definitely not 0.
  6. TaskTracker:
    • Can we make {map|reduce}TasksToLaunch internal to {map|reduce}Launcher? Ditto for numFree{Map|Reduce}Slots? Basically make addToTaskQueue and addFreeSlot methods on the TaskLauncher.
      • Sanity checks in addFree{Map|Reduce}Slot?
    • TaskTracker.initialize should reset numFree{Map|Reduce}Slots to handle the case when the TaskTracker bounces?
  7. Minor: TaskTracker.Child - Good time to move it to a different file?
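Item 6 suggests folding the task queue and the free-slot count into the launcher behind addToTaskQueue and addFreeSlot, with sanity checks and a reset when the TaskTracker re-initializes. A rough sketch under those assumptions follows; tryLaunchNext and reset are hypothetical names, string ids stand in for tasks, and a real launcher thread would block until both a slot and a task are available rather than poll.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: a launcher that owns both its queue and its free-slot count. */
final class TaskLauncher {
    private final int maxSlots;
    private int numFreeSlots;
    private final Deque<String> tasksToLaunch = new ArrayDeque<>();

    TaskLauncher(int maxSlots) {
        this.maxSlots = maxSlots;
        this.numFreeSlots = maxSlots;
    }

    synchronized void addToTaskQueue(String taskId) {
        tasksToLaunch.addLast(taskId);
    }

    /** Returns a slot when a task finishes; the sanity check catches double frees. */
    synchronized void addFreeSlot() {
        if (numFreeSlots >= maxSlots) {
            throw new IllegalStateException("free slots would exceed " + maxSlots);
        }
        numFreeSlots++;
    }

    /** One non-blocking launcher step: claims a slot and a task only if both exist. */
    synchronized String tryLaunchNext() {
        if (numFreeSlots == 0 || tasksToLaunch.isEmpty()) {
            return null;
        }
        numFreeSlots--;
        return tasksToLaunch.pollFirst();
    }

    /** For TaskTracker re-initialization: drop queued work and free every slot. */
    synchronized void reset() {
        tasksToLaunch.clear();
        numFreeSlots = maxSlots;
    }
}
```

Keeping both counters behind one lock means a slot can never be handed out without a matching task, which is the point of making them internal to the launcher.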

As an aside: How about removing the {map|reduce}Launcher threads and getting TaskTracker.getTask to pull it off the queue, initialize task and then returning it to the Child? Thoughts?


Devaraj Das added a comment - 15/Sep/08 01:08 PM
Thanks Arun for the review. Most of them can be addressed without much debate. However, I am not sure about the last comment, i.e., the getTask RPC doing the task initialization. Currently, most of the initialization is done in a separate TaskRunner thread and TaskTracker.getTask just returns an initialized task. If we move the initialization into the getTask RPC, I am worried that if the task initialization takes a long time (since there is communication with the DFS involved as well), then the RPC handlers get blocked (depending on how many JVMs ask for tasks). Also, there is the issue of task cleanup, which in my current patch is done in the same TaskRunner thread. That has to be done inline too (or in a separate thread launched when the task finishes) if we remove the TaskRunner thread.
Also, in the current patch, the JVM is launched (if needed) in the same TaskRunner thread. We'd also need to think about where to launch the JVM in the first place so that it starts making the getTask RPCs.
For the above reasons, I propose to keep the existing model but I am happy to discuss this out. Also, I think having launcher threads sets us up logically for the case where the TT queues up many more tasks than there are slots. What do others think?

Riccardo Boscolo added a comment - 15/Sep/08 05:54 PM
Guys, just a quick comment. Unless it has been modified recently, S3FileSystem's store will cache to $hadoop.tmp.dir/s3 the blocks read from S3 and remove them only at JVM shutdown. This is not a major issue if each map task is run under a new JVM, but it might become an issue if the JVM is reused. We often experience problems with disk usage when running a Hadoop client that reads a lot of data from S3, and we had to patch it to get around it.

Arun C Murthy added a comment - 15/Sep/08 07:07 PM
Another corner case:

TaskTracker.getTask should validate that the JVM identified by JVMId was really launched by the current JVMManager, currently it just checks the JobId. This probably means that the JVMId.id should be a function of the current timestamp to ensure it's unique across TaskTracker restarts (and hence it invalidates my previous comment about using a static counter in JVMId...).
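One way to satisfy both constraints (unique across TaskTracker restarts, and checkable by getTask) is to combine the tracker's start time with a per-tracker counter. This is a hypothetical sketch; the real JVMId class in the patch may be structured differently, and the names JvmId and JvmIdFactory are illustrative.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: JVM ids that cannot collide across TaskTracker restarts. */
final class JvmIdSketch {
    /** Immutable id: which tracker incarnation issued it, plus a sequence number. */
    static final class JvmId {
        final long trackerStartMillis;
        final int seq;

        JvmId(long trackerStartMillis, int seq) {
            this.trackerStartMillis = trackerStartMillis;
            this.seq = seq;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof JvmId)) {
                return false;
            }
            JvmId other = (JvmId) o;
            return trackerStartMillis == other.trackerStartMillis && seq == other.seq;
        }

        @Override
        public int hashCode() {
            return Objects.hash(trackerStartMillis, seq);
        }

        @Override
        public String toString() {
            return "jvm_" + trackerStartMillis + "_" + seq;
        }
    }

    /** One per TaskTracker incarnation; a restarted tracker gets a new start time. */
    static final class JvmIdFactory {
        private final long startMillis;
        private final AtomicInteger counter = new AtomicInteger();

        JvmIdFactory(long startMillis) {
            this.startMillis = startMillis;
        }

        JvmId next() {
            return new JvmId(startMillis, counter.incrementAndGet());
        }

        /** The check getTask would make: was this id issued by this tracker incarnation? */
        boolean issuedByThisTracker(JvmId id) {
            return id.trackerStartMillis == startMillis && id.seq >= 1 && id.seq <= counter.get();
        }
    }
}
```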

> For the above reasons, I propose to keep the existing model but I am happy to discuss this out. Also, I think having launcher threads sets us up logically for the case where the TT queues up many more tasks than there are slots. What do others think?

Actually to be clear, I'm not proposing to get rid of TaskRunner thread, just the TaskLauncher threads; this will ensure we do task-initialization on demand... but I'm happy to just throw this up and get feedback from others! Owen?


Devaraj Das added a comment - 15/Sep/08 07:28 PM
Good point. I hadn't considered the tasktracker restart case.
There are some other corner cases:
1) I need to make sure that the JVM really gets to execute the task it was spawned for. This is required so that things like keep.failed.task.files work. Although I do disable JVM reuse for such cases, it might still be possible that the spawned JVM gets to run some other queued task. To handle this, I need to pass the firstTaskId in the getTask RPC, which will return that task if it finds it. This guarantees that when JVM reuse is disabled, the JVM executes that and only that task for which it was spawned.
2) A JVM is spawned but the task which it was spawned for happened to be executed by some other JVM just before the new JVM was fully initialized. This new JVM would just stay in memory since the busy flag is set to true in the JvmRunner initialization. To handle this I need to introduce a JVM_INITIALIZING state to distinguish between busy and initializing JVMs (initializing JVMs are not killed when the TT looks for a JVM to purge).
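Point 1 can be sketched as a queue lookup that hands a newly spawned JVM the task it was spawned for, and nothing else when reuse is disabled. TaskHandout and this two-argument getTask are hypothetical; the comment only states that firstTaskId is passed in the getTask RPC.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of handing out tasks with a firstTaskId preference. */
final class TaskHandout {
    private final Deque<String> queued = new ArrayDeque<>();

    synchronized void enqueue(String taskId) {
        queued.addLast(taskId);
    }

    /**
     * @param firstTaskId  task the JVM was spawned for, or null on later calls
     * @param reuseAllowed false when JVM reuse is disabled (e.g. keep.failed.task.files)
     * @return the task to run, or null if the JVM should exit
     */
    synchronized String getTask(String firstTaskId, boolean reuseAllowed) {
        // Prefer the task this JVM was spawned for; remove() is false if it was already taken.
        if (firstTaskId != null && queued.remove(firstTaskId)) {
            return firstTaskId;
        }
        if (!reuseAllowed) {
            return null;   // a no-reuse JVM never picks up some other queued task
        }
        return queued.pollFirst();
    }
}
```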

Regarding the task initialization, it happens on demand even in the patch. It's just that the RPCs are kept away from task init/cleanup. Owen?


Devaraj Das added a comment - 15/Sep/08 07:44 PM
Ignore my second comment on jvm initialization. I handled it a bit differently with the busy flag itself.

Devaraj Das added a comment - 18/Sep/08 07:29 PM
Attached is a patch with all the review comments incorporated (Arun's and Amareshwari's). This patch might have some findbugs warnings (showed up when test-patch was run locally) but it seems unrelated to my patch.

Devaraj Das added a comment - 18/Sep/08 07:30 PM
Pushing through Hudson.

Arun C Murthy added a comment - 19/Sep/08 05:54 AM
This is looking good. +1.

If we can get the output of 'ant test-patch', this is good to go!


Devaraj Das added a comment - 19/Sep/08 07:08 AM
Here is the output of test-patch:

[exec]
[exec] -1 overall.
[exec]
[exec] +1 @author. The patch does not contain any @author tags.
[exec]
[exec] +1 tests included. The patch appears to include 3 new or modified tests.
[exec]
[exec] +1 javadoc. The javadoc tool did not generate any warning messages.
[exec]
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec]
[exec] -1 findbugs. The patch appears to introduce 1 new Findbugs warnings.
[exec]

The findbugs warning is due to the System.exit calls I make in Child.java. The calls are required.
The attached patch fixes a javadoc warning and failures in two testcases. It passes all core/contrib tests on my local machine.


Arun C Murthy added a comment - 19/Sep/08 07:32 AM
I just committed this. Thanks, Devaraj!

Hudson added a comment - 22/Sep/08 03:18 PM

Robert Chansler added a comment - 22/Oct/08 12:01 AM
If mapred.job.reuse.jvm.num.tasks is 1 (the default), then JVMs are not reused (1 task per JVM). If it is -1, there is no limit to the number of tasks (of the same job) a JVM can run. One can also specify a value greater than 1. A JobConf API, setNumTasksToExecutePerJvm, has also been added.
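The semantics above can be captured in a small decision function. JvmReusePolicy is hypothetical. Rejecting 0 and values below -1 is this sketch's own choice, since the comment does not say how such values are treated. The comment mentions the same-job restriction for -1, and this sketch applies it to every setting.

```java
/**
 * Hypothetical sketch of mapred.job.reuse.jvm.num.tasks semantics:
 * 1 = no reuse, -1 = unlimited, n > 1 = at most n tasks per JVM.
 */
final class JvmReusePolicy {
    private final int numTasksPerJvm;

    JvmReusePolicy(int numTasksPerJvm) {
        // Assumption: only -1 and positive values are meaningful.
        if (numTasksPerJvm == 0 || numTasksPerJvm < -1) {
            throw new IllegalArgumentException("expected -1 or a positive value: " + numTasksPerJvm);
        }
        this.numTasksPerJvm = numTasksPerJvm;
    }

    /** Can a JVM that already ran {@code tasksRun} tasks of {@code jvmJob} take a task of {@code nextJob}? */
    boolean mayRunAnother(String jvmJob, String nextJob, int tasksRun) {
        if (!jvmJob.equals(nextJob)) {
            return false;                 // reuse stays within one job
        }
        if (numTasksPerJvm == -1) {
            return true;                  // no limit
        }
        return tasksRun < numTasksPerJvm;
    }
}
```

In an actual job, the setting is made through the mapred.job.reuse.jvm.num.tasks property or the JobConf method named above, for example conf.setNumTasksToExecutePerJvm(-1) for unlimited reuse within the job.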