
Key: HADOOP-968
Type: Improvement
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Devaraj Das
Reporter: Owen O'Malley
Votes: 0
Watchers: 0
Hadoop Common

Reduce shuffle and merge should be done in a child JVM

Created: 01/Feb/07 06:09 PM   Updated: 08/Jul/09 04:52 PM
Component/s: None
Affects Version/s: 0.10.1
Fix Version/s: 0.13.0

Time Tracking:
Not Specified

File Attachments (all licensed for inclusion in ASF works):
Name                             Date                 Author        Size
968-reindent.patch               2007-04-16 10:47 PM  Doug Cutting  79 kB
968-with-metrics-fix.new.patch   2007-04-17 07:23 PM  Devaraj Das   80 kB
968-with-metrics-fix.patch       2007-04-17 06:29 AM  Devaraj Das   80 kB
968.apr06.patch                  2007-04-06 05:55 PM  Devaraj Das   82 kB
968.apr10.patch                  2007-04-10 06:05 AM  Devaraj Das   79 kB
968.apr14.patch                  2007-04-14 06:11 PM  Devaraj Das   79 kB
968.apr14.patch                  2007-04-14 01:04 PM  Devaraj Das   79 kB
968.patch                        2007-04-05 04:37 PM  Devaraj Das   82 kB
Issue Links:
Reference
 

Resolution Date: 17/Apr/07 07:57 PM


Description
The Reduce's shuffle and initial merge are done in the TaskTracker's JVM. It would be better to run them in the Task's child JVM. The advantages are:
1. The class path and environment would be set up correctly.
2. User code doesn't need to be loaded into the TaskTracker.
3. Lower memory usage and contention in the TaskTracker.

Owen O'Malley made changes - 01/Feb/07 06:10 PM
Field Original Value New Value
Link This issue relates to HADOOP-964 [ HADOOP-964 ]
Doug Cutting made changes - 02/Mar/07 10:17 PM
Fix Version/s 0.12.0 [ 12312293 ]
Devaraj Das added a comment - 03/Apr/07 06:14 PM
The salient points of the design:
On the TaskTracker
1) The TaskTracker maintains the list of TaskCompletionEvents for a job. Whenever a ReduceTask is assigned to a TaskTracker, the TaskTracker extracts the JobId from it.
2) For that jobid it starts fetching MapTask completion events as long as any ReduceTask for that job is in the SHUFFLE phase (this ensures that the TaskTracker sees all MapTask lost events and keeps an updated cache of all events). When all the ReduceTasks for a given job have gone past the SHUFFLE phase, the TaskTracker does not fetch any more MapTask completion events until another ReduceTask gets assigned to it. If no other ReduceTask from the same job gets assigned to it, and the job completes, it clears the cache of TaskCompletionEvents.
3) The event-fetcher thread blocks on the runningJobs object. Whenever TaskTracker's addTaskToJob method adds a new Task to a job, it invokes runningJobs.notify() so that the event-fetcher thread can unblock and continue.
4) The event-fetcher thread also goes through the runningJobs and immediately stops fetching events for those jobs that have been killed/failed.
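The wait/notify handshake in points 2 and 3 can be modeled roughly as below. This is a simplified sketch, not the patch's code: runningJobs is reduced to a map from job id to the number of that job's reduces still in SHUFFLE, and shuffleDone is a hypothetical hook for a reduce leaving SHUFFLE (or its job being killed/failed, as in point 4).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: the fetcher waits on runningJobs until some job has a
// reduce in SHUFFLE; addTaskToJob() calls notify() to wake it up.
class EventFetcherSketch {
  // job id -> number of that job's reduces still in the SHUFFLE phase
  private final Map<String, Integer> runningJobs = new HashMap<>();

  // Called when a ReduceTask for jobId is assigned to this tracker.
  void addTaskToJob(String jobId) {
    synchronized (runningJobs) {
      runningJobs.merge(jobId, 1, Integer::sum);
      runningJobs.notify(); // unblock the event-fetcher thread
    }
  }

  // Hypothetical hook: a reduce left SHUFFLE, or its job was killed/failed.
  // When the count reaches zero the job is dropped, so fetching for it stops.
  void shuffleDone(String jobId) {
    synchronized (runningJobs) {
      runningJobs.computeIfPresent(jobId, (k, n) -> n > 1 ? n - 1 : null);
    }
  }

  // Fetcher side: blocks until at least one job has a reduce in SHUFFLE.
  List<String> jobsToFetchFor() throws InterruptedException {
    synchronized (runningJobs) {
      while (runningJobs.isEmpty()) { // loop guards against spurious wakeups
        runningJobs.wait();
      }
      return new ArrayList<>(runningJobs.keySet());
    }
  }
}
```

The while loop around wait() matters: the fetcher re-checks the condition after every wakeup instead of assuming that a notify() implies new work.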

On the TaskUmbilicalProtocol, ReduceTaskRunner & ReduceTask:
1) A new method, TaskCompletionEvent[] getSuccessMapCompleteEvents(String taskId, int fromIndex, int maxLocs) throws IOException, has been added so that the ReduceTask can fetch the TaskCompletionEvents cached at the TaskTracker. Its semantics mirror those of InterTrackerProtocol.getTaskCompletionEvents, except that the umbilical protocol returns only the successful map events. Fetch failures are handled the same way as they are today. Thus, most of the fetcher code in ReduceTaskRunner remains the same; it is now part of ReduceTask, in a new class called ReduceCopier, and ReduceTaskRunner now closely matches MapTaskRunner in functionality and code.
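A minimal sketch of the TaskTracker side of this call, assuming the cache keeps only successful map events per job and that fromIndex is an offset into that list. The class MapEventCacheSketch is hypothetical, and plain map task ids stand in for TaskCompletionEvent objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-job cache of successful map completion events, served to
// reduces in pages of at most maxLocs events starting at fromIndex.
class MapEventCacheSketch {
  // job id -> successful map events (here just map task ids), in arrival order
  private final Map<String, List<String>> successfulMapEvents = new HashMap<>();

  // Called by the event-fetcher thread for each event received from the JobTracker.
  synchronized void record(String jobId, String mapTaskId, boolean succeeded) {
    if (succeeded) {
      successfulMapEvents.computeIfAbsent(jobId, k -> new ArrayList<>()).add(mapTaskId);
    }
  }

  // Same shape as getSuccessMapCompleteEvents(jobId, fromIndex, maxLocs).
  synchronized String[] getSuccessMapCompleteEvents(String jobId, int fromIndex, int maxLocs) {
    List<String> events = successfulMapEvents.getOrDefault(jobId, new ArrayList<>());
    int from = Math.min(Math.max(fromIndex, 0), events.size());
    // Clamp so that negative or huge maxLocs values never overrun the list.
    int count = Math.min(Math.max(maxLocs, 0), events.size() - from);
    return events.subList(from, from + count).toArray(new String[0]);
  }
}
```

In this model the reduce advances its own fromIndex by the number of events returned, so each call resumes where the previous one stopped.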

Comments?


Devaraj Das added a comment - 05/Apr/07 04:37 PM
Attached patch for review. The unit tests run fine with the patch and I am in the process of running the sort benchmark.

Devaraj Das made changes - 05/Apr/07 04:37 PM
Attachment 968.patch [ 12355021 ]
Devaraj Das added a comment - 06/Apr/07 05:55 PM
A well-tested patch.

Devaraj Das made changes - 06/Apr/07 05:55 PM
Attachment 968.apr06.patch [ 12355093 ]
Devaraj Das added a comment - 10/Apr/07 06:05 AM
This is w.r.t. the current trunk (HADOOP-1218 made the previous patch go out of sync with trunk), plus it has some changes in TaskTracker.java.

Devaraj Das made changes - 10/Apr/07 06:05 AM
Attachment 968.apr10.patch [ 12355215 ]
Devaraj Das added a comment - 13/Apr/07 06:43 AM
This patch has the potential to go stale quickly since it touches many files in major ways, so I would appreciate a quick review/commit. Thanks.

Devaraj Das made changes - 13/Apr/07 06:43 AM
Fix Version/s 0.13.0 [ 12312348 ]
Status Open [ 1 ] Patch Available [ 10002 ]

Owen O'Malley added a comment - 13/Apr/07 09:16 PM
1. I notice that a lot of your iterators are not typed, which forces you to cast itr.next().
2. In many cases, the loop "for(Item item: itemSet){..}" is easier to read and more concise.
3. Maps should not be iterated through using:
for(Map.Entry<Key,Value> item: myMap) {...}
rather than:
Iterator itr = myMap.keySet().iterator();
while (itr.hasNext()) { Value value = myMap.get(itr.next()); ... }
4. It looks like each reduce from a job will cause its job's FetchState to be added to the list multiple times, so the job will be fetched multiple times per loop.
5. I'd remove the sleep from queryJobTracker and move it to the MapEventsFetcherThread's run loop.
6. The doFetch is badly named, since it doesn't actually do the fetch. It should be called findReduces or something.
7. The first parameter of TaskUmbilicalProtocol.getMapCompletionEvents is named "taskid", but in fact it is a job id.
8. The MapEventsFetcherThread's name doesn't need to include the task in the normal case, but I guess for unit tests it might be useful.
9. I assume that the shuffle code in ReduceTask matches the old code in ReduceTaskRunner. :)

Owen O'Malley made changes - 13/Apr/07 09:16 PM
Status Patch Available [ 10002 ] Open [ 1 ]
Owen O'Malley added a comment - 13/Apr/07 09:30 PM
Point 3 should be "Maps SHOULD BE iterated through using". Sorry for any confusion.
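A Map is not itself Iterable, so the idiom from point 3 iterates over entrySet(). Each Map.Entry carries both the key and the value, which removes the cast and the second get() lookup of the Iterator-based version. A small illustration (the class and method names are made up for the example):

```java
import java.util.Map;

class MapIterationSketch {
  // Typed entry iteration: no casts, and no extra counts.get(key) per element.
  static String describe(Map<String, Integer> counts) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Integer> entry : counts.entrySet()) {
      if (sb.length() > 0) {
        sb.append(", ");
      }
      sb.append(entry.getKey()).append('=').append(entry.getValue());
    }
    return sb.toString();
  }
}
```

Iteration order follows the map implementation; a TreeMap, for instance, yields keys in sorted order.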

Devaraj Das added a comment - 14/Apr/07 01:04 PM
Thanks for the review, Owen. Some comments below.

> 1. I notice that a lot of your iterators are not typed causing you to do casts of itr.next
> 2. In many cases, the loop "for(Item item: itemSet){..}" is easier to read and more
> concise.
> 3. Maps should not be iterated through using:
> for(Map.Entry<Key,Value> item: myMap) {...}

Done. Old habits die hard :)

> 4. It looks like each reduce from a job will cause its job's FetchState to be added to
> the list a multiple time, so it will fetch multiple times per a loop.
No change. There is already a "break" statement in the loop as soon as one FetchState gets added.

> 5. I'd remove the sleep from queryJobTracker and move it to the
> MapEventsFetcherThread's run loop.
Done

> 6. The doFetch is badly named, since it doesn't actually do the fetch. It should be
> called findReduces or something.
Changed that to reducesInShuffle
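Point 4 and the reply to it hinge on the inner loop breaking as soon as one of a job's reduces is found in SHUFFLE, so each job is queued at most once per pass. A sketch of that scan follows; RunningJob and Phase are hypothetical stand-ins for the TaskTracker's own structures, and the patch's actual FetchState handling is not reproduced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a reducesInShuffle() scan: at most one entry per job.
class ReducesInShuffleSketch {
  enum Phase { SHUFFLE, SORT, REDUCE }

  static final class RunningJob {
    final String jobId;
    final List<Phase> reducePhases; // phases of this job's reduces on this tracker

    RunningJob(String jobId, Phase... reducePhases) {
      this.jobId = jobId;
      this.reducePhases = Arrays.asList(reducePhases);
    }
  }

  // Returns the ids of jobs whose map events still need fetching, each once.
  static List<String> reducesInShuffle(List<RunningJob> runningJobs) {
    List<String> fetchList = new ArrayList<>();
    for (RunningJob job : runningJobs) {
      for (Phase phase : job.reducePhases) {
        if (phase == Phase.SHUFFLE) {
          fetchList.add(job.jobId);
          break; // one shuffling reduce is enough; skip the job's other reduces
        }
      }
    }
    return fetchList;
  }
}
```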

> 7. The name of the parameter of the first parameter in
> TaskUmbilicalProtocol.getMapCompletionEvents is "taskid", but if fact it is a job id.
Made the name change in TaskUmbilicalProtocol.java

> 8. The MapEventsFetcherThread's name doesn't need to include the task in the
> normal case, but I guess for unit tests it might be useful.
No change

> 9. I assume that the shuffle code in ReduceTask matches the old code in
> ReduceTaskRunner. :)
:) Yes. The only changes introduced take care of variable initializations (for example, reduceTask is initialized differently).


Devaraj Das made changes - 14/Apr/07 01:04 PM
Attachment 968.apr14.patch [ 12355546 ]
Devaraj Das made changes - 14/Apr/07 01:24 PM
Status Open [ 1 ] Patch Available [ 10002 ]
Devaraj Das added a comment - 14/Apr/07 06:11 PM
This removes a redundant call to System.currentTimeMillis in the TaskTracker.

Devaraj Das made changes - 14/Apr/07 06:11 PM
Attachment 968.apr14.patch [ 12355549 ]
Hadoop QA added a comment - 15/Apr/07 06:27 AM
-1, build or testing failed

2 attempts failed to build and test the latest attachment http://issues.apache.org/jira/secure/attachment/12355549/968.apr14.patch against trunk revision r528230.

Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/50/testReport/
Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/50/console

Please note that this message is automatically generated and may represent a problem with the automation system and not the patch.


Doug Cutting added a comment - 16/Apr/07 10:47 PM
Here's a re-indented version of this patch.

Doug Cutting made changes - 16/Apr/07 10:47 PM
Attachment 968-reindent.patch [ 12355654 ]
Devaraj Das added a comment - 17/Apr/07 06:29 AM
This patch fixes a metrics-reporting issue: stopMonitoring was not called for the ReduceTask. This could cause ReduceTasks to hang after they finish, since the task JVM might not be able to exit until the (non-daemon) monitoring thread goes away.

Devaraj Das made changes - 17/Apr/07 06:29 AM
Attachment 968-with-metrics-fix.patch [ 12355667 ]
Nigel Daley added a comment - 17/Apr/07 05:45 PM
+1. The latest patch passes my nightly suite of benchmarks. I have resubmitted it to the patch process.

David Bowen added a comment - 17/Apr/07 06:16 PM

Devaraj, can you please clarify what you fixed with regard to the missing calls to stopMonitoring? I think you're right that it is a problem, but I would expect the fix to be to change the Timer constructor in o.a.h.metrics.spi.AbstractMetricsContext. Instead of the zero-argument constructor, we should use the two-argument constructor that takes a thread name and a boolean isDaemon. I didn't see this change in the patch.


Devaraj Das added a comment - 17/Apr/07 06:31 PM
I am pasting the relevant block of code from the latest patch (method: TaskTracker.java::Child::main()). I invoke close() on the metricsContext for the "mapred" context just before the child task shuts down the log manager (LogManager.shutdown()) and exits. I thought this was the best way to stop the monitoring and let the task exit cleanly without touching other parts of the metrics code.

} finally {
+ MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+ metricsContext.close();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.

Makes sense?
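The ordering described above (close the "mapred" metrics context in the finally block, then shut down logging) can be modeled as below. The step names only record the order and are placeholders, not Hadoop calls, and the catch block exists only so the sketch can return its trace on the failure path. The point is that finally runs on both paths, so the metrics context is closed before the child JVM tries to exit either way.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the child's shutdown order; strings stand in for real calls.
class ChildMainSketch {
  static List<String> childMain(boolean failTask) {
    List<String> steps = new ArrayList<>();
    try {
      steps.add("task.run");
      if (failTask) {
        throw new IllegalStateException("simulated task failure");
      }
      steps.add("task.done");
    } catch (IllegalStateException e) {
      steps.add("task.failed");
    } finally {
      steps.add("metrics.close"); // stands in for MetricsUtil.getContext("mapred").close()
      steps.add("log.shutdown");  // stands in for LogManager.shutdown(); no logging after this
    }
    return steps;
  }
}
```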


Doug Cutting added a comment - 17/Apr/07 06:55 PM
I think David's point was that it would be best to fix the metrics code so that its thread is a daemon thread, rather than to expect client code to explicitly stop that thread.

Devaraj Das added a comment - 17/Apr/07 07:23 PM
Ok, changed the Timer constructor to the two-argument one. Also, retained the metricsContext.close() call that I had in the last patch. In general, I think that it is a good idea to call close() on an object if the close() method is clearly documented. Does this seem all right?
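The change discussed here turns on java.util.Timer. Its zero-argument constructor runs the timer thread as a non-daemon thread, which keeps the JVM alive until cancel() is called. Timer(String name, boolean isDaemon) can instead create a daemon thread that does not block JVM exit. A minimal check of that behavior follows; the timer name is illustrative, and the actual change to AbstractMetricsContext is not reproduced here.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class DaemonTimerSketch {
  // Runs one task on a daemon Timer and reports whether its thread is a daemon.
  static boolean timerThreadIsDaemon(String name) throws InterruptedException {
    Timer timer = new Timer(name, true); // daemon thread: will not keep the JVM alive
    final boolean[] isDaemon = new boolean[1];
    final CountDownLatch ran = new CountDownLatch(1);
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        isDaemon[0] = Thread.currentThread().isDaemon();
        ran.countDown(); // countDown/await makes isDaemon[0] visible to the caller
      }
    }, 0);
    boolean finished = ran.await(5, TimeUnit.SECONDS);
    timer.cancel();
    return finished && isDaemon[0];
  }
}
```

Calling close() explicitly, as the patch also does, is still useful: the daemon flag only guarantees that a forgotten timer cannot hold the JVM open.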

Devaraj Das made changes - 17/Apr/07 07:23 PM
Attachment 968-with-metrics-fix.new.patch [ 12355713 ]
Repository Revision Date User Message
ASF #529742 Tue Apr 17 19:57:34 UTC 2007 cutting HADOOP-968. Move shuffle and sort code to run in child JVM, rather than in TaskTracker. Contributed by Devaraj.
Files Changed
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/spi/AbstractMetricsContext.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
MODIFY /lucene/hadoop/trunk/CHANGES.txt

Doug Cutting added a comment - 17/Apr/07 07:57 PM
I just committed this. Thanks, Devaraj!

Doug Cutting made changes - 17/Apr/07 07:57 PM
Status Patch Available [ 10002 ] Resolved [ 5 ]
Resolution Fixed [ 1 ]
Hadoop QA added a comment - 17/Apr/07 09:44 PM
-1, could not apply patch.

The patch command could not apply the latest attachment http://issues.apache.org/jira/secure/attachment/12355713/968-with-metrics-fix.new.patch as a patch to trunk revision r529763.

Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/58/console

Please note that this message is automatically generated and may represent a problem with the automation system and not the patch.


Hadoop QA added a comment - 18/Apr/07 11:04 AM

Doug Cutting made changes - 08/Jun/07 08:40 PM
Status Resolved [ 5 ] Closed [ 6 ]
Owen O'Malley made changes - 08/Jul/09 04:52 PM
Component/s mapred [ 12310690 ]