Doug Cutting added a comment - 25/May/06 11:58 PM
I have not yet had a chance to look closely at your patches, but these are clearly optimizations that we badly need. Thanks!
What happened to these patches?
I'm especially interested in the jvm reuse, since we run not only large jobs but also a lot of small ones where the setup time really kills performance.
Most of these things, except for the jvm reuse, have been done in other patches. I've only left this bug open because of the jvm reuse issues, but the patch is currently useless because the code has changed so much in the last year. In fact, it would be relatively tricky to get right with the capturing of stdout/stderr from the tasks.
Wondering if we can have one jvm per job and have tasks as threads spawned from the job jvm?
No, threads within a single jvm have no major advantage over multiple jvms and have significant disadvantages. I feel that the best trade-off is having each jvm run a single task at a time, but reusing the jvm for the following task.
In my use case, we'd like to load a significant amount of data into memory at the start of a task run, sharing 10Gb of read-only data in a single JVM running on an 8-16 processor machine. This would allow for lower cost and more throughput; otherwise we are redundantly loading this significant amount of data and are therefore limited by memory rather than CPUs.
My suggestion would be to provide the option and let the user choose based on their needs. If you're running "untrusted" processes on the cluster, you can run each task in an independent JVM. If you're looking to maximize throughput and memory and you are running trusted processes, you should be able to specify # of threads per Task. In my current situation, I'm only able to make use of 4 CPUs on 8 CPU machines because each task requires 10Gb to run, the majority of that being read-only data loaded from distributed cache. To be more clear, my suggestion would be to add parameters "mapred.map.tasks.threads" and "mapred.reduce.tasks.threads" and let people set the number of threads to spawn per task, and let them default to 1.
Reducing the number of JVMs spawned should be a high priority. It is a tremendous amount of overhead for low-cpu usage jobs.
Another issue is that the JVM is continuously optimizing your code; when you restart the VM you lose that information and it has to be reconstructed in the next run. CPU-bound processes would definitely benefit from having the VM stay around for the length of the job.
Would it make sense, under some conditions (e.g. task code is trusted), to allow the tasks to run directly inside the task tracker jvm?
I actually had some scenario where the amount of memory available was critically low!
A good discussion on the JVM reuse issue happened on HADOOP-3675. The links:
http://issues.apache.org/jira/browse/HADOOP-3675?focusedCommentId=12619448#action_12619448
After a discussion with Owen on the logs issue, it seems the approach where we write everything to three files (stdout/err/log) with an index file containing offsets per task should work. Thoughts?
> the approach where we write everything to three files (stdout/err/log) with an index file containing offsets per task should work.
Seems like a good idea. Would the Java process write the index file? If so, it would have to count bytes, but that should be doable. I'm not sure what the cause of the performance problem that led to the stream handling being done outside Java (
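To make the "index file containing offsets per task" idea concrete, here is a minimal sketch in plain Java. It is not the patch's code: the class name OffsetIndexedLog, its methods, and the task ids are all hypothetical, and an in-memory buffer stands in for the shared stdout/err/log file. It only shows the byte counting the Java process would need to do so that each task's output can be located afterwards.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: one shared log plus an index of (offset, length) per task. */
final class OffsetIndexedLog {
    // Stands in for the shared log file that all tasks of a reused JVM write to.
    private final ByteArrayOutputStream shared = new ByteArrayOutputStream();
    // Stands in for the index file: task id -> {start offset, length in bytes}.
    private final Map<String, long[]> index = new LinkedHashMap<>();
    private long written = 0;
    private String currentTask;
    private long currentStart;

    /** Remember where the next task's output begins. */
    void beginTask(String taskId) {
        currentTask = taskId;
        currentStart = written;
    }

    /** Append output and count the bytes actually written. */
    void write(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        shared.write(bytes, 0, bytes.length);
        written += bytes.length;
    }

    /** Record the finished task's span in the index. */
    void endTask() {
        index.put(currentTask, new long[] {currentStart, written - currentStart});
        currentTask = null;
    }

    long[] lookup(String taskId) {
        return index.get(taskId);
    }

    /** Read back only the bytes that belong to one task. */
    String read(String taskId) {
        long[] entry = index.get(taskId);
        return new String(shared.toByteArray(), (int) entry[0], (int) entry[1],
                StandardCharsets.UTF_8);
    }
}
```

Anything written after endTask() by a straggling thread or child process would land in the next task's span, which is the boundary problem raised in this thread.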
This has problems in some cases : processes (forked by streaming/pipes tasks) that live beyond the life time of the task process pollute the stdout/err/log files, in that they might continue to write to these files even after we mark the end of data from one task. In the current setup they inherit parent task's file descriptors and continue to write to the same files and hence pose no problem.
+1
This is a general problem with combining tasks into a single JVM and I don't believe there are any solutions. Take, for example, a Mapper that launches a thread that is not joined in the close method. The output will be split across the tasks.
I'm confused. Pre > You would still need to guess where the boundary was.
I was thinking that you would call System.setOut() and System.setErr() for each new task, so that each stream was redirected to a new file. (Of course, if the task hasn't finished producing output, it will go into the wrong output file. But this is a general problem, as you point out.) But if the overhead for copying the process output to a file in Java is so high then we should do the stream handling outside Java and track offsets. I was thinking that you could use an OutputStream with ThreadLocal OutputStream within it, and use these with System.setOut() and System.setErr(). This would allow multiple threads to have their own output and error streams. Not sure how this would work with pipes though.
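A minimal sketch of the "OutputStream with ThreadLocal OutputStream within it" idea, using only the JDK. The class PerThreadOutput and its bind/unbind methods are hypothetical names made up for illustration; nothing here is taken from the patch. Each thread binds its own target stream, and System.out is replaced by a PrintStream that dispatches on the calling thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;

/** Hypothetical sketch: an OutputStream that routes writes to a per-thread target. */
final class PerThreadOutput extends OutputStream {
    private final OutputStream fallback;
    private final ThreadLocal<OutputStream> target = new ThreadLocal<>();

    PerThreadOutput(OutputStream fallback) {
        this.fallback = fallback;
    }

    /** Called by a task thread before it produces output. */
    void bind(OutputStream out) {
        target.set(out);
    }

    void unbind() {
        target.remove();
    }

    // Threads that never called bind() fall through to the original stream.
    private OutputStream current() {
        OutputStream out = target.get();
        return out != null ? out : fallback;
    }

    @Override
    public void write(int b) throws IOException {
        current().write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        current().write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        current().flush();
    }

    /** Runs two "tasks" on separate threads and returns what each one captured. */
    static String[] demo() {
        PrintStream original = System.out;
        PerThreadOutput router = new PerThreadOutput(original);
        ByteArrayOutputStream a = new ByteArrayOutputStream();
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        System.setOut(new PrintStream(router, true));
        try {
            Thread t1 = new Thread(() -> {
                router.bind(a);
                System.out.print("task-1");
                System.out.flush();
                router.unbind();
            });
            Thread t2 = new Thread(() -> {
                router.bind(b);
                System.out.print("task-2");
                System.out.flush();
                router.unbind();
            });
            t1.start();
            t2.start();
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            System.setOut(original); // always restore the real stdout
        }
        return new String[] {a.toString(), b.toString()};
    }
}
```

A plain ThreadLocal is not inherited by threads that a task starts itself, so their output would go to the fallback stream; InheritableThreadLocal would be one way to change that. The redirection also exists only inside the JVM, so it does not affect file descriptors seen by child processes or native code.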
Will the System.setOut/setErr approach work for the case where the parent Java task spawns child processes (that do printf(), etc.) ? Will the spawned children also see those as their stdout/err?
No it doesn't work. It also doesn't work if you use jni code in your application that calls printf. When I was doing
If we implement HADOOP-2560, then jvm reuse becomes a less urgent issue.
I agree with Runping that HADOOP-2560 has more far-reaching effects.
Sure, but re-using the jvm is the first step for HADOOP-2560. There is a lot of work to implement grouping of maps. In particular, you need to deal with task failures, changes to the shuffle and task event log, interactions with speculative execution and so on. On the other hand, to group mappers you need to reuse the jvm, otherwise you can't join their outputs together.
HADOOP-2560 does not need JVM re-use. Here is an early untested patch (the major missing part is the handling of the logs, and some cleanup needs to be done). This is up for an early review.
The major change here is the addition of a new class called JvmManager that does the bookkeeping of the JVMs spawned by the tasktracker. This has some fixes. Logging not yet done though.
The data-structures have been simplified in this patch. Also, this has been tested a fair amount.
The loadgen benchmark ran in the following way:
bin/hadoop jar hadoop-0.19.0-dev/hadoop-0.19.0-dev-test.jar loadgen \
-D test.randomtextwrite.bytes_per_map=$((100)) \
-D test.randomtextwrite.total_bytes=$((100*100000)) \
-D mapred.compress.map.output=false \
-r 1 \
-outKey org.apache.hadoop.io.Text \
-outValue org.apache.hadoop.io.Text \
-outFormat org.apache.hadoop.mapred.lib.NullOutputFormat \
-outdir fakeout
(that is 100K mappers each generating 100 bytes of data), run on 100 nodes with 4 mapper slots each, showed 50% improvement for the map-phase.
Generating only 100 bytes, each mapper is essentially doing nothing. How long did the shuffling take?
Yes, the aim of this benchmark was to see how large jobs with small tasks would be affected. But as discussed in this jira already, there are other cases where jvm reuse could prove useful. The shuffling in both cases took ~30 minutes.
Here is a patch that addresses the task logs issue. In short the patch has the following:
1) Addition of queues in the tasktracker for maps and reduces where a new task is queued up. When a slot becomes free, a task is pulled from one of the relevant queues and initialized/launched.
2) When a tasktracker reports with tasks in COMMIT_PENDING status, it could be given a new task. This new task would be queued up at the tasktracker and launched only when the task doing the commit finishes.
3) JvmManager is the class that keeps track of running JVMs and tasks running on the JVMs. When a JVM exits, the appropriate task (if any was being run by the JVM) is failed.
4) A new task state has been added for signifying INITIALIZED and ready to run. Until a task is initialized (task localization, distributed cache init, etc.), a JVM is not given that task.
5) The log handling is done this way:
Have tested large sort with this patch. The only thing remaining is the handling of the workDir for tasks (that has things like Distributed Cache symlinks). I am going to address that in the next patch very shortly. Would really appreciate a review on this one.
I've spent a fair bit of time on this, it's looking good - some comments:
As an aside: How about removing the {map|reduce}Launcher threads and getting TaskTracker.getTask to pull it off the queue, initialize the task and then return it to the Child? Thoughts?
Thanks Arun for the review. Most of them can be addressed without much debate. However, I am not sure about the last comment, i.e., getTask RPC doing the task initialization. Currently, most of the initialization is done in a separate TaskRunner thread and TaskTracker.getTask just returns an initialized task. If we move the initialization to a separate thread, I am worried that if the task initialization takes a long time (since there is communication with the DFS involved as well), then the RPC handlers get blocked (depending on how many JVMs ask for tasks). Also, there is the issue of task cleanup that in my current patch is done in the same TaskRunner thread. That has to be done inline too (or in a separate thread launched when the task finishes) if we remove the taskrunner thread.
Also, in the current patch, the JVM is launched (if needed) in the same TaskRunner thread. We'd also need to think about where to launch the JVM in the first place so that it starts making the getTask RPCs. For the above reasons, I propose to keep the existing model but I am happy to discuss this. Also, I think having launcher threads sets us up logically for the case where the TT queues up many more tasks than there are slots. What do others think?
Guys, just a quick comment. Unless it has been modified recently, S3FileSystem's store will cache to $hadoop.tmp.dir/s3 the blocks read from S3 and remove them only at JVM shutdown. This is not a major issue if each map task is run under a new JVM but it might become an issue if the JVM is re-used. We often experience problems with disk usage when running a Hadoop client that reads a lot of data from S3 and we had to patch it to get around it.
Another corner case:
TaskTracker.getTask should validate that the JVM identified by JVMId was really launched by the current JVMManager, currently it just checks the JobId. This probably means that the JVMId.id should be a function of the current timestamp to ensure it's unique across TaskTracker restarts (and hence it invalidates my previous comment about using a static counter in JVMId...).
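A rough illustration of that suggestion, in plain Java and under stated assumptions: the class JvmRegistrySketch, its methods, and the id layout are invented for this sketch and are not the JvmManager or JVMId code from the patch. The id embeds the tracker's start timestamp, and the tracker only accepts ids it handed out itself.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: JVM ids that stay unique across tracker restarts. */
final class JvmRegistrySketch {
    private final long trackerStartMillis;
    private final AtomicLong counter = new AtomicLong();
    private final Set<String> launched = new HashSet<>();

    JvmRegistrySketch(long trackerStartMillis) {
        this.trackerStartMillis = trackerStartMillis;
    }

    /** Hand out an id that embeds the job, the tracker start time and a counter. */
    synchronized String launch(String jobId) {
        String id = "jvm_" + jobId + "_" + trackerStartMillis + "_" + counter.incrementAndGet();
        launched.add(id);
        return id;
    }

    /** A getTask-style check: was this JVM launched by this tracker instance? */
    synchronized boolean isKnown(String jvmId) {
        return launched.contains(jvmId);
    }
}
```

With a counter alone, a JVM left over from before a restart could present an id that collides with a freshly issued one; the timestamp component is what separates the two tracker incarnations in this sketch.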
Actually, to be clear, I'm not proposing to get rid of the TaskRunner thread, just the TaskLauncher threads; this will ensure we do task initialization on demand... but I'm happy to just throw this up and get feedback from others! Owen?
Good point. I hadn't considered the tasktracker restart case.
There are some other corner cases:
1) I need to make sure that the JVM really gets to execute the task it was spawned for. This is required so that things like keep.failed.task.files work. Although I do disable jvm reuse for such cases, it might still be possible that the spawned JVM gets to run some other queued task. To handle this, I need to pass the firstTaskId in the getTask RPC, which will return the task if it finds it. This will guarantee that when jvm reuse is disabled the JVM executes that and only that task for which it was spawned.
2) A JVM is spawned but the task which it was spawned for happened to be executed by some other JVM just before the new JVM was fully initialized. This new JVM would just stay in memory since the busy flag is set to true in the JvmRunner initialization. To handle this I need to introduce a JVM_INITIALIZING state to distinguish between busy and initializing JVMs (initializing JVMs are not killed when the TT looks for a JVM to purge).
Regarding the task initialization, it happens on demand even in the patch. It's just that the RPCs are kept away from task init/cleanup. Owen?
Ignore my second comment on jvm initialization. I handled it a bit differently with the busy flag itself.
Attached is a patch with all the review comments incorporated (Arun's and Amareshwari's). This patch might have some findbugs warnings (they showed up when test-patch was run locally) but they seem unrelated to my patch.
This is looking good. +1.
If we can get the output of 'ant test-patch' this is good to go!
Attached is the output of test-patch
[exec] The findbugs warning is due to the System.exit call I make in the Child.java. The calls are required.
I just committed this. Thanks, Devaraj!
Integrated in Hadoop-trunk #611 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/611/)
If this is 1 (the default), then JVMs are not reused (1 task per JVM). If it is -1, there is no limit to the number of tasks a JVM can run (of the same job). One can also specify some value greater than 1. Also a JobConf API has been added - setNumTasksToExecutePerJvm.
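The three cases of that setting can be written down as a small decision function. This is only a sketch of the semantics described above: the class JvmReusePolicy and the method canRunAnotherTask are hypothetical and do not call Hadoop. In a real job the value would be supplied through the JobConf API named in the comment, setNumTasksToExecutePerJvm.

```java
/** Hypothetical sketch of the reuse-limit semantics: 1, -1, or some N greater than 1. */
final class JvmReusePolicy {
    private JvmReusePolicy() {
    }

    /**
     * @param tasksAlreadyRun tasks of the same job this JVM has already run
     * @param limit 1 (default) means one task per JVM, -1 means no limit,
     *              N greater than 1 means at most N tasks per JVM
     * @return true if the JVM may be handed another task of the same job
     */
    static boolean canRunAnotherTask(int tasksAlreadyRun, int limit) {
        if (limit == -1) {
            return true; // unlimited reuse within the job
        }
        // The comment does not say how 0 or other negative values are treated;
        // this sketch simply never reuses the JVM for them.
        return tasksAlreadyRun < limit;
    }
}
```

With the default of 1, a JVM that has run one task is never handed a second one, which matches the behavior before this change.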