Doug Cutting made changes - 22/Apr/06 12:12 AM
Sameer Paranjpye made changes - 31/May/06 01:56 AM
Sameer Paranjpye made changes - 31/May/06 07:36 AM
Doug Cutting made changes - 06/Jun/06 06:16 AM
Doug Cutting made changes - 07/Jun/06 04:37 AM
Doug Cutting made changes - 29/Jun/06 04:11 AM
Johan Oskarsson made changes - 30/Jun/06 10:32 PM
As one should've suspected there seems to be a problem with the patch.
It doesn't clean up after the tasks properly, but I do not currently have the time to look at this. Still, feedback is welcome or if someone wants to use any of it in their own patch, go ahead.
Doug Cutting made changes - 03/Aug/06 05:46 PM
Doug Cutting made changes - 04/Aug/06 08:05 PM
Doug Cutting made changes - 08/Sep/06 08:23 PM
Sanjay Dahiya made changes - 11/Oct/06 01:22 PM
Sanjay Dahiya made changes - 11/Oct/06 01:22 PM
Here is a list of code level changes, I will test this stuff meanwhile
The exact conditions (timeouts) under which a reduce task should be executed speculatively are open for discussion. Comments?
I think it is better for now to just use JobConf.setSpeculativeExecution for both maps and reduces.
machinesWhereFailed is a list of machines where the task has failed. To find where it is currently running, you need to use recentTasks. Currently, the recentTasks is a set of task ids that are running. You should probably make it a map from task id to task tracker id. Don't block speculative reduces based on other reduces running. That would make a perpetually busy cluster never run speculative reduces.
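The bookkeeping suggested above could look roughly like the following. This is only a sketch: TaskPlacement, taskStarted and taskFailed are hypothetical names rather than the real TaskInProgress code, and a task tracker id is treated as if it identified the machine.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the bookkeeping in TaskInProgress; not the real class. */
class TaskPlacement {
    // task id -> task tracker currently running it (the suggested Map form of recentTasks)
    private final Map<String, String> recentTasks = new HashMap<>();
    // task trackers on which an instance of this task has failed
    private final Set<String> machinesWhereFailed = new HashSet<>();

    void taskStarted(String taskId, String tracker) {
        recentTasks.put(taskId, tracker);
    }

    void taskFailed(String taskId) {
        String tracker = recentTasks.remove(taskId);
        if (tracker != null) {
            machinesWhereFailed.add(tracker);
        }
    }

    /** True if an instance is running on, or has already failed on, the given tracker. */
    boolean hasRunOnMachine(String tracker) {
        return recentTasks.containsValue(tracker) || machinesWhereFailed.contains(tracker);
    }
}
```

With a map like this the scheduler can skip a tracker that already hosts an instance of the same task, without blocking speculation just because other reduces are running elsewhere.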
The defaults for the map speculative execution don't look too unreasonable, so just use them for now. This patch is up for review.
Here is the list of changes included in this patch:
- Changed recentTasks to a Map and added a new method in TaskInProgress, hasRanOnMachine, which looks at this Map and at hasFailedOnMachines(). This is used to avoid scheduling multiple reduce instances of the same task on the same node.
- Added a PhasedRecordWriter, which takes a RecordWriter, tempName, and finalName. Another option was to create a PhasedOutputFormat; this seems more natural as it works with any existing OutputFormat and RecordWriter. Records are written to tempName and, when commit is called, they are moved to finalName.
- ReduceTask.run(): if speculative execution is enabled, reduce output is written to a temp location using PhasedRecordWriter. After the task finishes, the output is moved to its final location.
- Added an extra attribute in TaskInProgress, runningSpeculative, to avoid running more than one speculative instance of a ReduceTask. Too many reduce instances for the same task could increase load on the map machines; this needs discussion. I can revert this change to allow some other number of reduce instances (MAX_TASK_FAILURES?). Comments?
Sanjay Dahiya made changes - 19/Oct/06 05:41 PM
Regarding the PhasedRecordWriter class, I think it should provide the option of creating files, writing to them, and then calling commit or abort. The current implementation binds it to a RecordWriter, but it should also cater to things like sequence files (e.g., RandomWriter should be able to use this functionality).
The PhasedRecordWriter won't handle all of the cases, because RecordWriters can write multiple files. Furthermore, they are user code and it would be better to minimize required changes to them.
A better approach would be to have a PhasedFileSystem that takes a base FileSystem and uses it to commit or abort the changes. The framework could then pass the PhasedFileSystem to the getRecordWriter call, and it would catch all of the files that the RecordWriter created. When the PhasedFileSystem gets a create call, it creates the file in the base FileSystem with a mutated name. When the changes are committed, the files are all renamed. If the changes are aborted, the files with mutated names are deleted.
PhasedFileSystem sounds good, I will work on this approach. Thanks Devaraj, Owen for the review.
Looking at the FileSystem implementation, here is a concern:
FileSystem is an abstract class, extended by the Local and Distributed FileSystems. The ideal way to have a PhasedFileSystem would be if FileSystem were an interface: we could implement it and take a FileSystem in the constructor of PhasedFileSystem, something like -
The next option is to have something like
PhasedFileSystem extends FileSystem {
  public static getNamed(...) {
    return new PhasedFileSystem(FileSystem.getNamed..) ;
  }
}
The last option is to add extra methods to FileSystem itself, which will not be used in most cases but will be available in derived FileSystems. This doesn't sound too good. Option 2 is something that will work well for us, even though it's not a good design. Comments?
1. This patch doesn't apply cleanly anymore. TaskTracker.java has a conflicting change.
2. I agree that it would be nice if FileSystem was an interface, but that is way out of scope for this bug. You should derive PhasedFileSystem from FileSystem. It should have a "base" FileSystem that does the real work and a Map<String,String> that maps "final" filenames to "temporary" filenames. PhasedFileSystem will need to implement the abstract "Raw" methods from FileSystem. The read operations can just be sent directly to the base FileSystem. createRaw should create a temporary name, put it in the map, and create the file using the base FileSystem. The other "modification" methods (renameRaw, deleteRaw) should throw UnsupportedOperationException.
To ensure that the files are cleaned up correctly, I'd suggest that the temporary files be stored under <system dir>/<job id>/<tip id>/<task id>/<unique file id> (in dfs, clearly), so that when the job is finished we just need to delete the job directory to clean up any remains of failed tasks that didn't clean up properly.
there should also be:
For close, it should lock the tip directory, move the files into place, put a DONE touch file in the tip directory, and delete the task id directory. Since the dfs locking doesn't really work reliably, I would drop the tip id part of the directory structure, not do any locking, and, if the file is already in place, just skip that file.
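To make the suggested commit/abort behaviour concrete, here is a minimal sketch that uses java.nio.file on the local file system in place of the Hadoop FileSystem API. PhasedFiles and its methods are hypothetical names, not the class from the patch, and the temporary directory stands in for <system dir>/<job id>/<task id>.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the commit/abort idea; not the Hadoop PhasedFileSystem. */
class PhasedFiles {
    private final Path taskTempDir;
    // "final" name -> "temporary" name, as suggested in the review
    private final Map<Path, Path> finalToTemp = new LinkedHashMap<>();
    private int nextFileId = 0;

    PhasedFiles(Path taskTempDir) throws IOException {
        this.taskTempDir = Files.createDirectories(taskTempDir);
    }

    /** Create: open a stream on a temporary file and remember the mapping. */
    OutputStream create(Path finalName) throws IOException {
        Path temp = taskTempDir.resolve("file-" + (nextFileId++));
        finalToTemp.put(finalName, temp);
        return Files.newOutputStream(temp);
    }

    /** Commit: move each temporary file into place, skipping files already in place. */
    void commit() throws IOException {
        for (Map.Entry<Path, Path> e : finalToTemp.entrySet()) {
            if (Files.exists(e.getKey())) {
                Files.deleteIfExists(e.getValue()); // another instance already committed this file
            } else {
                Files.move(e.getValue(), e.getKey());
            }
        }
        finalToTemp.clear();
    }

    /** Abort: delete every temporary file created through this object. */
    void abort() throws IOException {
        for (Path temp : finalToTemp.values()) {
            Files.deleteIfExists(temp);
        }
        finalToTemp.clear();
    }
}
```

The exists-then-move check is not atomic, so two instances committing at the same moment could still collide. The sketch follows the "no locking, skip the file if it is already in place" suggestion rather than solving that race.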
Here is an updated patch for review.
Changes in this patch:
Planning to add an example of using PhasedFileSystem (changing RandomWriter to use PhasedFileSystem, as Devaraj suggested) in a separate patch on the same issue. Thanks Owen for the review.
Sanjay Dahiya made changes - 02/Nov/06 11:46 AM
One important point that needs to be reviewed in this patch is the extra checks for speculative reduce tasks in TIP. We may want to apply some of them to speculative maps too, or structure them better. Looking for comments on that.
1. Noticed incorrect Javadoc/comments in some places. Please go through the patch carefully and fix them.
2. The commit and abort APIs of PhasedFileSystem need to invoke the baseFS's close() method.
3. The runningTasks variable seems redundant and can be removed; the code updating/reading runningTasks can be changed to update/read recentTasks instead.
One more comment: we need to document somewhere exactly how (through which config params) a user of the PhasedFileSystem (a map method) can access the JobId, TaskId and TIPId when creating an instance of PhasedFileSystem. Better yet, add a new constructor to PhasedFileSystem that takes a JobConf object. Inside that constructor, you can get the JobId, TaskId and TIPId values and proceed. The user then doesn't have to bother with those details.
> it would be nice if FileSystem was an interface [ ...]
Sometimes that would be nice, and sometimes it would not be. The FileSystem API has evolved considerably without breaking back-compatibility. That's harder to do with an interface.
Changes in this patch update:
1. PhasedFileSystem has an extra constructor that takes a JobConf.
3. commit/abort can be done for a single file or for all files created by this FS.
Thanks Devaraj for the review.
Sanjay Dahiya made changes - 09/Nov/06 11:15 AM
Sanjay Dahiya made changes - 09/Nov/06 01:30 PM
Oops, attached the wrong file. Updating.
Sanjay Dahiya made changes - 09/Nov/06 01:31 PM
Sanjay Dahiya made changes - 09/Nov/06 01:32 PM
Sanjay Dahiya made changes - 09/Nov/06 01:33 PM
Sanjay Dahiya made changes - 10/Nov/06 08:18 AM
Sanjay Dahiya made changes - 10/Nov/06 08:18 AM
Sanjay Dahiya made changes - 10/Nov/06 02:50 PM
Updated patch. The earlier patch was causing the PiEstimator test to fail: it was closing the fs before ReduceTask, and the PiEstimator reduce task's close opens a new file, which caused it to fail. It is fixed now. The TextInputFormat test still fails, but I guess that's due to
Sanjay Dahiya made changes - 10/Nov/06 02:53 PM
The patch failed the Streaming test when run twice in succession, because the output file created in the first run still exists in the second run and PhasedFileSystem was not handling an exception during rename.
This patch fixes the issue.
Sanjay Dahiya made changes - 13/Nov/06 12:20 PM
New patch, fixed some Javadoc warnings.
Sanjay Dahiya made changes - 14/Nov/06 09:39 AM
Sanjay Dahiya made changes - 14/Nov/06 09:40 AM
Conflicts with the latest trunk. Will submit an updated patch.
Sanjay Dahiya made changes - 15/Nov/06 08:19 AM
Patch updated with latest trunk.
Sanjay Dahiya made changes - 15/Nov/06 02:09 PM
Sanjay Dahiya made changes - 15/Nov/06 09:46 PM
The proposed PhasedFileSystem needs to support mkdirs in order to work with MapFileOutputFormat.
It will break fewer user OutputFormats if we also support the read operations:
It is confusing to use TaskInProgress.hasSucceededTask for reduces and TaskInProgress.completes for maps. I think it would be better to use completes for both.
Thanks for putting the generic types into activeTasks, but it should look like:
Map<String, String> activeTasks = new HashMap();
The declared type should use the interface and the constructor doesn't need the generic types.
The logic in TaskInProgress.isRunnable is pretty convoluted, although I think that if you use completes for reduces, it doesn't need to change.
TaskInProgress.hasRanOnMachine should be hasRunOnMachine.
findNewTask should add a new condition instead of continue:
} else if (specTarget == -1 &&
should be:
} else if (specTarget == -1 &&
The patch always creates a PhasedFileSystem, even when it won't be used because there is no speculative execution.
@@ -298,7 +305,14 @@
} finally {
"out.close(reporter);" should be lifted out of the branch. It is also usually better not to close the file system, because file systems are cached and may be used in another context. So I'd drop the else clause altogether.
Sanjay Dahiya made changes - 20/Nov/06 12:48 PM
This patch removes the part that kills a reduce task if another speculative instance is running, due to the issue reported in
It also includes Owen's review comments. Thanks Owen for the review.
Sanjay Dahiya made changes - 20/Nov/06 08:40 PM
I see a couple more points:
1. The code in TaskInProgress starting at line 445 is duplicated between the then/else branches and should be re-written to use a single test.
2. The directory ${system.dir}/${jobid}/${tipid}/${taskid} should always be deleted when a task completes or fails, regardless of whether speculative execution was running, so that non-speculative execution tasks can use the PhasedFileSystem too.
3. Equivalently, ${system.dir}/${jobid} should always be deleted when a job completes or fails.
4. Why did you remove the "/ reduces.length" in computing the avgProgress in JobInProgress.java line 357? That seems wrong.
5. JobInProgress.findNewTask should not break if it finds a speculative task. That would make it run a speculative task rather than a regular task that comes after it in the task list.
> 4. Why did you remove the "/ reduces.length" in computing the avgProgress in JobInProgress.java line 357? That seems wrong.
JobInProgress.java:313 already divides the progressDelta of a task by the number of tasks before adding it to the reduce progress. That averages it out over the reduces, or am I missing something there?
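The arithmetic behind that reply can be checked with a small sketch. ReduceProgress and accumulate are hypothetical names, not the JobInProgress code: if every progress delta is scaled by 1/numTasks as it is added, the running total is already the average, and dividing by the number of reduces again would shrink it a second time.

```java
/** Hypothetical illustration of the averaging argument; not the JobInProgress code. */
class ReduceProgress {
    /** Sum of per-task progress deltas, each already scaled by 1/numTasks. */
    static double accumulate(double[] deltas, int numTasks) {
        double progress = 0.0;
        for (double delta : deltas) {
            progress += delta / numTasks;
        }
        return progress;
    }
}
```

For four reduces at progress 1.0, 0.5, 0.5 and 0.0, the accumulated value is 0.5, which is the mean of the four.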
I just committed this.
I made one change, making PhasedFileSystem package-private rather than public. It's easier to make something public later than to make it non-public once it is public, and we should strive to minimize our public APIs. Thanks, Sanjay!
Doug Cutting made changes - 21/Nov/06 08:40 PM
Doug Cutting made changes - 01/Dec/06 10:43 PM
Owen O'Malley made changes - 08/Jul/09 04:51 PM
As suggested, it writes output to a temporary name, and the first one to finish moves it to the correct output name.
The patch adds a String tmpName to getRecordWriter in OutputFormatBase and a close method. Basically, OutputFormatBase keeps track of the tmpName and the final name; once close is called, it moves the tmp file to the final name.
This means the current output formats don't have to be changed.
This patch would ideally be complemented by better tasktracker selection: I've seen instances where there are two final reduce tips and a speculative reduce is assigned to the same node that is already running the other task.
A speculative reduce will be started if finishedReduces / numReduceTasks >= 0.7
That's about it, looking forward to hearing your input.
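The 0.7 start condition described in this comment could be expressed as follows. This is a sketch only; SpeculativeReducePolicy and shouldSpeculate are hypothetical names and not code from the patch.

```java
/** Hypothetical sketch of the start condition for speculative reduces. */
class SpeculativeReducePolicy {
    static final double THRESHOLD = 0.7;

    static boolean shouldSpeculate(int finishedReduces, int numReduceTasks) {
        if (numReduceTasks <= 0) {
            return false; // nothing to speculate on
        }
        // Cast to double: integer division would yield 0 until every reduce had finished.
        return (double) finishedReduces / numReduceTasks >= THRESHOLD;
    }
}
```

The cast matters: with int operands, finishedReduces / numReduceTasks stays at 0 until the job is complete, so the condition would never fire early.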