
Key: HADOOP-76
Type: Improvement
Status: Closed
Resolution: Fixed
Priority: Minor
Assignee: Sanjay Dahiya
Reporter: Doug Cutting
Votes: 1
Watchers: 2
Hadoop Common

Implement speculative re-execution of reduces

Created: 11/Mar/06 06:57 AM   Updated: 08/Jul/09 04:51 PM
Component/s: None
Affects Version/s: 0.1.0
Fix Version/s: 0.9.0

Time Tracking:
Not Specified

File Attachments:
  Size
Text File Licensed for inclusion in ASF works Hadoop-76.patch 2006-10-19 05:41 PM Sanjay Dahiya 9 kB
Text File Licensed for inclusion in ASF works Hadoop-76_1.patch 2006-11-02 11:46 AM Sanjay Dahiya 26 kB
Text File Licensed for inclusion in ASF works Hadoop-76_2.patch 2006-11-10 02:53 PM Sanjay Dahiya 30 kB
Text File Licensed for inclusion in ASF works Hadoop-76_3.patch 2006-11-13 12:20 PM Sanjay Dahiya 30 kB
Text File Licensed for inclusion in ASF works Hadoop-76_4.patch 2006-11-14 09:39 AM Sanjay Dahiya 30 kB
Text File Licensed for inclusion in ASF works Hadoop-76_5.patch 2006-11-15 02:09 PM Sanjay Dahiya 30 kB
Text File Licensed for inclusion in ASF works Hadoop-76_6.patch 2006-11-20 08:40 PM Sanjay Dahiya 28 kB
Text File Licensed for inclusion in ASF works spec_reducev.patch 2006-06-30 10:32 PM Johan Oskarsson 12 kB
Issue Links:
Dependants
 
Duplicate
 

Resolution Date: 21/Nov/06 08:40 PM


Description
As a first step, reduce task outputs should go to temporary files which are renamed when the task completes.

Doug Cutting made changes - 22/Apr/06 12:12 AM
Field Original Value New Value
Fix Version/s 0.3 [ 12310930 ]
Fix Version/s 0.2 [ 12310813 ]
Sameer Paranjpye made changes - 31/May/06 01:56 AM
Summary reduce output should first be written to a temporary file name, then moved into place Implement speculative re-execution of reduces
Fix Version/s 0.4 [ 12311021 ]
Description This would permit speculative execution of reduce tasks. As a first step, reduce task outputs should go to temporary files which are renamed when the task completes.
Fix Version/s 0.3 [ 12310930 ]
Sameer Paranjpye made changes - 31/May/06 07:36 AM
Link This issue duplicates HADOOP-253 [ HADOOP-253 ]
Doug Cutting made changes - 06/Jun/06 06:16 AM
Workflow jira [ 12351917 ] no reopen closed [ 12372897 ]
Doug Cutting made changes - 07/Jun/06 04:37 AM
Workflow no reopen closed [ 12372897 ] no-reopen-closed [ 12373229 ]
Doug Cutting made changes - 29/Jun/06 04:11 AM
Fix Version/s 0.5.0 [ 12311939 ]
Fix Version/s 0.4.0 [ 12311021 ]
Johan Oskarsson added a comment - 30/Jun/06 10:32 PM
I've tried to implement speculative reduces and it seems to be working. However, I'd like you to take a look at it, since I'm not familiar with some of the inner workings of Hadoop.

As suggested, each reduce writes its output to a temporary name, and the first one to finish moves it to the correct output name.
The patch adds a String tmpName parameter to getRecordWriter in OutputFormatBase, plus a close method. Basically, OutputFormatBase keeps track of the tmpName and the final name,
and once close is called it moves the tmp file to the final name.

This means the current output formats don't have to be changed.

This patch would ideally be complemented by better tasktracker selection. I've seen instances where there are two final reduce tips and a speculative reduce is then assigned to the same node that is already running the other task.

A speculative reduce will be started if finishedReduces / numReduceTasks >= 0.7

That's about it, looking forward to hearing your input.
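
The start condition quoted above can be sketched as a small predicate. This is only an illustration, not code from the patch: the class and method names are hypothetical, and it assumes both counters are plain ints, in which case the division has to be done in floating point or the ratio truncates to 0 until every reduce has finished.

```java
/** Hypothetical helper for illustration; not taken from the patch. */
final class ReduceSpeculationThreshold {
    /** Fraction of finished reduces quoted in the comment above. */
    static final double THRESHOLD = 0.7;

    static boolean shouldSpeculate(int finishedReduces, int numReduceTasks) {
        if (numReduceTasks <= 0) {
            return false; // no reduces configured, nothing to speculate on
        }
        // Cast before dividing: with two ints, 8 / 10 would truncate to 0.
        return (double) finishedReduces / numReduceTasks >= THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.println(shouldSpeculate(6, 10)); // ratio 0.6, below the threshold
        System.out.println(shouldSpeculate(8, 10)); // ratio 0.8, above the threshold
    }
}
```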


Johan Oskarsson made changes - 30/Jun/06 10:32 PM
Attachment spec_reducev.patch [ 12336206 ]
Johan Oskarsson added a comment - 07/Jul/06 10:35 PM
As one should've suspected there seems to be a problem with the patch.
It doesn't clean up after the tasks properly, but I do not currently have the time to look at this.
Still, feedback is welcome or if someone wants to use any of it in their own patch, go ahead.

Doug Cutting made changes - 03/Aug/06 05:46 PM
Workflow no-reopen-closed [ 12373229 ] no-reopen-closed, patch-avail [ 12377431 ]
Doug Cutting made changes - 04/Aug/06 08:05 PM
Fix Version/s 0.5.0 [ 12311939 ]
Fix Version/s 0.6.0 [ 12312025 ]
Doug Cutting made changes - 08/Sep/06 08:23 PM
Fix Version/s 0.6.0 [ 12312025 ]
Sanjay Dahiya made changes - 11/Oct/06 01:22 PM
Assignee Owen O'Malley [ owen.omalley ] Sanjay Dahiya [ sanjay.dahiya ]
Sanjay Dahiya made changes - 11/Oct/06 01:22 PM
Status Open [ 1 ] In Progress [ 3 ]
Sanjay Dahiya added a comment - 11/Oct/06 01:35 PM
Here is a list of code-level changes; I will test them in the meantime.
  • Adding an extra JobConf configuration option: runSpeculativeReduces.
  • TaskInProgress maintains a list of nodes where it has already run (or is running). This will be used to avoid scheduling a speculative instance where the task is already running or has failed in the past. [TIP already contains a list of nodes where its task failed.]
  • Another option: if any reduce task is already assigned to this TT and is still running, then the TT is not assigned a speculative task. [comments?]
  • TIP.hasSpeculativeTask now checks for reduce tasks as well; currently it checks only map tasks. The exact condition (timeouts) under which a reduce task should be executed speculatively is open for discussion. Using Johan's condition (finishedReduces / numReduceTasks >= 0.7) for testing until then.
  • JobInProgress.findNewTask looks for speculative tasks (TIP.hasSpeculativeTask()) and checks whether the task ran on the same task tracker.
  • If speculative execution of reduces is enabled, then ReduceTask.run() creates a temp file name for the reduce output. When the reduce task finishes, it checks whether the output file has already been written by some other reduce instance. If not, it renames its output to the final output; otherwise the temp output is deleted.
  • TaskTracker.TIP.cleanup() also cleans up the reduce task temp file if the task is killed midway.
  • JobTracker.pollForTaskWithClosedJob() and TIP.shouldCloseForClosedJob() return true if a speculative reduce task finished first, which ultimately propagates down to the TT and kills/cleans up the task.

The exact condition(timeouts) in which reduce task should be executed speculatively is open for discussion.

comments?
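
The "rename if nobody else finished first, otherwise delete" step described in the list above could look roughly like the sketch below. It is an assumption-laden illustration rather than the patch itself: it uses java.nio.file on a local directory as a stand-in for the DFS, and TempOutputCommit, commit and demo are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch; a local directory stands in for the DFS. */
final class TempOutputCommit {

    /** Returns true if this attempt's temp file became the final output. */
    static boolean commit(Path tempOutput, Path finalOutput) throws IOException {
        try {
            // Without REPLACE_EXISTING the move fails if the target already exists.
            Files.move(tempOutput, finalOutput);
            return true;
        } catch (FileAlreadyExistsException otherAttemptWon) {
            Files.deleteIfExists(tempOutput); // discard the redundant output
            return false;
        }
    }

    /** Two attempts of the same reduce race to commit; only the first wins. */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("reduce-out");
            Path finalOutput = dir.resolve("part-00000");
            Path first = Files.writeString(dir.resolve("_tmp_attempt_0"), "first");
            Path second = Files.writeString(dir.resolve("_tmp_attempt_1"), "second");
            boolean firstWon = commit(first, finalOutput);
            boolean secondWon = commit(second, finalOutput);
            return firstWon + " " + secondWon + " " + Files.readString(finalOutput);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```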


Owen O'Malley added a comment - 12/Oct/06 08:53 PM
I think it is better for now to just use JobConf.setSpeculativeExecution for both maps and reduces.

machinesWhereFailed is a list of machines where the task has failed. To find where it is currently running, you need to use recentTasks. Currently, the recentTasks is a set of task ids that are running. You should probably make it a map from task id to task tracker id.

Don't block speculative reduces based on other reduces running. That would make a perpetually busy cluster never run speculative reduces.

Please create a library that lets you create files off to the side and, when you call commit, moves them into place. Speculative reduces need it, but that functionality is useful in other places, such as side-effect-based maps. The class should also have an abort method that cleans up.

The defaults for the map speculative execution don't look too unreasonable, so just use them for now.
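
The bookkeeping suggested above (a map from task id to task tracker id next to the list of machines where the task failed) might be sketched as follows. This is a simplified illustration, not TaskInProgress itself: the class name TaskPlacement and its methods are hypothetical, and it treats tracker ids and machine names as the same string.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified stand-in for the placement state kept per task. */
final class TaskPlacement {
    /** Running attempts: task id -> task tracker id. */
    private final Map<String, String> recentTasks = new HashMap<>();
    /** Trackers on which some attempt of this task has failed. */
    private final Set<String> machinesWhereFailed = new HashSet<>();

    void taskStarted(String taskId, String trackerId) {
        recentTasks.put(taskId, trackerId);
    }

    void taskFailed(String taskId) {
        String trackerId = recentTasks.remove(taskId);
        if (trackerId != null) {
            machinesWhereFailed.add(trackerId);
        }
    }

    /** True if an attempt is running on the tracker or has failed there. */
    boolean hasRunOnMachine(String trackerId) {
        return machinesWhereFailed.contains(trackerId)
            || recentTasks.containsValue(trackerId);
    }

    public static void main(String[] args) {
        TaskPlacement placement = new TaskPlacement();
        placement.taskStarted("attempt_0", "tracker_a");
        System.out.println(placement.hasRunOnMachine("tracker_a"));
        System.out.println(placement.hasRunOnMachine("tracker_b"));
    }
}
```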


Sanjay Dahiya added a comment - 19/Oct/06 05:41 PM
This patch is up for review.

Here is the list of changes included in this patch -

Changed recentTasks to a Map and added a new method in TaskInProgress, hasRanOnMachine, which looks at this Map and hasFailedOnMachines(). This is used to avoid scheduling multiple reduce instances of the same task on the same node.

Added a PhasedRecordWriter, which takes a RecordWriter, tempName, and finalName. Another option was to create a PhasedOutputFormat, but PhasedRecordWriter seems more natural, as it works with any existing OutputFormat and RecordWriter. Records are written to tempName, and when commit is called they are moved to finalName.

ReduceTask.run() - if speculative execution is enabled, then reduce output is written to a temp location using PhasedRecordWriter. After the task finishes, the output is moved to its final location.
If some other speculative instance finishes first, then TaskInProgress.shouldCloseForClosedJob() returns true for the taskId. On the TaskTracker the task is killed by Process.destroy(), so the cleanup code is in TaskTracker instead of Task. The cleanup of Maps happens in Conf, which is probably misplaced. We could refactor this part for both Map and Reduce and move the cleanup code to utility classes which, given a Map or Reduce task, track the files generated and clean up if needed.

Added an extra attribute in TaskInProgress, runningSpeculative, to avoid running more than one speculative instance of a ReduceTask. Too many Reduce instances for the same task could increase load on Map machines; this needs discussion. I can revert this change to allow some other number of Reduce instances (MAX_TASK_FAILURES?).

Comments?


Sanjay Dahiya made changes - 19/Oct/06 05:41 PM
Attachment Hadoop-76.patch [ 12343282 ]
Devaraj Das added a comment - 26/Oct/06 06:44 PM
Regarding the PhasedRecordWriter class, I think this should provide the option of creating files, writing to them, and then calling commit or abort. The current implementation binds it to a RecordWriter, but it should also cater to things like sequence files (e.g., RandomWriter should be able to use this functionality).

Owen O'Malley added a comment - 27/Oct/06 12:06 AM
The PhasedRecordWriter won't handle all of the cases, because RecordWriters can write multiple files. Furthermore, they are user code and it would be better to minimize required changes to them.

A better approach would be to have a PhasedFileSystem that takes a base FileSystem and use that to commit/abort the changes. Then the framework could pass the PhasedFileSystem to the createRecordReader call and it would catch all of the files that the RecordWriter created. When the PhasedFileSystem gets a create call, it creates it in the base FileSystem with a mutated name. When the changes are committed, the files are all renamed. If the changes are aborted, the mutated filenames are deleted.


Sanjay Dahiya added a comment - 27/Oct/06 11:15 AM
PhasedFileSystem sounds good, I will work on this approach. Thanks Devaraj and Owen for the review.

Sanjay Dahiya added a comment - 27/Oct/06 12:53 PM
Looking at the FileSystem implementation, here is a concern.

FileSystem is an abstract class, extended by the Local/Distributed FileSystem. The ideal way to have a PhasedFileSystem would be if FileSystem were an interface: we could implement it and take a FileSystem in the constructor of PhasedFileSystem, something like:
class PhasedFileSystem implements FileSystem {
  public PhasedFileSystem(FileSystem fs);
  // channel all methods to fs
  // implement commit() / abort()
}
In this case we could use the phased functionality with both the Local and Distributed FileSystem while still maintaining interface compatibility. Doing this with an abstract base class will create some dummy objects, as in the next option.

Next option is to have something like

class PhasedFileSystem extends FileSystem {
  private FileSystem fs;

  protected PhasedFileSystem(FileSystem fs) {
    super(fs.getConf()); // not used, just a dummy for base class creation
    this.fs = fs;
  }

  protected PhasedFileSystem(Configuration conf) {
    super(conf);
    throw new UnsupportedOperationException();
  }

  public static PhasedFileSystem getNamed(...) { return new PhasedFileSystem(FileSystem.getNamed(...)); }
  // all other methods channel calls to fs
  commit();
  abort();
}

The last option is to add extra methods to FileSystem itself, which will not be used in most cases but will be available in derived FileSystems. This doesn't sound too good.

Option 2 is something that will work well for us, even though it's not a good design.

Comments?


Owen O'Malley added a comment - 28/Oct/06 03:19 AM
1. This patch doesn't apply cleanly anymore. The TaskTracker.java has a conflicting change.

2. I agree that it would be nice if FileSystem was an interface, but that is way out of scope for this bug.

You should derive PhasedFileSystem from FileSystem. It should have a "base" FileSystem that does the real work and a Map<String,String> that maps "final" filenames to "temporary" filenames.

PhasedFileSystem will need to implement the abstract "Raw" methods from FileSystem. The read operations can just be sent directly to the base FileSystem. createRaw should create a temporary name, put it in the map and create the file using the base FileSystem. The other "modification" methods (renameRaw, deleteRaw) should throw UnsupportedOperationException.

To ensure that the files are cleaned up correctly, I'd suggest that the temporary files be stored under:

<system dir>/<job id>/<tip id>/<task id>/<unique file id>

(in dfs clearly) so that when the job is finished, we just need to delete the job directory to clean up any remains of failed tasks that didn't clean up properly.

there should also be:
public void commit() throws IOException {...}
public void abort() throws IOException {...}
public void close() throws IOException { abort(); }

For commit, it should lock the tip directory, move the files into place, put a DONE touch file in the tip directory and delete the task id directory.


Owen O'Malley added a comment - 01/Nov/06 05:43 AM
Since the dfs locking doesn't really work reliably, I would drop the tip id part of the directory structure, don't do any locking, and if the file is already in place, just skip that file.
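
As a rough sketch of the design discussed in the last two comments (a map from final names to temporary names, temporary files under a per-job, per-task directory, no locking, and skipping any file that is already in place), something like the following could serve as a starting point. It is not the committed PhasedFileSystem: it uses java.nio.file on a local directory instead of Hadoop's FileSystem, and PhasedFiles, create, commit, abort and demo are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of commit/abort semantics on a local file system. */
final class PhasedFiles {
    private final Path taskDir; // <system dir>/<job id>/<task id>
    private final Map<Path, Path> finalToTemp = new LinkedHashMap<>();
    private int nextFileId = 0;

    PhasedFiles(Path systemDir, String jobId, String taskId) {
        this.taskDir = systemDir.resolve(jobId).resolve(taskId);
    }

    /** Opens a temporary file that stands in for finalPath until commit. */
    OutputStream create(Path finalPath) throws IOException {
        if (finalToTemp.containsKey(finalPath)) {
            throw new IOException("already created: " + finalPath);
        }
        Files.createDirectories(taskDir);
        Path temp = taskDir.resolve("file-" + nextFileId++);
        finalToTemp.put(finalPath, temp);
        return Files.newOutputStream(temp);
    }

    /** Moves every temp file into place; callers close their streams first. */
    void commit() throws IOException {
        for (Map.Entry<Path, Path> entry : finalToTemp.entrySet()) {
            Path parent = entry.getKey().getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }
            try {
                Files.move(entry.getValue(), entry.getKey());
            } catch (FileAlreadyExistsException alreadyInPlace) {
                Files.delete(entry.getValue()); // another attempt committed first
            }
        }
        finalToTemp.clear();
    }

    /** Deletes every temp file created through this instance. */
    void abort() throws IOException {
        for (Path temp : finalToTemp.values()) {
            Files.deleteIfExists(temp);
        }
        finalToTemp.clear();
    }

    /** Two attempts write the same output; the first commit wins. */
    static String demo() {
        try {
            Path root = Files.createTempDirectory("phased");
            Path out = root.resolve("output").resolve("part-00000");
            PhasedFiles first = new PhasedFiles(root.resolve("system"), "job_0001", "attempt_0");
            PhasedFiles second = new PhasedFiles(root.resolve("system"), "job_0001", "attempt_1");
            try (OutputStream s = first.create(out)) {
                s.write("from attempt_0".getBytes(StandardCharsets.UTF_8));
            }
            try (OutputStream s = second.create(out)) {
                s.write("from attempt_1".getBytes(StandardCharsets.UTF_8));
            }
            first.commit();
            second.commit(); // finds the file in place and drops its own copy
            return Files.readString(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because all temporary files live under the job directory, deleting that directory when the job ends would also remove anything left behind by attempts that never reached commit or abort.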

Sanjay Dahiya added a comment - 02/Nov/06 11:46 AM
Here is an updated patch for review.

Changes in this patch:

  • Moved temp files to (mapred.system.dir)/<jobid>
  • 2 new variables in TIP:
      • runningTasks (to track currently running attempts of the same task; earlier this was done using a boolean)
      • hasCompletedTask (for reduces, if an attempt succeeds then all subsequent failures for the same TIP should be ignored; the existing setup tries to schedule another task for that)
  • Added a PhasedFileSystem, which takes a jobid, tipid, and taskid and creates all files in (mapred.system.dir)/<jobid>/<tipid>/<taskid>; these are moved to their final location on commit or deleted.
  • The Task constructor needs the tip id now; it is passed in RPC as well.
  • Task.localizeConfiguration adds the tip id to the conf.
  • Minor change in FSDirectory to add the exception message to the log if rename fails.

Planning to add an example of using PhasedFileSystem (changing RandomWriter to use PhasedFileSystem, as Devaraj suggested) in a separate patch on the same issue.

Thanks Owen for the review.


Sanjay Dahiya made changes - 02/Nov/06 11:46 AM
Attachment Hadoop-76_1.patch [ 12344192 ]
Sanjay Dahiya added a comment - 03/Nov/06 05:40 PM
One important point that needs to be reviewed in this patch is the extra checks for speculative reduce tasks in TIP. We may want to reuse some of this for speculative maps too, or structure it better. Looking for comments on that.

Devaraj Das added a comment - 07/Nov/06 02:44 PM
1. Noticed incorrect Javadoc/comments in some places. Please go through the patch carefully and fix them.
2. The commit and abort APIs of PhasedFileSystem need to invoke the baseFS's close() method.
3. The runningTasks variable seems redundant and can be removed, and the code updating/reading runningTasks can be changed to update/read recentTasks instead.

Devaraj Das added a comment - 07/Nov/06 03:24 PM
One more comment - need to document somewhere how exactly (what config params) a user of the PhasedFileSystem (a map method) can access the JobId, TaskId and TIPId when he wants to create an instance of PhasedFileSystem. Better yet, add a new constructor in PhasedFileSystem that takes a JobConf object. Inside that constructor, you can get the JobId, TaskId and TIPId values and proceed. The user doesn't have to bother about details in this case.

Doug Cutting added a comment - 08/Nov/06 08:56 PM
> it would be nice if FileSystem was an interface [ ...]

Sometimes that would be nice, and sometimes it would not be. The FileSystem API has evolved considerably without breaking back-compatibility. That's harder to do with an interface.


Sanjay Dahiya added a comment - 09/Nov/06 11:15 AM
Changes in this patch update:

1. PhasedFileSystem has an extra constructor that takes a JobConf.
2. PhasedFileSystem now handles the file overwrite flag properly.
3. commit/abort can be done for a single file or for all files created by this FS.
4. The commit method is split into two functions; one is private, with an extra boolean parameter, to avoid concurrent modification while committing in an iterator loop.
5. The reduceProgress calculation is changed. Earlier it was dividing the progress by reduces.length twice, once while updating status and again while checking for speculative tasks, which was incorrect.
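
The concurrent-modification point in this list can be illustrated with a small sketch. It is an assumption about the shape of the fix, not the patch code: the single-file commit removes its entry from the map, so the commit-all loop calls a private variant with a flag that leaves removal to the iterator. CommitLoop and its members are hypothetical names, and the "commit" only records a string instead of renaming a file.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of committing one file or all files safely. */
final class CommitLoop {
    private final Map<String, String> finalToTemp = new HashMap<>();
    private final List<String> committed = new ArrayList<>();

    void track(String finalName, String tempName) {
        finalToTemp.put(finalName, tempName);
    }

    /** Commits a single file and forgets it. */
    void commit(String finalName) {
        commit(finalName, true);
    }

    /** Commits every tracked file. */
    void commit() {
        Iterator<String> names = finalToTemp.keySet().iterator();
        while (names.hasNext()) {
            // Removing through the map here would throw
            // ConcurrentModificationException on the next iteration step.
            commit(names.next(), false);
            names.remove(); // removal goes through the iterator instead
        }
    }

    private void commit(String finalName, boolean removeFromMap) {
        String tempName = finalToTemp.get(finalName);
        if (tempName == null) {
            return; // nothing tracked under this name
        }
        committed.add(tempName + " -> " + finalName); // stand-in for the rename
        if (removeFromMap) {
            finalToTemp.remove(finalName);
        }
    }

    int pending() {
        return finalToTemp.size();
    }

    List<String> committed() {
        return committed;
    }

    public static void main(String[] args) {
        CommitLoop files = new CommitLoop();
        files.track("part-00000", "tmp-0");
        files.track("part-00001", "tmp-1");
        files.commit();
        System.out.println(files.committed().size() + " committed, " + files.pending() + " pending");
    }
}
```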

Thanks Devaraj for the review.


Sanjay Dahiya made changes - 09/Nov/06 11:15 AM
Attachment Hadoop-76_2.patch [ 12344659 ]
Sanjay Dahiya made changes - 09/Nov/06 01:30 PM
Attachment Hadoop-76_2.patch [ 12344659 ]
Sanjay Dahiya added a comment - 09/Nov/06 01:31 PM
Oops, attached the wrong file. Updating.

Sanjay Dahiya made changes - 09/Nov/06 01:31 PM
Attachment Hadoop-76_2.patch [ 12344670 ]
Sanjay Dahiya made changes - 09/Nov/06 01:32 PM
Attachment Hadoop-76_2.patch [ 12344670 ]
Sanjay Dahiya made changes - 09/Nov/06 01:33 PM
Attachment Hadoop-76_2.patch [ 12344671 ]
Sanjay Dahiya made changes - 10/Nov/06 08:18 AM
Attachment Hadoop-76_2.patch [ 12344671 ]
Sanjay Dahiya made changes - 10/Nov/06 08:18 AM
Attachment Hadoop-76_2.patch [ 12344739 ]
Sanjay Dahiya made changes - 10/Nov/06 02:50 PM
Attachment Hadoop-76_2.patch [ 12344739 ]
Sanjay Dahiya added a comment - 10/Nov/06 02:53 PM
Updated patch. The earlier patch was causing the PiEstimator test to fail: it was closing the fs before the ReduceTask, and the PiEstimator reduce task's close opens a new file, causing it to fail. It is fixed now. The TextInputFormat test still fails, but I guess that's due to HADOOP-696.

Sanjay Dahiya made changes - 10/Nov/06 02:53 PM
Attachment Hadoop-76_2.patch [ 12344758 ]
Sanjay Dahiya added a comment - 13/Nov/06 12:20 PM
The patch failed the Streaming test when run twice in succession, because the output file created in the first run still exists in the second run and PhasedFileSystem was not handling an exception during rename.
This patch fixes the issue.

Sanjay Dahiya made changes - 13/Nov/06 12:20 PM
Attachment Hadoop-76_3.patch [ 12344875 ]
Sanjay Dahiya added a comment - 14/Nov/06 09:39 AM
New patch, fixed some Javadoc warnings.

Sanjay Dahiya made changes - 14/Nov/06 09:39 AM
Attachment Hadoop-76_4.patch [ 12344948 ]
Sanjay Dahiya made changes - 14/Nov/06 09:40 AM
Status In Progress [ 3 ] Patch Available [ 10002 ]
Sanjay Dahiya added a comment - 15/Nov/06 08:19 AM
Conflicts with latest trunk. Will submit an updated patch.

Sanjay Dahiya made changes - 15/Nov/06 08:19 AM
Status Patch Available [ 10002 ] Open [ 1 ]
Sanjay Dahiya added a comment - 15/Nov/06 02:09 PM
Patch updated with latest trunk.

Sanjay Dahiya made changes - 15/Nov/06 02:09 PM
Attachment Hadoop-76_5.patch [ 12345039 ]
Sanjay Dahiya made changes - 15/Nov/06 09:46 PM
Status Open [ 1 ] Patch Available [ 10002 ]
Owen O'Malley added a comment - 15/Nov/06 11:24 PM
The proposed PhasedFileSystem needs to support mkdirs in order to work with MapFileOutputFormat.

It will break less user OutputFormats if we also support the read operations:
exists
openRaw
getLength
isDirectory
listPathsRaw
setWorkingDirectory
getWorkingDirectory
by passing the request to the underlying FileSystem.

It is confusing to use the TaskInProgress.hasSucceededTask for reduces and TaskInProgress.completes for maps. I think it would be better to use the completes for both.

Thanks for putting the generic types into activeTasks, but it should look like:

Map<String, String> activeTasks = new HashMap();

The declared type should use the interface and the constructor doesn't need the generic types.

The logic in TaskInProgress.isRunnable is pretty convoluted although I think if you use completes for reduces, it doesn't need to change.

TaskInProgress.hasRanOnMachine should be hasRunOnMachine.

findNewTask should add a new condition instead of continue:

} else if (specTarget == -1 &&
           task.hasSpeculativeTask(avgProgress)) {
+  if(task.hasRanOnMachine(taskTracker)){
+    continue ;
+  }
  specTarget = i;
}

should be:

} else if (specTarget == -1 &&
           task.hasSpeculativeTask(avgProgress) &&
           !task.hasRanOnMachine(taskTracker)) {
  specTarget = i;
}

The patch always creates a PhasedFileSystem even when it won't be used because there is no speculative execution.

@@ -298,7 +305,14 @@

   } finally {
     reducer.close();
-    out.close(reporter);
+    if( runSpeculative ){
+      out.close(reporter);
+      pfs.commit();
+      pfs.close();
+    }else{
+      out.close(reporter);
+      fs.close();
+    }
   }

"out.close(reporter);" should be lifted out of the branch. And it is usually better to not close the file system because they are cached and may be used in another context. So I'd drop the else clause all together.


Sanjay Dahiya made changes - 20/Nov/06 12:48 PM
Link This issue depends on HADOOP-737 [ HADOOP-737 ]
Sanjay Dahiya added a comment - 20/Nov/06 08:40 PM
This patch removes the part that kills a reduce task if another speculative instance is running, due to the issue reported in HADOOP-737. Now the task is left running until the job finishes.
Also includes Owen's review comments.

Thanks Owen for the review.


Sanjay Dahiya made changes - 20/Nov/06 08:40 PM
Attachment Hadoop-76_6.patch [ 12345369 ]
Owen O'Malley added a comment - 20/Nov/06 11:13 PM
I see a couple more points:
1. The code in TaskInProgress starting at line 445 is duplicated between the then/else branches and should be re-written to use a single test.
2. The directory ${system.dir}/${jobid}/${tipid}/${taskid} should always be deleted when a task completes or fails regardless of whether speculative execution was running so that non-speculative execution tasks can use the PhasedFileSystem too.
3. Equivalently, ${system.dir}/${jobid} should always be deleted when a job completes or fails.
4. Why did you remove the "/ reduces.length" in computing the avgProgress in JobInProgress.java line 357? That seems wrong.
5. The JobInProgress.findNewTask should not break if it finds a speculative task. That would make it run a speculative task rather than a regular task that comes after it in the task list.
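
Point 5, combined with the earlier remark about not picking a tracker the task has already used, might be sketched like this. It is a simplified illustration rather than JobInProgress.findNewTask itself: the Tip class, its fields and the SpeculativeTaskPicker name are hypothetical, and failure handling and locality are left out.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified task selection loop. */
final class SpeculativeTaskPicker {

    static final class Tip {
        final boolean unstarted;        // regular task that has never been run
        final boolean wantsSpeculative; // slow enough to deserve a second attempt
        final Set<String> trackersUsed; // trackers where an attempt ran or runs

        Tip(boolean unstarted, boolean wantsSpeculative, String... trackersUsed) {
            this.unstarted = unstarted;
            this.wantsSpeculative = wantsSpeculative;
            this.trackersUsed = new HashSet<>(Arrays.asList(trackersUsed));
        }

        boolean hasRunOnMachine(String taskTracker) {
            return trackersUsed.contains(taskTracker);
        }
    }

    /** Returns the index of the task to hand to taskTracker, or -1 if none. */
    static int findNewTask(List<Tip> tips, String taskTracker) {
        int specTarget = -1;
        for (int i = 0; i < tips.size(); i++) {
            Tip tip = tips.get(i);
            if (tip.unstarted) {
                return i; // a regular task always beats a speculative one
            } else if (specTarget == -1
                    && tip.wantsSpeculative
                    && !tip.hasRunOnMachine(taskTracker)) {
                specTarget = i; // remember it, but keep scanning for regular tasks
            }
        }
        return specTarget;
    }

    public static void main(String[] args) {
        List<Tip> tips = Arrays.asList(
            new Tip(false, true, "tracker_a"), // speculative candidate
            new Tip(true, false));             // regular task later in the list
        System.out.println(findNewTask(tips, "tracker_b"));
    }
}
```

The loop only falls back to the remembered speculative candidate after every task has been inspected, so a regular task that appears later in the list is still preferred.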

Sanjay Dahiya added a comment - 21/Nov/06 12:11 AM
> 4. Why did you remove the "/ reduces.length" in computing the avgProgress in JobInProgress.java line 357? That seems wrong.

JobInProgress.java:313 already divides the progressDelta of a task by the number of tasks before adding it to the reduce progress. That averages it out over the reduces, or am I missing something there?
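
A small worked example of the arithmetic in question, under the assumption that each task's progress is divided by the number of tasks as it is added to the running total. The class and method names are hypothetical and the numbers are made up for illustration.

```java
/** Hypothetical illustration of averaging once versus twice. */
final class ReduceProgressAverage {

    /** Adds each task's progress divided by the task count, as described above. */
    static double aggregate(double[] taskProgress) {
        double reduceProgress = 0.0;
        for (double progress : taskProgress) {
            reduceProgress += progress / taskProgress.length;
        }
        return reduceProgress;
    }

    public static void main(String[] args) {
        double[] taskProgress = {0.5, 0.5, 0.5, 0.5}; // four reduces, each half done
        double reduceProgress = aggregate(taskProgress);
        // The running total is already the average progress.
        System.out.println(reduceProgress);
        // Dividing by the task count a second time understates it.
        System.out.println(reduceProgress / taskProgress.length);
    }
}
```

With four reduces that are each half done, the aggregated value is 0.5; dividing again by four would report 0.125 and make every task look far ahead of the average.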


Repository Revision Date User Message
ASF #477876 Tue Nov 21 20:40:24 UTC 2006 cutting HADOOP-76. Implement speculative reduce. Contributed by Sanjay.
Files Changed
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
ADD /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
MODIFY /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
MODIFY /lucene/hadoop/trunk/CHANGES.txt

Doug Cutting added a comment - 21/Nov/06 08:40 PM
I just committed this.

I made one change, making PhasedFileSystem package-private rather than public. It's easier to make something public later than to make it non-public once it is public, and we should strive to minimize our public APIs.

Thanks, Sanjay!


Doug Cutting made changes - 21/Nov/06 08:40 PM
Resolution Fixed [ 1 ]
Fix Version/s 0.9.0 [ 12312134 ]
Status Patch Available [ 10002 ] Resolved [ 5 ]
Doug Cutting made changes - 01/Dec/06 10:43 PM
Status Resolved [ 5 ] Closed [ 6 ]
Owen O'Malley made changes - 08/Jul/09 04:51 PM
Component/s mapred [ 12310690 ]