Hadoop Common / HADOOP-76

Implement speculative re-execution of reduces

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.1.0
    • Fix Version/s: 0.9.0
    • Component/s: None
    • Labels: None

      Description

      As a first step, reduce task outputs should go to temporary files which are renamed when the task completes.
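      In today's terms the idea looks roughly like the following sketch (illustration only; the class, method names and the hard-coded temp name are hypothetical, using the Hadoop FileSystem API):

          import java.io.IOException;
          import org.apache.hadoop.fs.FSDataOutputStream;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;

          // Sketch of "write to a temp file, rename when the task completes".
          class TempThenRenameSketch {
            static void runReduce(FileSystem fs, Path jobDir, Path finalOut)
                throws IOException {
              Path tmpOut = new Path(jobDir, "task_0001.tmp"); // hypothetical temp name
              FSDataOutputStream out = fs.create(tmpOut);
              try {
                // ... the RecordWriter would write the reduce output here ...
              } finally {
                out.close();
              }
              if (!fs.exists(finalOut)) {
                fs.rename(tmpOut, finalOut);  // first finisher installs its output
              } else {
                fs.delete(tmpOut, false);     // a speculative twin already finished
              }
            }
          }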

      Attachments

      1. spec_reducev.patch
        12 kB
        Johan Oskarsson
      2. Hadoop-76.patch
        9 kB
        Sanjay Dahiya
      3. Hadoop-76_1.patch
        26 kB
        Sanjay Dahiya
      4. Hadoop-76_2.patch
        30 kB
        Sanjay Dahiya
      5. Hadoop-76_3.patch
        30 kB
        Sanjay Dahiya
      6. Hadoop-76_4.patch
        30 kB
        Sanjay Dahiya
      7. Hadoop-76_5.patch
        30 kB
        Sanjay Dahiya
      8. Hadoop-76_6.patch
        28 kB
        Sanjay Dahiya


          Activity

          Doug Cutting added a comment -

          I just committed this.

          I made one change, making PhasedFileSystem package-private rather than public. It's easier to make something public later than to make it non-public once it is public, and we should strive to minimize our public APIs.

          Thanks, Sanjay!

          Sanjay Dahiya added a comment -

          > 4. Why did you remove the "/ reduces.length" in computing the avgProgress in JobInProgress.java line 357? That seems wrong.

          JobInProgress.java:313 already divides the progressDelta of a task by the number of tasks before adding it to the reduce progress. That averages it out over the reduces, or am I missing something there?
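          Illustratively (a hypothetical sketch of the arithmetic at issue, not the actual JobInProgress code), if each delta is divided by the number of reduces as it is accumulated, the sum is already an average and must not be divided again:

              // Per-task progress values (example data).
              double[] taskProgress = {1.0, 0.5};
              int n = taskProgress.length;
              double reduceProgress = 0.0;
              for (double p : taskProgress) {
                reduceProgress += p / n;   // each delta divided by the task count (cf. line 313)
              }
              // reduceProgress == 0.75 is already the average over the reduces;
              // dividing by n again (cf. line 357) would wrongly yield 0.375.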

          Owen O'Malley added a comment -

          I see a couple more points:
          1. The code in TaskInProgress starting at line 445 is duplicated between the then/else branches and should be re-written to use a single test.
          2. The directory ${system.dir}/${jobid}/${tipid}/${taskid} should always be deleted when a task completes or fails, regardless of whether speculative execution was running, so that non-speculative tasks can use the PhasedFileSystem too.
          3. Equivalently, ${system.dir}/${jobid} should always be deleted when a job completes or fails.
          4. Why did you remove the "/ reduces.length" in computing the avgProgress in JobInProgress.java line 357? That seems wrong.
          5. The JobInProgress.findNewTask should not break if it finds a speculative task. That would make it run a speculative task rather than a regular task that comes after it in the task list.

          Sanjay Dahiya added a comment -

          This patch removes the part that killed a reduce task when another speculative instance is running, due to the issue reported in HADOOP-737. The task is now left running until the job finishes.
          It also includes Owen's review comments.

          Thanks, Owen, for the review.

          Owen O'Malley added a comment -

          The proposed PhasedFileSystem needs to support mkdirs in order to work with MapFileOutputFormat.

          It will break fewer user OutputFormats if we also support the read operations:
          exists
          openRaw
          getLength
          isDirectory
          listPathsRaw
          setWorkingDirectory
          getWorkingDirectory
          by passing the request to the underlying FileSystem.

          It is confusing to use the TaskInProgress.hasSucceededTask for reduces and TaskInProgress.completes for maps. I think it would be better to use the completes for both.

          Thanks for putting the generic types into activeTasks, but it should look like:

          Map<String, String> activeTasks = new HashMap();

          The declared type should use the interface and the constructor doesn't need the generic types.

          The logic in TaskInProgress.isRunnable is pretty convoluted, although I think that if you use completes for reduces, it doesn't need to change.

          TaskInProgress.hasRanOnMachine should be hasRunOnMachine.

          findNewTask should add a new condition instead of continue:

              } else if (specTarget == -1 &&
                         task.hasSpeculativeTask(avgProgress)) {
              +  if (task.hasRanOnMachine(taskTracker)) {
              +    continue;
              +  }
                 specTarget = i;
              }

          should be:

              } else if (specTarget == -1 &&
                         task.hasSpeculativeTask(avgProgress) &&
                         !task.hasRanOnMachine(taskTracker)) {
                specTarget = i;
              }

          The patch always creates a PhasedFileSystem even when it won't be used because there is no speculative execution.

              @@ -298,7 +305,14 @@

                    } finally {
                      reducer.close();
              -       out.close(reporter);
              +       if (runSpeculative) {
              +         out.close(reporter);
              +         pfs.commit();
              +         pfs.close();
              +       } else {
              +         out.close(reporter);
              +         fs.close();
              +       }
                    }

          "out.close(reporter);" should be lifted out of the branch. And it is usually better to not close the file system because they are cached and may be used in another context. So I'd drop the else clause all together.

          Sanjay Dahiya added a comment -

          Patch updated with latest trunk.

          Sanjay Dahiya added a comment -

          Conflicts with the latest trunk; will submit an updated patch.

          Sanjay Dahiya added a comment -

          New patch; fixed some Javadoc warnings.

          Sanjay Dahiya added a comment -

          The patch failed the Streaming test when run twice in succession: the output file created in the first run still existed in the second run, and PhasedFileSystem was not handling the resulting exception during rename.
          This patch fixes the issue.

          Sanjay Dahiya added a comment -

          Updated patch; the earlier patch caused the PiEstimator test to fail. It was closing the fs before the ReduceTask finished, and the PiEstimator reduce task's close() opens a new file, causing it to fail. That is fixed now. The TextInputFormat test still fails, but I guess that's due to HADOOP-696.

          Sanjay Dahiya added a comment -

          Oops, attached the wrong file. Updating.

          Sanjay Dahiya added a comment -

          Changes in this patch update:

          1. PhasedFileSystem has an extra constructor that takes a JobConf.
          2. PhasedFileSystem now handles the file overwrite flag properly.
          3. commit/abort can be done for a single file or for all files created by this FS.
          4. The commit method is split into two functions; one is private, with an extra boolean parameter to avoid concurrent modification while committing in an iterator loop.
          5. The reduceProgress calculation is changed; earlier it divided the progress by reduces.length twice, once while updating status and again while checking for speculative tasks, which was incorrect.

          Thanks, Devaraj, for the review.

          Doug Cutting added a comment -

          > it would be nice if FileSystem was an interface [...]

          Sometimes that would be nice, and sometimes it would not be. The FileSystem API has evolved considerably without breaking back-compatibility. That's harder to do with an interface.
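          A concrete illustration of the point, in pre-Java-8 terms (interfaces had no default methods then; the class names here are made up):

              // Evolving an abstract class: a new method with a body does not
              // break existing subclasses.
              abstract class FileSystemBase {
                abstract boolean exists(String path);
                boolean isDirectory(String path) { return false; } // added later; subclasses still compile
              }

              // Evolving an interface: adding isDirectory() here would break
              // every existing implementor, so back-compatible growth is harder.
              interface FileSystemIface {
                boolean exists(String path);
              }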

          Devaraj Das added a comment -

          One more comment: we need to document somewhere how exactly (i.e., through which config params) a user of the PhasedFileSystem (in a map method) can access the JobId, TaskId and TIPId when creating an instance of PhasedFileSystem. Better yet, add a new constructor to PhasedFileSystem that takes a JobConf object; inside that constructor you can get the JobId, TaskId and TIPId values and proceed. The user doesn't have to bother about the details in that case.

          Devaraj Das added a comment -

          1. Noticed incorrect Javadoc/comments in some places. Please go through the patch carefully and fix them.
          2. The commit and abort APIs of PhasedFileSystem need to invoke the base FS's close() method.
          3. The runningTasks variable seems redundant and can be removed, and the code updating/reading runningTasks can be changed to update/read recentTasks instead.

          Sanjay Dahiya added a comment -

          One important point that needs to be reviewed in this patch is the extra checks for speculative reduce tasks in TIP. We may want to inherit some of this for speculative maps too, or structure it better; looking for comments on that.

          Sanjay Dahiya added a comment -

          Here is the updated patch for review.

          Changes in this patch:

          • Moved temp files to (mapred.system.dir)/<jobid>.
          • Two new variables in TIP:
            • runningTasks (to track currently running instances of attempts for the same task; earlier this was done using a boolean)
            • hasCompletedTask (for reduces, if an attempt succeeds then all subsequent failures for the same TIP should be ignored; the existing setup tries to schedule another task for that)
          • Added a PhasedFileSystem, which takes a jobid, tipid and taskid and creates all files in (mapred.system.dir)/<jobid>/<tipid>/<taskid>; these are moved to their final location on commit, or deleted.
          • The Task constructor needs the tip id now; it is passed in RPC as well.
          • Task.localizeConfiguration adds the tip id to the conf.
          • Minor change in FSDirectory to add the exception message to the log if a rename fails.

          Planning to add an example of using PhasedFileSystem (changing RandomWriter to use PhasedFileSystem, as Devaraj suggested) in a separate patch on the same issue.

          Thanks, Owen, for the review.

          Owen O'Malley added a comment -

          Since the dfs locking doesn't really work reliably, I would drop the tip id part of the directory structure, not do any locking, and, if the file is already in place, just skip that file.

          Owen O'Malley added a comment -

          1. This patch doesn't apply cleanly anymore; TaskTracker.java has a conflicting change.

          2. I agree that it would be nice if FileSystem was an interface, but that is way out of scope for this bug.

          You should derive PhasedFileSystem from FileSystem. It should have a "base" FileSystem that does the real work and a Map<String,String> that maps "final" filenames to "temporary" filenames.

          PhasedFileSystem will need to implement the abstract "Raw" methods from FileSystem. The read operations can just be sent directly to the base FileSystem. createRaw should create a temporary name, put it in the map, and create the file using the base FileSystem. The other "modification" methods (renameRaw, deleteRaw) should throw UnsupportedOperationException.

          To ensure that the files are cleaned up correctly, I'd suggest that the temporary files be stored under:

          <system dir>/<job id>/<tip id>/<task id>/<unique file id>

          (in dfs clearly) so that when the job is finished, we just need to delete the job directory to clean up any remains of failed tasks that didn't clean up properly.

          there should also be:

              public void commit() throws IOException { ... }
              public void abort() throws IOException { ... }
              public void close() throws IOException { abort(); }

          For close, it should lock the tip directory, move the files into place, put a DONE touch file in the tip directory, and delete the task id directory.
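          For concreteness, here is a self-contained sketch of the bookkeeping Owen describes (the real class would extend FileSystem and delegate to a base instance; the class and method names below are illustrative, and the base operations are stubbed):

              import java.io.IOException;
              import java.util.LinkedHashMap;
              import java.util.Map;

              // Sketch of phased commit: track final -> temporary names,
              // rename on commit, delete on abort.
              class PhasedCommitSketch {
                private final Map<String, String> finalToTemp =
                    new LinkedHashMap<String, String>();
                private int nextId = 0;

                // createRaw analogue: invent a temp name under the task dir.
                String create(String finalName, String taskDir) {
                  String tempName = taskDir + "/part-" + (nextId++);
                  finalToTemp.put(finalName, tempName);
                  return tempName; // the caller writes here via the base FileSystem
                }

                void commit() throws IOException {
                  for (Map.Entry<String, String> e : finalToTemp.entrySet()) {
                    rename(e.getValue(), e.getKey()); // move each temp into place
                  }
                  finalToTemp.clear();
                }

                void abort() throws IOException {
                  for (String temp : finalToTemp.values()) {
                    delete(temp); // discard side files
                  }
                  finalToTemp.clear();
                }

                // Stand-ins for the base FileSystem operations.
                private void rename(String from, String to) throws IOException {}
                private void delete(String path) throws IOException {}
              }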

          Sanjay Dahiya added a comment -

          Looking at the FileSystem implementation, here is a concern.

          FileSystem is an abstract class, extended by the local and distributed file systems. The ideal way to have a PhasedFileSystem would be if FileSystem were an interface; we could implement it and take a FileSystem in the constructor of PhasedFileSystem, something like:

              PhasedFileSystem implements FileSystem {
                public PhasedFileSystem(FileSystem fs);
                // channel all methods to fs
                // implement commit() / abort()
              }

          In this case we could use the phased functionality with both the local and distributed FileSystem, still maintaining interface compatibility. Doing this with the base abstract class will require some dummy objects, as in the next option.

          The next option is to have something like:

              PhasedFileSystem extends FileSystem {
                private FileSystem fs;

                protected PhasedFileSystem(FileSystem fs) {
                  super(fs.getConf()); // not used, just a dummy for base class creation
                  this.fs = fs;
                }

                protected PhasedFileSystem(Configuration conf) {
                  super(conf);
                  throw new UnsupportedOperationException();
                }

                public static PhasedFileSystem getNamed(...) {
                  return new PhasedFileSystem(FileSystem.getNamed(...));
                }

                // All other methods channel calls to fs,
                // plus commit() and abort()
              }

          The last option is to add extra methods to FileSystem itself, which would not be used in most cases but would be available in derived FileSystems. This doesn't sound too good.

          Option 2 is something that will work well for us, even though it's not a good design.

          Comments?

          Sanjay Dahiya added a comment -

          PhasedFileSystem sounds good; I will work on this approach. Thanks, Devaraj and Owen, for the review.

          Owen O'Malley added a comment -

          The PhasedRecordWriter won't handle all of the cases, because RecordWriters can write multiple files. Furthermore, they are user code and it would be better to minimize required changes to them.

          A better approach would be to have a PhasedFileSystem that takes a base FileSystem and uses it to commit/abort the changes. Then the framework could pass the PhasedFileSystem to the createRecordReader call, and it would catch all of the files that the RecordWriter created. When the PhasedFileSystem gets a create call, it creates the file in the base FileSystem with a mutated name. When the changes are committed, the files are all renamed. If the changes are aborted, the mutated filenames are deleted.

          Devaraj Das added a comment -

          Regarding the PhasedRecordWriter class, I think it should provide the option of creating a file, writing to it, and then calling commit or abort. The current implementation binds it to a RecordWriter, but it should cater to things like sequence files as well (e.g., RandomWriter should be able to use this functionality).

          Sanjay Dahiya added a comment -

          This patch is up for review.

          Here is the list of changes included in this patch -

          Replaced recentTasks with a Map and added a new method in TaskInProgress, hasRanOnMachine(), which looks at this Map and at hasFailedOnMachines(). This is used to avoid scheduling multiple reduce instances of the same task on the same node.

          Added a PhasedRecordWriter, which takes a RecordWriter, tempName, and finalName. Another option was to create a PhasedOutputFormat; this seems more natural, as it works with any existing OutputFormat and RecordWriter. Records are written to tempName, and when commit is called they are moved to finalName.

          ReduceTask.run(): if speculative execution is enabled, the reduce output is written to a temp location using PhasedRecordWriter. After the task finishes, the output is moved to its final location.
          If some other speculative instance finishes first, TaskInProgress.shouldCloseForClosedJob() returns true for the taskId. On the TaskTracker the task is killed by Process.destroy(), so the cleanup code is in TaskTracker instead of Task. The cleanup of maps happens in Conf, which is probably misplaced. We could refactor this part for both map and reduce, and move the cleanup code to some utility classes which, given a map and a reduce task, track the files generated and clean up if needed.

          Added an extra attribute in TaskInProgress, runningSpeculative, to avoid running more than one speculative instance of a ReduceTask. Too many reduce instances for the same task could increase the load on map machines; this needs discussion. I can revert this change to allow some other number of instances of reduces (MAX_TASK_FAILURES?).

          Comments?

          Owen O'Malley added a comment -

          I think it is better for now to just use JobConf.setSpeculativeExecution for both maps and reduces.

          machinesWhereFailed is a list of machines where the task has failed. To find where it is currently running, you need to use recentTasks. Currently, the recentTasks is a set of task ids that are running. You should probably make it a map from task id to task tracker id.

          Don't block speculative reduces based on other reduces running. That would make a perpetually busy cluster never run speculative reduces.

          Please create a library that lets you create files off to the side and, when you call commit, moves them into place. Speculative reduces need it, but that functionality is useful in other places, such as side-effect-based maps. The class should also have an abort method that cleans up.

          The defaults for the map speculative execution don't look too unreasonable, so just use them for now.

          Sanjay Dahiya added a comment -

          Here is a list of code-level changes; I will test this meanwhile.

          • Added an extra JobConf configuration: runSpeculativeReduces.
          • TaskInProgress maintains a list of nodes where the task has already run (or is running); this is used to avoid scheduling a speculative instance where the task is already running or has failed in the past. [TIP already contains a list of nodes where the task failed.]
          • Another option is that if any reduce task is already assigned to this TT and is still running, it is not assigned a speculative task. [Comments?]
          • TIP.hasSpeculativeTask now checks for reduce tasks as well; currently it checks only map tasks. Using Johan's condition (finishedReduces / numReduceTasks >= 0.7) for testing until the exact condition is settled.
          • JobInProgress.findNewTask looks for speculative tasks (TIP.hasSpeculativeTask()) and checks whether the task ran on the same task tracker.
          • If speculative execution of reduces is enabled, ReduceTask.run() creates a temp file name for the reduce output. When the reduce task finishes, it checks whether the output file has already been written by some other reduce instance; if not, it renames its output to the final output, otherwise the temp output is deleted.
          • TaskTracker.TIP.cleanup() also cleans up the reduce task's temp file if the task is killed in between.
          • JobTracker.pollForTaskWithClosedJob() and TIP.shouldCloseForClosedJob() return true if a speculative reduce task finished first; this ultimately goes down to the TT, which kills and cleans up the task.

          The exact conditions (timeouts) under which a reduce task should be executed speculatively are open for discussion.

          Comments?

          Johan Oskarsson added a comment -

          As one should have suspected, there seems to be a problem with the patch: it doesn't clean up after the tasks properly, but I do not currently have the time to look into this.
          Still, feedback is welcome, and if someone wants to use any of it in their own patch, go ahead.

          Johan Oskarsson added a comment -

          I've tried to implement speculative reduces, and it seems to be working; however, I'd like you to take a look at it, since I'm not familiar with some of the inner workings of Hadoop.

          As suggested, it writes output to a temporary name, and the first task to finish moves it to the correct output name.
          The patch adds a String tmpName to getRecordWriter in OutputFormatBase, and a close method. Basically, OutputFormatBase keeps track of the tmpName and the final name; once close is called, it moves the tmp file to the final name.

          This means the current output formats don't have to be changed.

          This patch would ideally be complemented by better tasktracker selection; I've seen instances where there are two final reduce tips and a speculative reduce is assigned to the same node that is already running the other task.

          A speculative reduce will be started if finishedReduces / numReduceTasks >= 0.7 (sketched below).

          That's about it; looking forward to hearing your input.
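          The trigger condition, as a minimal sketch (variable names follow the comment above; the cast to double is needed to avoid integer division):

              int finishedReduces = 7, numReduceTasks = 10;  // example values
              boolean startSpeculative =
                  numReduceTasks > 0 &&
                  (double) finishedReduces / numReduceTasks >= 0.7;  // true here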


            People

            • Assignee: Sanjay Dahiya
            • Reporter: Doug Cutting
            • Votes: 1
            • Watchers: 2
