HADOOP-2141 (Hadoop Common)

speculative execution start up condition based on completion time

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.21.0
    • Fix Version/s: 0.21.0
    • Component/s: None
    • Labels: None
    • Hadoop Flags: Reviewed

      Description

      We had one job hang even though speculative execution was enabled.

      4 reduce tasks were stuck at 95% completion because of a bad disk.

      Devaraj pointed out:

      One of the conditions that must be met for launching a speculative instance of a task is that it must be at least 20% behind the average progress, and this is not true here.

      It would be nice if speculative execution also starts up when tasks stop making progress.

      Devaraj suggested:

      Maybe, we should introduce a condition for average completion time for tasks in the speculative execution check.
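      A minimal sketch of the kind of check being suggested (hypothetical names and thresholds, not code from any attached patch): speculate when a running task's estimated completion time is well beyond the average completion time of tasks that have already finished, even if its progress is not 20% behind the average.

          // Illustration only: speculative launch based on estimated completion time.
          class CompletionTimeCheck {
            /** progress is in [0, 1]; times are in milliseconds. */
            static boolean shouldSpeculate(double progress, long runTimeMs,
                                           double avgCompletedTaskMs, double slack) {
              if (progress <= 0.0 || progress >= 1.0) {
                return false;                          // no estimate yet, or already done
              }
              double estimatedTotalMs = runTimeMs / progress;        // linear extrapolation
              return estimatedTotalMs > avgCompletedTaskMs * slack;  // e.g. slack = 1.5
            }
          }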

      Attachments

      1. HADOOP-2141.patch
        28 kB
        Andy Konwinski
      2. HADOOP-2141-v2.patch
        30 kB
        Andy Konwinski
      3. HADOOP-2141-v3.patch
        30 kB
        Andy Konwinski
      4. HADOOP-2141-v4.patch
        30 kB
        Andy Konwinski
      5. 2141.patch
        32 kB
        Devaraj Das
      6. HADOOP-2141-v5.patch
        31 kB
        Andy Konwinski
      7. HADOOP-2141-v6.patch
        34 kB
        Andy Konwinski
      8. HADOOP-2141.v7.patch
        34 kB
        Andy Konwinski
      9. HADOOP-2141.v8.patch
        35 kB
        Andy Konwinski
      10. 2141.4.patch
        69 kB
        Devaraj Das
      11. 2141.5.patch
        75 kB
        Devaraj Das
      12. 2141.6.patch
        76 kB
        Devaraj Das
      13. 2141.7.patch
        76 kB
        Devaraj Das
      14. 2141.8.2.patch
        80 kB
        Devaraj Das
      15. 2141.8.3.patch
        80 kB
        Devaraj Das
      16. hadoop-2141-yahoo-v1.4.1.patch
        22 kB
        Amar Kamat
      17. hadoop-2141-yahoo-v1.4.8.patch
        25 kB
        Amar Kamat

        Issue Links

          Activity

          Andrew added a comment -

          Was this integrated into 0.20/1.0.x?

          Jeff Hammerbacher made changes -
          Link This issue relates to MAPREDUCE-2039 [ MAPREDUCE-2039 ]
          Tom White made changes -
          Status Resolved [ 5 ] Closed [ 6 ]
          Amar Kamat made changes -
          Attachment hadoop-2141-yahoo-v1.4.8.patch [ 12435253 ]
          Amar Kamat added a comment -

          Attaching a patch for Yahoo!'s distribution of hadoop-0.20; not to be committed.

          Amar Kamat made changes -
          Attachment hadoop-2141-yahoo-v1.4.1.patch [ 12431756 ]
          Amar Kamat added a comment -

          Attaching a patch for Yahoo!'s distribution of Hadoop 0.20; not to be committed. test-patch and ant tests passed on my box.

          Robert Chansler made changes -
          Release Note Improves the speculative execution heuristic. The heuristic is currently based on the progress-rates of tasks and the expected time of completion. Also, statistics about trackers are collected, and speculative tasks are not given to the ones deduced to be slow.
          Robert Chansler added a comment -

          Editorial pass over all release notes prior to publication of 0.21. Routine.

          Owen O'Malley made changes -
          Component/s mapred [ 12310690 ]
          Hudson added a comment -

          Integrated in Hadoop-trunk #869 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/869/ )
          Devaraj Das made changes -
          Status Patch Available [ 10002 ] Resolved [ 5 ]
          Hadoop Flags [Reviewed]
          Release Note Updated speculative execution scheduler Improves the speculative execution heuristic. The heuristic is currently based on the progress-rates of tasks and the expected time of completion. Also, statistics about trackers are collected, and speculative tasks are not given to the ones deduced to be slow.
          Resolution Fixed [ 1 ]
          Devaraj Das added a comment -

          I just committed this. Thanks Andy!

          Amareshwari Sriramadasu added a comment -

          +1 patch looks good to me.

          Devaraj Das made changes -
          Attachment 2141.8.3.patch [ 12410655 ]
          Devaraj Das added a comment -

          Attached patch addresses the minor concerns.

          Amareshwari Sriramadasu added a comment -

          Some minor comments:

          +  //runningMapStats used to maintain the RUNNING reduce tasks' statistics
          +  private DataStatistics runningReduceTaskStats = new DataStatistics();
          

          The comment should say runningReduceStats

          In TaskInProgress.updateStatus() method, taskStatuses.put(taskid, status) can be moved up in the if condition and else can be removed.

          Devaraj Das made changes -
          Attachment 2141.8.2.patch [ 12410621 ]
          Devaraj Das added a comment -

          This patch has an improved testcase, and also fixes some java doc.
          ant run-test-mapred/test-patch passed with this patch.

          Devaraj Das made changes -
          Attachment 2141.7.patch [ 12410480 ]
          Devaraj Das added a comment -

          Attached patch addresses the concerns.

          Amareshwari Sriramadasu added a comment - edited

          one minor comment:
          Initialization for stats can be done inline. And comments to the stats, describing what the stats are for, can be added.

          Otherwise, changes look fine to me.

          Devaraj Das made changes -
          Attachment 2141.6.patch [ 12410463 ]
          Devaraj Das added a comment -

          Attaching a patch with minor changes. For a sort job on a ~200 node cluster, the number of speculative tasks launched with this patch is only ~10% of the number launched with the trunk. The job run time is almost the same. Of the tasks that were chosen, I've seen at least 30% accuracy in correctly choosing the tasks for speculation. In some cases, I even saw 100%.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12410208/2141.5.patch
          against trunk revision 783059.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 22 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          -1 contrib tests. The patch failed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch-vesta.apache.org/488/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch-vesta.apache.org/488/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch-vesta.apache.org/488/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch-vesta.apache.org/488/console

          This message is automatically generated.

          Devaraj Das made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Devaraj Das made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Fix Version/s 0.21.0 [ 12313563 ]
          Devaraj Das made changes -
          Attachment 2141.5.patch [ 12410208 ]
          Devaraj Das added a comment -

          A well tested patch. Results are pretty good w.r.t slot utilization vis-a-vis job run-time. The tests are still going on.

          Devaraj Das made changes -
          Attachment 2141.4.patch [ 12409166 ]
          Devaraj Das added a comment -

          The attached patch fixes some bugs that were in the earlier patch. Also, the standard deviation of a task's progress is what is used for determining whether to speculate or not. I also wrote up a unit test that fakes JobInProgress and the Clock. The first is required so that task scheduling is transparent to the testcase although it uses the framework's lower level methods. This keeps things really very close to reality, and at the same time the testcase can make equality checks for the taskAttemptID that it gets back when it asks for one from the fake JobInProgress. The Clock is faked so that tasks can be made to progress over time artificially and things like progress rates can be easily computed for testing purposes. The other option was to use a MiniMRCluster but it did not seem easy to achieve what I have in the testcase that way. The third option was to spoof heartbeats and not fake the JobInProgress but that also seemed not easily manageable.

          Andy Konwinski added a comment -

          Devaraj, addressing your points:

          1) Why do you think that progress rate would be better than expected time to
          completion? We thought about this in the OSDI paper a bit, here is what we said:

          "The primary insight behind our scheduler is the fol- lowing: We always
          speculatively execute the task that we think will finish farthest into the
          future, because this task provides the greatest opportunity for a speculative
          task to overtake the original and save a significant amount of time."

          If speculative tasks didn't have to redo (potentially a lot of) work that the
          original task attempt already completed, I would agree that using progress-rate
          would be enough.

          In a perfect world, we would only want to speculate on a task if 1) we
          calculated its expected finish time on the node it's currently running on based
          on % of task remaining and speed of the node it is running on, and then 2) also
          calculated its expected finish time on the new node that we are considering
          launching it on speculatively based on running it from the beginning at the
          speed of the new node.

          As such, we would only ever launch speculative tasks on nodes that are faster
          than the node they are currently running on, which makes it seem like progress-rate
          would be enough. However, sometimes just being faster isn't enough since it
          doesn't guarantee that the speculative task will finish before the original
          because the speculative task has to catch up with work that has already been
          completed by the current task attempt.

          For example, TT1 comes in asking for a speculative task. Let's say that TT1 can
          progress at a rate of 3% of task progress per second. We look at tasks running
          on TT2 and TT3 to decide which one to speculate.

          TT2 is 90% done with task A and is progressing at 1% of progress per second. TT3
          is 10% done with task B and is progressing at 2% of progress per second.

          In this case, sorting based on progress rate would mean we speculate task A because
          progress is being made more slowly on that task. However, if you think about it,
          TT2 would have finished task A in another 10 seconds. If we speculate it, TT1
          will get 30% done with it and then get killed because TT2 finished the task.

          On the other hand, TT3 would have required another 45 seconds to finish task B.
          If we speculate task B on TT1, it will finish the task in 33 seconds which is
          faster than TT3 could have done it in.

          In this case, using expected time to completion would be the right thing to do.

          This was the motivation for LATE (as we described in the OSDI paper).
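          To make the arithmetic in the example above concrete (an illustration only, not code from the patch):

            // Expected-finish-time arithmetic for the TT1/TT2/TT3 example.
            class LateExample {
              public static void main(String[] args) {
                double remainingA = (1.0 - 0.90) / 0.01; // task A on TT2: 10 s left
                double remainingB = (1.0 - 0.10) / 0.02; // task B on TT3: 45 s left
                double redoOnTT1  = 1.0 / 0.03;          // ~33.3 s to rerun any task on TT1
                // Speculating A is wasted work (TT2 finishes in 10 s, long before TT1's 33 s);
                // speculating B wins (33.3 s < 45 s). Hence sort by expected time to
                // completion rather than by raw progress rate.
                System.out.printf("A: %.0f s left, B: %.0f s left, redo on TT1: %.1f s%n",
                                  remainingA, remainingB, redoOnTT1);
              }
            }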

          2) I will work on this soon (this weekend)

          3) I will remove that code

          4) Using tip.getDispatchTime() instead of tip.getExecStartTime() should do the
          trick right?

          I will work on all of this ASAP (hopefully over the weekend).

          Devaraj Das added a comment -

          In the future, we may consider the overall performance of TTs w.r.t all tasks it has run so far and make some decisions based on that for this special case.

          I meant tasks across all jobs.

          Devaraj Das added a comment -

          Went through the patch. Looks good for the most part. Some comments:
          1) I don't think we need to sort tasks on expected time to completion. Sorting on progress-rate should be good enough. On a related note, we will have a fix pretty soon that will smoothen the progress curve (HADOOP-5572).

          2) In the patch, jobs will always launch speculative tasks if given a chance. We should launch speculative tasks only for the slower tasks. Offline, you were saying that you would implement the std deviation technique and launch spec tasks only for the ones that are lagging behind in their progress rates by 1 standard deviation. We should implement some technique like that.

          3) You had commented:

          It might make more sense to just assume that nodes who haven't reported back progress (regardless if they have been assigned a task for this job or not) are not laggards.

          I am +1 for this. Please remove the code where you check whether a TT ran a task at all before deciding it is a laggard or not. I guess we can give it the benefit of the doubt. In the future, we may consider the overall performance of TTs w.r.t all tasks it has run so far and make some decisions based on that for this special case.

          4) The statistics update upon successful completion of a task has a problem - the tip.execStartTime is global to all attempts and is not per attempt. So the tipduration wouldn't always be reflective of the time the current TT took to run the task (take a case where an attempt failed on some other TT and got reexecuted on this TT).

          Andy Konwinski made changes -
          Attachment HADOOP-2141.v8.patch [ 12408228 ]
          Andy Konwinski added a comment - edited

          Responding to Devaraj's comments:

          re 1) You are right, they were redundant as far as I can tell. I have removed the mostRecentStartTime and am now only using dispatchTime. It is now updated in TaskInProgress.getTaskToRun(), not JobTracker.assignTasks().

          re 2) Devaraj, what you are saying makes sense about locality, and I think we need to think about this a bit more, but I want to get this patch submitted with the changes and bug fixes I have done now.

          Also, some other comments:

          A) I have updated isSlowTracker() to better handle the case where a task tracker hasn't successfully completed a task for this job yet. In the last patch (v8) I was just assuming that it was a laggard in such cases to be safe. Now I am checking if the TT has been assigned a task for this job or not yet. If it hasn't then we give it the benefit of the doubt, if it has been assigned a task but hasn't finished the task yet then we don't speculate on it. This should address the case Devaraj pointed out earlier of running in a cluster that has more nodes than we have tasks or adding a task tracker during the middle of a long job. It might make more sense to just assume that nodes who haven't reported back progress (regardless if they have been assigned a task for this job or not) are not laggards.

          B) Finally, Devaraj caught two very serious bugs in my math in isSlowTracker. My current implementation of DataStatistics.std() calculates the variance, not the standard deviation. I should have been using the square root of my formula. Also, I was considering trackers with faster tasks to be the laggards, it should obviously be trackers with slower tasks that are considered the laggards.

          Walking through an example (given by Devaraj):

          2 trackers run 3 maps each. TT1 takes 1 second to run each map. TT2 takes 2 seconds to run each map. Assuming these figures, let's compute mapTaskStats.mean() and mapTaskStats.std(), and TT1.mean()/std(). Now if you assume that TT1 comes asking for a task, TT1 will be declared as slow. That should not happen.

          The mapTaskStats.mean() would be 1.5 at the end of the 6 tasks. MapTaskStats.std() would be 0.25 (2.5 - 1.5*1.5). TT1's mean() would be 1. The check in isSlowTracker would evaluate to true since (1 < (1.5 - 0.25)) (assuming slowNodeThreshold is 1). This is obviously wrong.

          After fixing the bugs, for the numbers above, neither tracker would be considered a laggard:

          mapTaskStats.mean() = (1+1+1+2+2+2)/6 = 1.5

          mapTaskStats.sumSquares = (1^2 + 1^2 + 1^2 + 2^2 + 2^2 + 2^2) = 15
          mapTaskStats.std() = [sumSquares/6 - mean*mean]^(1/2) = [15/6 - 1.5*1.5]^(1/2) = (0.25)^(1/2) = 0.5

          Now since we are using the default one standard deviation, we expect that no more than 1/2 of the tasks will be considered slow. This is shown by the One-sided Chebyshev inequality (http://en.wikipedia.org/w/index.php?title=Chebyshev%27s_inequality#Variant:_One-sided_Chebyshev_inequality)

          Now, we consider a task tracker to be slow if (tracker's task mean - mapTaskStats.mean > maptaskStats.std * slowNodeThreshold).

          • for TT1: (tt1.mean - mapTaskStats.mean > mapTaskStats.std) == (1 - 1.5 > 0.5) == (-0.5 > 0.5) == false
          • for TT2: (tt2.mean - mapTaskStats.mean > mapTaskStats.std) == (2 - 1.5 > 0.5) == (0.5 > 0.5) == false
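          A minimal sketch of the corrected statistics described above (illustration only; the names follow the discussion, not necessarily the attached patch):

            class SlowTrackerSketch {
              static class DataStatistics {
                private int count;
                private double sum, sumSquares;
                void add(double v) { count++; sum += v; sumSquares += v * v; }
                double mean() { return count == 0 ? 0.0 : sum / count; }
                double std() {                       // square root of the variance
                  double m = mean();
                  return count == 0 ? 0.0 : Math.sqrt(Math.max(0.0, sumSquares / count - m * m));
                }
              }

              // A tracker is slow iff its mean task time is more than slowNodeThreshold
              // standard deviations above the job-wide mean.
              static boolean isSlowTracker(DataStatistics tracker, DataStatistics job,
                                           double slowNodeThreshold) {
                return tracker.mean() - job.mean() > job.std() * slowNodeThreshold;
              }
            }
            // With the numbers above (job mean 1.5, std 0.5, threshold 1), TT1 (mean 1.0)
            // and TT2 (mean 2.0) both fail the check, so neither is declared a laggard.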
          Devaraj Das added a comment -

          I am going through the patch. Some early comments:
          1) I don't understand the motivation of having two time fields - dispatchTime and mostRecentStartTime. Seems like both of them are updated in the same code flow - mostRecentStartTime is updated in TaskInProgress.getTaskToRun and dispatchTime is updated in the place just after assignTasks in JobTracker. But getTaskToRun is anyway called from within assignTasks, so why have two fields representing the same information?
          2) The locality code seems quite redundant actually. The locality aspect actually conflicts with the algorithm for choosing tasks to speculate. In the current codebase (unpatched), we get the running tasks list based on locality w.r.t the tracker that just came in asking for a task, and then see if something can be speculatively run. In the patch, all running tasks are sorted globally w.r.t progress rates and expected-time-to-completion and a task from that list is handed out. Locality could only be a coincidence here at best. I will ponder some more whether to leave that code around or simplify it to remove the locality aspects for running tasks.
          Now, coming to Eric's concern about a slow disk slowing the progress of a task, if the speculative task also starts reading input from the same replica, then yes, there is a problem. So yes, this is an interesting area for further research!

          Andy Konwinski added a comment -

          The current patch contains the changes discussed (see my responses below).

          2. We are now using the task dispatch time from the JT as the base time to estimate progress so that the time estimates are accurate and also account for potential laggard behavior of a node due to network problems/latency.

          5. I've put locality preference back in for speculative maps.

          6. I implemented isSlowTracker as I described above, with the number of standard deviations that a TT has to be below the global average specified in the conf file (with a default of 1 std).

          7. I just removed this filter; we now allow speculation if there is more than one task.

          • Also, I changed the behavior of the filter that only allows tasks that have run for more than a minute to be speculated. To do this for now, I've set it to 0, which means that tasks aren't being filtered, but this way we can keep an eye out while testing and easily turn it back on if we want the filter back. I think it is just a remnant of the original speculative execution heuristic.

          I have been testing this patch on small sort jobs on a 10 node EC2 cluster for a couple of days now. I've been simulating laggards by running nice -n -20 ruby -e "while true;;end" loops as well as dd if=/dev/zero of=/tmp/tmpfile bs=100000. Hopefully large scale testing will flush out any bugs I've missed.

          Other thoughts and some ideas for near term future work:

          • As we've talked about some already, after this patch gets tested and committed, we should update the way we calculate task progress, probably normalizing by data input size to task. Also, we might think about using only the first two phases of the reduce tasks to estimate the performance of Task Trackers because we know more about their behavior.
          • We should further improve isSlowTracker() with regards to how we handle Task Trackers that have not reported any successful tasks for this job. Right now if a TT is a laggard and 1) is really slow, or 2) was added to the cluster near the end of a job, or 3) the job is smaller than the cluster size and is thus spreading out its tasks thinly; then the task tracker might not have reported a successful task by the time we start looking to run speculative tasks. In this case we don't know if the task tracker is a laggard since we use a TT's history to determine if it is slow or not. Currently, we just assume it might be a laggard and thus isSlowTracker() will return true. In the near future it will be better to allow assignment of a spec task to a TT (see the sketch after this list) if:
            1) the TT has run at least one successful task for this job already and its average task duration is less than slowNodeThreshold standard deviations below the average task duration of all completed tasks for this job.
            2) the TT has not been assigned any tasks for this job yet (i.e. has been assigned a task for this job but the task has not completed yet)
          • Finally, we might want to think up some unit test cases for speculative execution.
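          A rough sketch of the eligibility rule described in the list above (hypothetical names, not code from the attached patch):

            // Illustration only: may this tracker be handed a speculative task?
            class SpeculativeEligibility {
              static boolean canRunSpeculative(boolean hasBeenAssignedTask,
                                               boolean hasCompletedTask,
                                               double trackerMeanMs, double jobMeanMs,
                                               double jobStdMs, double slowNodeThreshold) {
                if (!hasBeenAssignedTask) {
                  return true;                // no history yet: give it the benefit of the doubt
                }
                if (!hasCompletedTask) {
                  return false;               // assigned work but nothing finished yet: hold off
                }
                // Otherwise require the tracker's average task duration to be within
                // slowNodeThreshold standard deviations of the job-wide average.
                return trackerMeanMs - jobMeanMs <= jobStdMs * slowNodeThreshold;
              }
            }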
          Andy Konwinski made changes -
          Attachment HADOOP-2141.v7.patch [ 12407448 ]
          Andy Konwinski added a comment -

          Updated patch with bug fixes and feedback factored in.

          Devaraj Das added a comment -

          I forgot to mention one point earlier. TTs that didn't get a chance to run virgin tasks of a job earlier (maybe because the #tasks were fewer than the number of nodes in the cluster) should be able to run speculative tasks of the same job (if any). In the current patch it seems like this won't happen.

          Devaraj Das added a comment -

          Eric, I am not sure how much of a gain the locality aspect would be. Considering that we will have very few tasks that we launch speculatively, the probability that a TT comes in and gets a data-local spec task would be quite low IMO. But yes, it makes sense to keep the existing logic for running node/rack-local speculative tasks around. So I'd suggest something like:
          if (TT is not slow) {
            if (exists a node-local task that is running slower than others) {
              assign that task to the TT
            } else {
              assign some task from the higher-level rack-cache if available;
              else look at the entire list of running TIPs to find a slow task
            }
          }
          The above is essentially the same as what happens in today's trunk. The only additional constraint we are adding here is the check for whether a TT is GOOD (meets the criteria for running spec tasks).

          eric baldeschwieler added a comment -

          Can't you use the task dispatch time from the JT as the base time to estimate progress? That would be consistent and a lot simpler than pushing responsibility to the TT.

          The assertion that it is ok to run a speculative task remotely is a little worrisome. It seems to me we should try to run them locally if possible. After all, we might be running speculative execution exactly because a task is slow due to networking issues... I'd like to see this tested. Another issue is that the disk the source data is on may be overloaded or slow. So a preference to run local to another copy of the data may be optimal for maps...

          Seems like a rich area for further research once this first patch is in. Andy, Devaraj - Let's coordinate the validation of this patch on Y! resources. We have some good test cases from the terasort contest.

          Andy Konwinski added a comment -

          Hi Devaraj, thanks for the code review. I have a lot of your comments implemented already and am still working on the more significant ones; I should have a new patch ready in the next 2 to 3 days. Before then, though, I wanted to post an update on my progress and respond to some of your suggestions to allow for wider input. Patch to come soon!

          1. Done

          2. The problem with this is that we want to be able to identify laggard tasks even when they are not reporting progress. I.e. if we don't get a TaskStatus update for that task from the TT (perhaps because the TT is down, or the task is hanging) we want it to appear slower as time goes on from the JT's perspective.

          3. Currently it is a field because I am only recalculating the task tracker ordering every SLOW_TRACKER_SORT_DELAY minutes (currently set to 2 min) so we have to keep the progress rate scores around between those sorts. However, since I'm rewriting isSlowTracker() anyway (see 6 below) this is no longer relevant.

          4. Done

          5. Done

          6. I've spoken with Matei about this and we've decided that using the mean and variance (i.e. whether the average ProgressRate of tasks that finished on the tracker is less than a standard deviation below the average ProgressRate of tasks on other trackers) to determine if a TaskTracker is slow is much better than using a percentile. The current plan is to create a new class, DataStatistics, used to track statistics for a set of numbers (by storing count, sum, and sum of squares). DataStatistics will provide mean() and std() functions. The object will be used at two levels:

          • a field of JobInProgress, taskStats, for tracking stats of all tasks
          • a map field of JobInProgress, trackerStats, with key of type TaskTracker name, value of type DataStatistics

          Updating of the statistics data structures above will be a constant time operation done when TaskTrackers report tasks as complete.

          All of this makes isSlowTracker() really simple. Basically it consists of:
          if (trackerStats.get(taskTracker).mean() < taskStats.mean() - taskStats.std()) {
            return true;
          }

          7. Let's use a percentage instead (10%?)

          -------
          One other comment: while discussing 2 above with Devaraj and Matei, we think it is important to more closely consider the mechanism used to calculate a task's progress rate. The mechanism we're using in the patch so far (i.e., the task's progress / (currentTime - startTime), which can be seen in TaskStatus.updateProgressRate) might be improved by looking more closely at how to normalize the amount of time the task has been running by the amount of data it has processed (potentially phase-wise). When Matei and I wrote the original LATE paper, we didn't dig very deep into the task progress reporting mechanisms, but rather just used the progress as it was reported, while making note of some of the oddities re. the three phases. I am still trying to validate for myself how closely the progress as reported by tasks to the TaskTracker reflects the amount of data processed thus far. However, pending a deeper look into this, it might be advantageous to revisit the progressRate mechanism after we commit a simple version of the patch which uses progressRate as is (assuming that testing at scale shows performance improvements).
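          A small illustration of the progress-rate computation discussed above, together with the data-normalized variant being considered (hypothetical names, not code from the patch):

            class ProgressRateSketch {
              // progress is in [0, 1]; times are in milliseconds.
              static double progressRate(double progress, long nowMs, long startTimeMs) {
                long elapsed = Math.max(1L, nowMs - startTimeMs);  // guard against divide-by-zero
                return progress / elapsed;                         // fraction of the task per ms
              }

              // Possible refinement: normalize by input size so that large and small
              // tasks can be compared fairly.
              static double bytesPerMs(long bytesProcessed, long nowMs, long startTimeMs) {
                long elapsed = Math.max(1L, nowMs - startTimeMs);
                return (double) bytesProcessed / elapsed;
              }
            }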

          Again, the patch will be up in the next few days.
          Andy

          Devaraj Das added a comment -

          Ok some more points:
          1. The method TaskInProgress.canBeSpeculated should include a check for !skipping (as in the trunk's TaskInProgress.hasSpeculativeTask)

          2. The progressRate in the TaskStatus is computed by the JobTracker. But the startTime in that computation is what the TaskTracker saw. This is a potential problem if the clocks in the cluster nodes are not synchronized. I am wondering whether it makes sense to have the TaskTrackers compute progress rates and let the JT know. Also, there is a bug for the very first status report: updateProgress would be called without a valid startTime (since startTime is read later on in TaskStatus.readFields).

          3. Why does progRateScores in isSlowTracker need to be a JobInProgress field (as opposed to it being a local variable)?

          4. The init for the speculative thresholds can go to a separate method and the two JobInProgress constructors can invoke that.

          5. You should change JobInProgress.scheduleMap to not create running map caches for each level. Instead just have one flat list for running tasks (earlier findSpeculativeTask would try to schedule tasks based on locality but it makes sense to ignore locality for speculative tasks since they will be very few). That will mean that you also change your new method getSpeculativeMap to not do lookup on the cache and instead just look at this list. Furthermore, the retireMap method needs to be changed in a similar way. Overall, these changes will bring the handling of running-map-tasks close to how running-reduces is handled (runningReduces is the list).

          6. The Collection<TaskInProgress> that isSlowTracker is passed will be only the running TIPs. But isSlowTracker should take into account the TIPs that ran successfully as well (and in the implementation of isSlowTracker, you indeed make the check 'state == TaskStatus.State.SUCCEEDED' ). In order to implement that, you probably need to maintain a global HashMap of TT -> progress-rate-of-completed-tasks in the JobInProgress. The HashMap should be updated with the progressrate whenever a task successfully completes (just like the updates to the entries in progRateScores for running tasks). It also means that you need to have a global completedTaskCount (similar to taskCount for running tasks).

          7. findSpeculativeTask in the patch returns null when the list has fewer items than MIN_TASKS_FOR_SPECULATIVE_FILTER (which is hardcoded to 10). The corner case here is that if a job has only that many tasks (or fewer) to run in total, no speculative tasks would be run at all. I wonder whether we should have that check at all.

          Hide
          Devaraj Das added a comment -

          Andy, the current patch doesn't apply unless the fuzz factor is set to 3 - "patch -p0 -F 3 < HADOOP-2141-v6.patch". There is an NPE in the heartbeat method, which you can reproduce by running the test TestMiniMRDFSSort - "ant -Dtestcase=TestMiniMRDFSSort test -Dtest.output=yes"; the test never finishes since the TTs keep resending the heartbeat forever. The NPE comes from the isSlowTracker method. Looking more closely at isSlowTracker, I think it requires some rework. It currently looks at the progress rates of only the running TIPs (although you do a check for TaskStatus.State.SUCCEEDED, that would always be false for RUNNING tips, which is what gets passed to the method) and attaches those rates to the TaskTrackers that are running them. But wouldn't you want to look at the history, i.e., the successful TIPs that ran on the TaskTrackers?
          I am thinking that it would make sense to give a TT one credit upon running a task successfully and base isSlowTracker purely on that (rather than on the running tasks). That way, even the TT's progress can be maintained inline and you wouldn't have to iterate over the running TIPs and compute it upon a TT heartbeat. Thoughts?

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12403769/HADOOP-2141-v6.patch
          against trunk revision 761838.

          +1 @author. The patch does not contain any @author tags.

          -1 tests included. The patch doesn't appear to include any new or modified tests.
          Please justify why no tests are needed for this patch.

          -1 patch. The patch command could not apply the patch.

          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch-vesta.apache.org/153/console

          This message is automatically generated.

          Andy Konwinski made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Affects Version/s 0.21.0 [ 12313563 ]
          Affects Version/s 0.19.0 [ 12313211 ]
          Andy Konwinski added a comment -

          Responding to Devaraj's comments:

          "The field TaskInProgress.mostRecentStartTime is updated with the same value of execStartTime each time (since execStartTime is updated only once in the life of the TIP). Did you mean to do this?"

          No, good catch. mostRecentStartTime should be updated with the current time each time getTaskToRun is called. I have made this change.

          "They should be decremented in TIP.incompleteSubTask and TIP.completedTask (basically, places where activeTasks.remove) is done. The decrement should happen if activeTasks.size for the TIP is >1. Makes sense?"

          Thanks to Devaraj for writing the decrementSpeculativeCount() function, which is called from failedTask() and completedTask(). I have replaced the countSpeculating() function call in atSpeculativeCap() with the sum of speculativeMapTasks+speculativeReduceTasks.
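
          For illustration, a minimal sketch of the counter bookkeeping described above. The names speculativeMapTasks, speculativeReduceTasks, decrementSpeculativeCount() and atSpeculativeCap() come from the discussion; the surrounding class and method signatures are hypothetical.

              // Simplified view of the speculative-attempt accounting, not the actual patch.
              class SpeculativeAccounting {
                private int speculativeMapTasks = 0;
                private int speculativeReduceTasks = 0;
                private final int speculativeCap;

                SpeculativeAccounting(int speculativeCap) {
                  this.speculativeCap = speculativeCap;
                }

                // Incremented when getSpeculativeMap()/getSpeculativeReduce() hands out an attempt.
                synchronized void speculativeAttemptStarted(boolean isMap) {
                  if (isMap) {
                    speculativeMapTasks++;
                  } else {
                    speculativeReduceTasks++;
                  }
                }

                // Called from completedTask()/failedTask() when a TIP loses one of several
                // active attempts, i.e. a speculative attempt has just gone away.
                synchronized void decrementSpeculativeCount(boolean isMap) {
                  if (isMap) {
                    speculativeMapTasks--;
                  } else {
                    speculativeReduceTasks--;
                  }
                }

                // Replaces the on-demand countSpeculating() scan with a running sum.
                synchronized boolean atSpeculativeCap() {
                  return speculativeMapTasks + speculativeReduceTasks >= speculativeCap;
                }
              }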

          "Couldn't it be checked whether TIP.isComplete() returns true before launching a speculative attempt?"

          Yes, I think this could be done as an optimization. It would add a little bit of complexity though and before making too many more changes maybe it would be good to test the current functionality. Again, it would be nice if we could get a few people to test the performance impact of this patch at scale.

          Andy Konwinski made changes -
          Attachment HADOOP-2141-v6.patch [ 12403769 ]
          Andy Konwinski added a comment -

          Updated patch with Devaraj's suggestions factored in.

          Devaraj Das added a comment -

          The field TaskInProgress.mostRecentStartTime is updated with the same value of execStartTime each time (since execStartTime is updated only once in the life of the TIP). Did you mean to do this?

          I am getting a little lost digging through the code trying to figure out where these variables would need to be decremented at

          They should be decremented in TIP.incompleteSubTask and TIP.completedTask (basically, the places where activeTasks.remove is done). The decrement should happen if activeTasks.size for the TIP is >1. Makes sense?

          I can't find the TaskCommitThread that it references

          Yes this comment shouldn't be there. TaskCommitThread used to be there at one point..

          there would be a possibility of speculating a task that has already completed.

          Couldn't it be checked whether TIP.isComplete() returns true before launching a speculative attempt?

          Devaraj Das added a comment -

          I wanted to double check with Devaraj that you didn't add any new functionality or bug fixes in your patch, but instead that it was just merging with trunk

          No, it was a plain merge

          In particular I noticed some properties that your patch adds to mapred-default.xml that don't seem to be related to this JIRA or used in the rest of the patch (e.g. mapred.shuffle.maxFetchPerHost). Were these included intentionally?

          Sorry these shouldn't have been there. I meant to just include the configs you added.

          Andy Konwinski made changes -
          Attachment HADOOP-2141-v5.patch [ 12400597 ]
          Andy Konwinski added a comment -

          First, I found a significant bug in the current patch in the logic of isSlowTracker() that converts each TaskTracker's sum of task progress rates into an average. The attached updated patch contains the bug fix.

          Devaraj, regarding your suggestion of removing countSpeculating() in favor of class fields that maintain counts of running speculative map and reduce tasks: I agree that this might perform better, and it is easy to increment the counts in the right spot (i.e. in getSpeculativeMap() and getSpeculativeReduce()), but it isn't as clear where to decrement them. They need to be decremented when a speculative task is killed or completes, and the code that manages this state transition is convoluted because of the number of scenarios it handles (failed task trackers, a speculative attempt succeeding, a speculative attempt being killed because the original attempt succeeded). I am getting a little lost digging through the code trying to figure out where these variables would need to be decremented. There is a comment in JobInProgress.completedTask() that says "TaskCommitThread in the JobTracker marks other, completed, speculative tasks as complete." but I can't find the TaskCommitThread that it references, and I don't think that adjusting the counts only when speculative tasks complete (as opposed to being killed or failing) would be enough. My vote is that we put this off for now.

          Regarding the modifications to keep the sorted list of candidates around, one potential problem I see is that if a task cached in the sorted list of TIPs finishes before we recompute the list, there would be a possibility of speculating a task that has already completed.

          I have implemented your suggestion to keep a list of task trackers around, and have set the time to 2 minutes (using the SLOW_TRACKER_SORT_DELAY constant).

          One thing that I think is important is to test the effects of this patch on MapReduce performance since a lot of the code base has changed and also this patch is quite different than the one we used for the experiments in the OSDI paper.

          Finally, I wanted to double check with Devaraj that you didn't add any new functionality or bug fixes in your patch, but instead that it was just merging with trunk (and putting the default values for the parameters in mapred-default.xml instead of hadoop-default.xml). In particular I noticed some properties that your patch adds to mapred-default.xml that don't seem to be related to this JIRA or used in the rest of the patch (e.g. mapred.shuffle.maxFetchPerHost). Were these included intentionally?

          Devaraj Das added a comment -

          From the performance point of view, it might actually make sense to keep the sorted list of candidates around (as Andy mentioned in point (J) in his recent comment). Also, the isSlowTracker method needn't compute whether the tracker is slow every time for a job. It should be okay to do it once every 2-3 minutes. The assumption is that tasktrackers shouldn't suddenly become slow, and that's probably a reasonable assumption.
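
          One way to realize the "recompute at most every few minutes" idea is a simple time-based cache, sketched below. The delay constant mirrors the SLOW_TRACKER_SORT_DELAY mentioned above; everything else is hypothetical.

              import java.util.HashSet;
              import java.util.Set;

              // Illustrative cache so the expensive slow-tracker computation runs at most
              // once every couple of minutes per job.
              class SlowTrackerCache {
                private static final long SLOW_TRACKER_SORT_DELAY = 2 * 60 * 1000L; // 2 minutes

                interface SlowTrackerComputer {
                  Set<String> computeSlowTrackers(); // the expensive pass over tracker stats
                }

                private long lastComputeTime = 0;
                private Set<String> slowTrackers = new HashSet<String>();

                synchronized boolean isSlowTracker(String trackerName, SlowTrackerComputer computer) {
                  long now = System.currentTimeMillis();
                  if (now - lastComputeTime > SLOW_TRACKER_SORT_DELAY) {
                    slowTrackers = computer.computeSlowTrackers();
                    lastComputeTime = now;
                  }
                  return slowTrackers.contains(trackerName);
                }
              }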

          Devaraj Das added a comment -

          After going through the patch briefly, I noticed there are a few iterations over the JobInProgress's running TIPs/TaskStatus list. At least the count that countSpeculating currently computes on demand could be maintained in real time, no?

          Devaraj Das made changes -
          Attachment 2141.patch [ 12400261 ]
          Devaraj Das added a comment -

          The attached patch is w.r.t the trunk rev# 744794.

          Devaraj Das added a comment -

          I am working on merging the patch with the trunk.

          Nigel Daley made changes -
          Fix Version/s 0.20.0 [ 12313438 ]
          Arun C Murthy made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Arun C Murthy added a comment -

          Andy, this is looking good!

          Unfortunately this won't apply anymore - could you please update this? Thanks!

          Andy Konwinski made changes -
          Attachment HADOOP-2141-v4.patch [ 12395077 ]
          Andy Konwinski added a comment -

          Updating the patch so that it still applies cleanly.

          Arun C Murthy made changes -
          Affects Version/s 0.20.0 [ 12313438 ]
          Affects Version/s 0.19.0 [ 12313211 ]
          Fix Version/s 0.20.0 [ 12313438 ]
          Fix Version/s 0.19.0 [ 12313211 ]
          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12392157/HADOOP-2141-v3.patch
          against trunk revision 704748.

          +1 @author. The patch does not contain any @author tags.

          -1 tests included. The patch doesn't appear to include any new or modified tests.
          Please justify why no tests are needed for this patch.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 Eclipse classpath. The patch retains Eclipse classpath integrity.

          -1 core tests. The patch failed core unit tests.

          -1 contrib tests. The patch failed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3463/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3463/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3463/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3463/console

          This message is automatically generated.

          Andy Konwinski made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          Affects Version/s 0.20.0 [ 12313438 ]
          Fix Version/s 0.19.0 [ 12313211 ]
          Release Note Updated speculative execution scheduler
          Affects Version/s 0.19.0 [ 12313211 ]
          Andy Konwinski made changes -
          Attachment HADOOP-2141-v3.patch [ 12392157 ]
          Andy Konwinski added a comment -

          Thanks Arun, I'll address your comments inline:

          • JobInProgress.getSpeculative{Map|Reduce} are both called from synchronized methods, i.e. JobInProgress.findNew{Map|Reduce}Task; hence please mark these as synchronized too, just to be future-proof.
            A) Done

          • JobInProgress.findSpeculativeTask's 'shouldRemove' parameter is always passed in as 'false' (from getSpeculative{Map|Reduce}) ... do we even need this parameter?
            B) I removed the parameter. It was left over from when Hadoop used to pass a TIP list that was specific per machine.

          • JobInProgress.isTaskSlowEnoughToSpeculate gets mapred.speculative.execution.slowTaskThreshold from the JobConf always - we should just cache that in a private variable.
            C) Done

          • Ditto for JobInProgress.isSlowTracker/mapred.speculative.execution.slowNodeThreshold.
            D) Done
          • And JobInProgress.atSpeculativeCap/mapred.speculative.execution.speculativeCap.
            E) Done
          • (Also please remove the LOG.info for the config variable in JobInProgress.isTaskSlowEnoughToSpeculate.)
            F) Done

          • JobInProgress.findSpeculativeTask gets a List of TIPs, it then proceeds to convert that to a TIP[] for JobInProgress.isSlowTracker etc. - we should just get all APIs to work with List<TIP> and do away with that conversion.
            G) Done
          • Can we keep a running count of 'progress' of TaskTrackers' tasks rather than recompute them each time in JobInProgress.isSlowTracker? For large jobs it might be significant...
            H) In this patch (v3), we are calling task.getProgressRate() on each task in the ProgressRateComparator, which returns the progressRate score for that task (it isn't computed on the spot; it is updated asynchronously when the progress for that tip is reported). If we were to keep a running count in JobInProgress that the TIPs are responsible for updating as they make progress via some sort of callback, that seems like a lot of added complexity, plus the overhead of a data structure to push the updates to, when we only use them while looking for speculative tasks, which is a relatively infrequent operation. Thus I still see the pull model as better.
          • JobInProgress.isTaskSlowEnoughToSpeculate really bothers me. It is called from inside a loop (i.e. for each TIP) and it sorts the progress of each TIP. This is potentially very expensive. At the very least we should sort the TIPs once and even better - we should maintain a PriorityQueue of TIPs based on their progress.
            I) I eliminated the isTaskSlowEnoughToSpeculate function and the inner loop behavior you pointed out by pulling everything into findSpeculativeTask, which adds another sort operation to the already existing sort in findSpeculativeTask (see J below).
          • I'm guessing that sorting 'candidate speculative tasks' in JobInProgress.findSpeculativeTask isn't prohibitively expensive since the number of candidates is fairly small, could you please confirm?
            J) As of this patch, we are using a double sorting behavior, which I don't see a good way around for now. The first sort is to be sure we only launch speculative tasks which are actually slow, the second one is to decide amongst those slow tasks based on their expected completion time and this second sort is considerably smaller (since it operates on the chopped down set of candidates). The first sort will be sorting through all running tasks, which for large MapReduce jobs will be in the tens of thousands, right? However, remember that the progress of each task is not computed at access time (see H above). We can't keep a snapshot of sorted progress around for very long before it grows stale, but I still think that switching to a push model (from the perspective of the tasks at progress update time) will add the overhead of a heap insert for progress updates of every task for the entire job, when we really only care about tasks that are running while we look for speculative tasks (which is hopefully only at the end of map or reduce stages of a job).

          If this is a concern, as an intermediate step before switching to a heap, we could keep the sorted list of candidates around and only recompute it at most every X (5?) seconds.

          • Minor: Please adhere to the 80 character limit per-line.
            K) Done

          ========
          Another thought: we are currently doing the first sort/chop in findSpeculativeTask to grab the slowest 25% of the tasks currently running. We originally intended slowTaskThreshold to decide if a task was slow enough to speculate, which would imply that if all currently running tasks appear approximately equally slow, then none should be launched for speculative execution. However, that is not the current behavior, which suggests we might want to use variance of progressRate instead of just sorting tasks by progressRate and taking the "slowest" 25% (which would be an arbitrary 25% of the tasks if they all had the same progressRate).
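
          One way to express that alternative is to check the spread of progress rates before picking a "slowest" slice at all; the coefficient-of-variation cut-off below is purely illustrative, not a concrete proposal.

              import java.util.List;

              // Only treat the slow tail as speculation candidates when progress rates
              // actually vary; if all tasks crawl at the same rate, speculate nothing.
              class ProgressRateSpread {
                static boolean ratesVaryEnough(List<Double> progressRates,
                                               double minCoefficientOfVariation) {
                  if (progressRates.size() < 2) {
                    return false;
                  }
                  double sum = 0.0;
                  for (double r : progressRates) {
                    sum += r;
                  }
                  double mean = sum / progressRates.size();
                  if (mean == 0.0) {
                    return false;
                  }
                  double sqDiff = 0.0;
                  for (double r : progressRates) {
                    sqDiff += (r - mean) * (r - mean);
                  }
                  double stdDev = Math.sqrt(sqDiff / progressRates.size());
                  return (stdDev / mean) > minCoefficientOfVariation;
                }
              }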

          This version of the patch differs enough from the code we were using for our experiments in the OSDI paper that we want to do some more testing with it (maybe even gridmix), but we wanted to get it up for other eyes to look at right away, hopefully aiming for 0.20 with this one.

          Arun C Murthy added a comment -

          Looking through this patch, a few comments:

          1. JobInProgress.getSpeculative{Map|Reduce} are both called from synchronized methods, i.e. JobInProgress.findNew{Map|Reduce}Task; hence please mark these as synchronized too, just to be future-proof.
          2. JobInProgress.findSpeculativeTask's 'shouldRemove' parameter is always passed in as 'false' (from getSpeculative{Map|Reduce}) ... do we even need this parameter?

          3. JobInProgress.isTaskSlowEnoughToSpeculate gets mapred.speculative.execution.slowTaskThreshold from the JobConf always - we should just cache that in a private variable. Ditto for JobInProgress.isSlowTracker/mapred.speculative.execution.slowNodeThreshold and JobInProgress.atSpeculativeCap/mapred.speculative.execution.speculativeCap. (Also please remove the LOG.info for the config variable in JobInProgress.isTaskSlowEnoughToSpeculate).
          4. JobInProgress.findSpeculativeTask gets a List of TIPs, it then proceeds to convert that to a TIP[] for JobInProgress.isSlowTracker etc. - we should just get all APIs to work with List<TIP> and do away with that conversion.
          5. Can we keep a running count of 'progress' of TaskTrackers' tasks rather than recompute them each time in JobInProgress.isSlowTracker? For large jobs it might be significant...
          6. JobInProgress.isTaskSlowEnoughToSpeculate really bothers me. It is called from inside a loop (i.e. for each TIP) and it sorts the progress of each TIP. This is potentially very expensive. At the very least we should sort the TIPs once and even better - we should maintain a PriorityQueue of TIPs based on their progress.
          7. I'm guessing that sorting 'candidate speculative tasks' in JobInProgress.findSpeculativeTask isn't prohibitively expensive since the number of candidates is fairly small, could you please confirm?
          8. Minor: Please adhere to the 80 character limit per-line.
          Andy Konwinski made changes -
          Attachment HADOOP-2141-v2.patch [ 12389273 ]
          Andy Konwinski added a comment -

          Updated to incorporate the FindBugs suggestions and to apply cleanly.

          Owen O'Malley made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Owen O'Malley added a comment -

          I'm sorry, but this patch doesn't apply cleanly any more. Can you update it Andy?

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12388788/HADOOP-2141.patch
          against trunk revision 688985.

          +1 @author. The patch does not contain any @author tags.

          -1 tests included. The patch doesn't appear to include any new or modified tests.
          Please justify why no tests are needed for this patch.

          -1 javadoc. The javadoc tool appears to have generated 1 warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          -1 findbugs. The patch appears to introduce 3 new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          -1 core tests. The patch failed core unit tests.

          -1 contrib tests. The patch failed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3106/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3106/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3106/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3106/console

          This message is automatically generated.

          Andy Konwinski made changes -
          Affects Version/s 0.19.0 [ 12313211 ]
          Status Open [ 1 ] Patch Available [ 10002 ]
          Affects Version/s 0.15.0 [ 12312565 ]
          Andy Konwinski added a comment -

          This patch implements a new algorithm for speculative execution.

          When a TaskTracker asks for another task, if there are no more tasks which haven't been tried at least once, and if there aren't any tasks which have failed and need to be re-run, then a speculative task may potentially be assigned. There are three configurable thresholds. A speculative task will be chosen with the following algorithm:

          1) Check that there are fewer than SpeculativeCap speculative tasks running
          2) Ignore the request if the TaskTracker's progressRate percentile is < SlowNodeThreshold
          3) Rank currently running, non-speculative tasks by their estimated time left
          4) Choose the highest-ranked task whose progressRate < SlowTaskThreshold
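
          For illustration, the four steps above could look roughly like the sketch below. The Task type, field names and the percentile helper are placeholders rather than the actual Hadoop classes; only the three thresholds and the ordering of the checks come from the description.

              import java.util.ArrayList;
              import java.util.Collections;
              import java.util.Comparator;
              import java.util.List;

              // Illustrative selection of a speculative task; not the patch itself.
              class SpeculationPolicy {
                static class Task {
                  double progressRate;       // progress per unit time
                  double estimatedTimeLeft;  // derived from progress and progress rate
                  boolean speculative;
                }

                private final int speculativeCap;        // step 1
                private final double slowNodeThreshold;  // step 2 (percentile, 0..1)
                private final double slowTaskThreshold;  // step 4 (percentile, 0..1)

                SpeculationPolicy(int cap, double slowNode, double slowTask) {
                  this.speculativeCap = cap;
                  this.slowNodeThreshold = slowNode;
                  this.slowTaskThreshold = slowTask;
                }

                Task chooseSpeculativeTask(List<Task> running, int speculativeRunning,
                                           double trackerProgressRatePercentile) {
                  // 1) respect the global cap on concurrent speculative attempts
                  if (speculativeRunning >= speculativeCap) {
                    return null;
                  }
                  // 2) do not hand speculative work to a tracker that is itself slow
                  if (trackerProgressRatePercentile < slowNodeThreshold) {
                    return null;
                  }
                  // 3) rank non-speculative running tasks by estimated time left, longest first
                  List<Task> candidates = new ArrayList<Task>();
                  for (Task t : running) {
                    if (!t.speculative) {
                      candidates.add(t);
                    }
                  }
                  Collections.sort(candidates, new Comparator<Task>() {
                    public int compare(Task a, Task b) {
                      return Double.compare(b.estimatedTimeLeft, a.estimatedTimeLeft);
                    }
                  });
                  // 4) pick the highest-ranked candidate that is also slow enough to speculate
                  for (Task t : candidates) {
                    if (progressRatePercentile(t, running) < slowTaskThreshold) {
                      return t;
                    }
                  }
                  return null;
                }

                // Fraction of running tasks whose progress rate is below this task's rate.
                private double progressRatePercentile(Task task, List<Task> running) {
                  if (running.isEmpty()) {
                    return 0.0;
                  }
                  int below = 0;
                  for (Task t : running) {
                    if (t.progressRate < task.progressRate) {
                      below++;
                    }
                  }
                  return (double) below / running.size();
                }
              }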

          Andy Konwinski made changes -
          Attachment HADOOP-2141.patch [ 12388788 ]
          Andy Konwinski added a comment -

          This patch implements a new algorithm for choosing tasks for speculative execution.
          Here is a high level overview of the algorithm:

          If a task slot becomes available and there are less than SpeculativeCap speculative tasks running:

          • 1) Ignore the request if the TT's progressRate is < SlowNodeThreshold
          • 2) Rank currently running, non-speculative tasks by estimated time left
          • 3) Speculate the highest-ranked task whose progressRate < SlowTaskThreshold
          Andy Konwinski made changes -
          Assignee Arun C Murthy [ acmurthy ] Andy Konwinski [ andyk ]
          Runping Qi made changes -
          Link This issue relates to HADOOP-2177 [ HADOOP-2177 ]
          Runping Qi made changes -
          Link This issue relates to HADOOP-2933 [ HADOOP-2933 ]
          Robert Chansler made changes -
          Fix Version/s 0.17.0 [ 12312913 ]
          Runping Qi added a comment -

          I think it is reasonable to assume that all the tasks progress at a similar speed.
          The speculative execution condition should be based on the condition that a task progresses considerably slower than the average.

          Amar Kamat added a comment -

          Consider a scenario where the reducer does some kind of fancy, hi-fi and time consuming work based on some events. One such example would be doing some kind of learning for specific words. So in some cases the reducers will finish faster (the main logic is never triggered) while in some cases it might take a lot longer. So comparing the speed across reducers might not be a good idea in this case. Also, taking the progress rate might not be a good idea since sometimes the reducer just scans and sometimes it does some fancy computing. Hence in such cases speculation won't help. So my questions are

          • should we take the above mentioned scenario (and similar others) into account (if possible), or is it too much to consider
          • how much is the performance overhead (across jobs/cluster usage/utility) in case of speculation
          Amar Kamat added a comment - - edited

          Dumping some logs from my recent runs

          2008-02-08 15:09:19,459 INFO org.apache.hadoop.mapred.TaskInProgress: Error from task_200802080908_0005_r_001459_0: Task task_200802080908_0005_r_001459_0 failed to report status for 605 seconds. Killing!
          2008-02-08 15:09:19,460 INFO org.apache.hadoop.mapred.JobTracker: Removed completed task 'task_200802080908_0005_r_001459_0' from 'tracker'
          2008-02-08 15:09:19,474 INFO org.apache.hadoop.mapred.TaskRunner: Discarded output of task 'task_200802080908_0005_r_001459_0' - 
          2008-02-08 15:09:19,513 INFO org.apache.hadoop.mapred.JobInProgress: Choosing normal task tip_200802080908_0005_r_001459
          2008-02-08 15:09:19,514 INFO org.apache.hadoop.mapred.JobTracker: Adding task 'task_200802080908_0005_r_001459_1' to tip tip_200802080908_0005_r_001459, for tracker 'tracker'
          2008-02-08 15:09:19,517 INFO org.apache.hadoop.mapred.JobInProgress: Choosing speculative task tip_200802080908_0005_r_001459
          2008-02-08 15:09:19,517 INFO org.apache.hadoop.mapred.JobTracker: Adding task 'task_200802080908_0005_r_001459_2' to tip tip_200802080908_0005_r_001459, for tracker'
          

          The main task and speculative one got executed back to back.

          Joydeep Sen Sarma added a comment -

          just wanted to leave behind some recent experiences. we have started running with gap=0.9 and only turned on spec. for maps. this is working out pretty well. i see very few spec. tasks - but they do show up when things are stuck - which is exactly what we want.

          the only remaining flaw is that hasSpeculative() should not be using 'progress' - but rather 'progress/running time'. the rationale is that a late task is always going to have low progress (even after the lag time) - so what matters is progress made relative to the run-time. if on average tasks require 5 minutes to make 20% progress then making 5% progress in 1 minute is perfectly fine (but today may cause spec. execution even for our conservative setting).
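
          In code, the distinction being drawn is roughly the following; both checks are illustrative, and the gap/slack parameters are made-up knobs rather than the actual configuration values.

              // Contrast between speculating on raw progress and on progress rate.
              class SpeculationTrigger {
                // Progress-based check: a task that simply started late always looks behind.
                static boolean behindOnProgress(double taskProgress, double avgProgress,
                                                double speculativeGap) {
                  return taskProgress < avgProgress - speculativeGap;
                }

                // Rate-based check: progress divided by running time, so a task that
                // started late but is progressing at a healthy rate is not speculated.
                static boolean behindOnProgressRate(double taskProgress, long runningTimeMs,
                                                    double avgProgressRate, double slack) {
                  if (runningTimeMs <= 0) {
                    return false; // nothing meaningful to compare yet
                  }
                  double rate = taskProgress / runningTimeMs;
                  return rate < avgProgressRate * slack;
                }
              }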

          Joydeep Sen Sarma added a comment -

          we recently upgraded to 14.4 and had spec. execution on by default for a week or so. i am looking for a place to pass on some feedback and seems like we had a fine discussion going on here.

          for starters - speculative execution is a good optimization for us to have - we frequently see tasks stuck in 'Initializing' state (dunno why) and when speculative execution was turned on by default - that problem was largely resolved.

          however - we quickly realized that spec. execution was way too aggressive for us. almost always, we would have speculative reduce tasks when all was going well. given that we have a pretty small cluster, all those extra processes really started slowing us down. and we eventually had to turn the default setting to off. looking at the code, it's clear that the protocol for speculative execution is not quite what we would want.

          the proposals in this thread seem interesting. there are a few additional things to consider though:

          • speculative execution should consider overall system load and be more or less aggressive depending on how busy it is.
          • if we wait until 90+% completion, our problematic case (tasks stuck for long time in initializing state) would not be handled well.
          • finally - if the tracker decides to execute speculatively - shouldn't the slower of the two tasks be killed?

          looking forward to a discussion. i would like to try something out in 0.14 itself since this is a pain point for us right now.

          Arun C Murthy made changes -
          Fix Version/s 0.17.0 [ 12312913 ]
          Fix Version/s 0.16.0 [ 12312740 ]
          Arun C Murthy added a comment -

          Moving this to 0.17.0 as discussions are still on...

          Owen O'Malley added a comment -

          Ok, after talking to Koji, it looks like part of the problem was that the jobs were streaming and thus didn't time out after 10 minutes. I therefore filed HADOOP-2211 that proposes changing the default timeout for streaming to 1 hour instead of infinite.

          Owen O'Malley made changes -
          Link This issue is related to HADOOP-2211 [ HADOOP-2211 ]
          Runping Qi added a comment -

          I don't think this Jira is that urgent or that we have to have a quick patch for it. I'd prefer to have the right framework in place to address the speculative execution policy, and in the long term the task scheduling policy.

          It is important for the job tracker to collect accurate execution stats of the mappers/reducers and use the stats in task scheduling. I don't think that is complicated.

          In the long term, it will be nice if the job tracker can obtain the machine specs of the task trackers (# of CPUs, mem, disks, network info, etc), and the current load info on the machines, and use these data in scheduling.

          Owen O'Malley added a comment -

          There are always going to be such corner cases, and proper speculative execution is the right solution to such problems.

          This is exactly backwards. Speculative execution absolutely cannot be used as a reliability solution. Applications can and do turn it off. Therefore, the system must be completely reliable without speculative execution. Furthermore, if there is a "freezing" problem it may well strike the speculative task too, which would lock up the job. Speculative execution is a pure optimization. It is not for reliability.

          Arun C Murthy added a comment -

          Thanks for your comments, Runping; some thoughts of my own...


          1. A speculative execution for a mapper (reducer) is started only if there are no pending non-speculative mappers (reducers)

          I believe this is already the case for choosing speculative tasks... I'll double-check.

          2. We should estimate the expected finish time for a mapper (reducer) based on its current progress and progress rate. A speculative execution for a mapper (reducer) is started only if the projected finish time is much farther out than the average execution time of mappers (reducers)

          Hmm... I'm concerned this could lead to some aggressively spawned reduce tasks in cases like the ones Koji reported. Do you see a way to do this more conservatively and yet keep it simple?

          3. It is a bit tricky to compute the average execution time of reducers. If a reducer started before the map phase completed, then the overlap period should be taken out.

          Ok, I agree in principle. Yet I'm concerned that this may be overkill.
          We could subtract the time it took all the mappers to finish... I'm not very sure.

          4. If a reducer is stuck in the shuffling state, the real reason for the stall may be related to the machine(s) where the needed map outputs sit. Launching a speculative execution of the reducer may not help. In this case, we may need to declare the concerned mappers lost and re-run them.

          I'm hoping HADOOP-1128, and more recently HADOOP-1984, take care of this; as long as we aren't too aggressive about starting speculative reduces.


          Overall, I'm very concerned about keeping this reasonably simple, at least as a first pass, until we have a chance to see this in action in the real world. We can then iterate...

          Arun C Murthy added a comment -

          Is there a strong reason for disabling it for maps?

          To me it looks like there is a strong case for at least treating maps and reduces separately, if not disabling speculation for maps.

          Maps mostly run on the order of minutes, while reduces take much longer since they are also waiting for maps to complete.
          E.g., in sort500, maps take about a minute (outlier maps take 4 minutes) while reduces (at least the first wave) take around an hour to complete.

          Given these numbers (and I know job characteristics vary wildly), I'd like to be careful and ensure we aren't too aggressive when launching speculative, speculative-tasks (man! does that sound weird!).

          Hence, keeping in mind that reduces are more expensive to run willy-nilly speculatively, I propose 4 parameters, to keep it reasonably simple:

          mapred.map.speculative.timegap = 2 x avg_map_completion_time
          mapred.reduce.speculative.timegap = 1.5 x avg_reduce_completion_time
          mapred.map.min.completion.for.speculation = 90
          mapred.reduce.min.completion.for.speculation = 95

          Thoughts?
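
          A minimal sketch of how such a gate might look, assuming the four proposed keys existed and that the time-gap values are expressed as multipliers of the averages. The class and method names are invented for illustration; none of this is existing Hadoop code:

            import org.apache.hadoop.mapred.JobConf;

            // Hypothetical sketch only: the configuration keys are proposals from this
            // discussion, not existing Hadoop parameters.
            class ReduceSpeculationGate {
              static boolean canSpeculate(JobConf conf, float percentReducesDone,
                                          long taskRuntimeMs, long avgReduceCompletionMs) {
                float timeGapFactor =
                    conf.getFloat("mapred.reduce.speculative.timegap", 1.5f);        // 1.5 x average
                int minCompletionPct =
                    conf.getInt("mapred.reduce.min.completion.for.speculation", 95); // percent done
                return percentReducesDone >= minCompletionPct
                    && taskRuntimeMs > timeGapFactor * avgReduceCompletionMs;
              }
            }

          The map-side check would be symmetric, reading the map keys and using the proposed 2x factor.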

          Runping Qi added a comment -

          The above proposal sounds reasonable.
          Here are some points to consider:

          1. A speculative execution for a mapper (reducer) is started
          only if there are no pending non-speculative mappers (reducers)

          2. We should estimate the expected finish time for a mapper (reducer) based on its
          current progress and progress rate.
          A speculative execution for a mapper (reducer) is started only if the projected finish time
          is much farther out than the average execution time of mappers (reducers); see the sketch after this list.

          3. It is a bit tricky to compute the average execution time of reducers.
          If a reducer started before the map phase completed, then the overlap period should
          be taken out.

          4. If a reducer is stuck in the shuffling state, the real reason for the stall may be related to the machine(s)
          where the needed map outputs sit. Launching a speculative execution of the reducer may not help.
          In this case, we may need to declare the concerned mappers lost and re-run them.
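
          A rough sketch of the estimate described in point 2, assuming a task reports its start time and a progress fraction in [0, 1]. The names below are invented for illustration and do not come from the Hadoop code base:

            // Project a task's total runtime from its progress rate and speculate only if the
            // projection is well beyond the average completion time of finished tasks of the same kind.
            class SpeculationEstimate {
              /** Projected total runtime, assuming the task keeps its current progress rate. */
              static long projectedRuntime(long startTimeMs, long nowMs, float progress) {
                if (progress <= 0.0f) {
                  return Long.MAX_VALUE;               // no progress reported yet
                }
                long elapsedMs = nowMs - startTimeMs;
                return (long) (elapsedMs / progress);  // e.g. 40% done after 20 min -> ~50 min total
              }

              /** Speculate only when the projection is well beyond the average of completed tasks. */
              static boolean shouldSpeculate(long startTimeMs, long nowMs, float progress,
                                             long avgCompletionMs, float slowdownFactor) {
                return projectedRuntime(startTimeMs, nowMs, progress)
                    > slowdownFactor * avgCompletionMs;
              }
            }

          Because elapsed time keeps growing while progress stays flat, a task stuck at 95% would eventually cross such a threshold, which is the hang this issue describes.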

          Milind Bhandarkar added a comment -

          In my past life, using 2x the average to determine outliers has worked well (I don't know the theory behind it). Regarding the other questions:

          We have seen stuck tasks mostly for reduces (maybe because reduces write to DFS), but I would prefer a uniform treatment of tasks, regardless of map/reduce. Some streaming users do write to DFS in map tasks as side effects (taking care that each attempt writes to a separate file/directory). This will help them as well.

          Is there a strong reason for disabling it for maps?

          Arun C Murthy added a comment -

          Here are some thoughts about how to go about it:

          I propose we track the average completion time of maps and reduces (separately, of course) and spawn speculative tasks when tasks are 1.5x or 2x slower than that average (should we be more or less conservative?). However, to ensure that the system isn't inundated with too many speculative tasks, I propose that this come into effect only when more than 90% or 95% of the tasks (of that kind) are complete.

          So, if we have 2000 reduces and the average completion time of reduces is 60 minutes, we should launch a speculative reduce only when at least 1800 reduces are done and the reducer in question has run for more than 90 minutes.
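
          As a quick sanity check of that arithmetic (the 2000-reduce and 60-minute figures come from the example above; the 95-minute straggler runtime is an assumed illustration):

            class SpeculationExample {
              public static void main(String[] args) {
                int totalReduces = 2000;
                int finishedReduces = 1800;               // 90% of the reduces are done
                long avgReduceMinutes = 60;               // average reduce completion time
                long stragglerRuntimeMinutes = 95;        // assumed: a reduce running for 95 minutes

                boolean enoughDone = finishedReduces >= 0.90 * totalReduces;            // 1800 >= 1800
                boolean slowEnough = stragglerRuntimeMinutes > 1.5 * avgReduceMinutes;  // 95 > 90
                System.out.println("speculate = " + (enoughDone && slowEnough));        // prints true
              }
            }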

          Should we disable this for maps?
          Should we have separate policies for maps and reduces (percentage and the running-time lag vis-a-vis completed tasks)?

          Thoughts?

          Milind Bhandarkar added a comment -

          As I mentioned earlier, we have been seeing this problem more and more over the last few weeks. One of the bugs discovered by Koji is that dfsclient to datanode writes do not have a timeout, so if a datanode goes bad during the write, the entire task gets stuck. There are always going to be such corner cases, and proper speculative execution is the right solution to such problems.

          I am a bit ambivalent about making this a fix for 0.16, rather than 0.15.1, because

          1. This will make us postpone debugging the real issues
          2. Critical projects will have to wait for a long time for the 0.16 release to be deployed.

          Weighing these two against each other, I would rather have this fix than debug all the issues that could possibly cause a task to get stuck. Thoughts?

          Owen O'Malley made changes -
          Fix Version/s 0.15.1 [ 12312850 ]
          Priority Blocker [ 1 ] Major [ 3 ]
          Fix Version/s 0.16.0 [ 12312740 ]
          Arun C Murthy added a comment -

          More importantly, I'd like to make sure that the original blocker (if any) isn't hidden away by the speculative-execution band-aid...

          Owen O'Malley added a comment -

          This jira, as stated, is not a blocker. The blocker is why the maps are getting stuck, but you haven't provided any information about that. The change to the speculative execution policy is a behavior change and should go into 0.16 only.

          Arun C Murthy added a comment - edited

          Do we know why the tasks got stuck?

          I'm sure speculation will help; the only question is whether that is the correct fix.

          Don't get me wrong... I'm pretty sure this is an important feature!

          Devaraj Das made changes -
          Assignee Arun C Murthy [ acmurthy ]
          Milind Bhandarkar made changes -
          Field Original Value New Value
          Affects Version/s 0.14.3 [ 12312830 ]
          Fix Version/s 0.15.1 [ 12312850 ]
          Affects Version/s 0.15.0 [ 12312565 ]
          Priority Major [ 3 ] Blocker [ 1 ]
          Fix Version/s 0.16.0 [ 12312740 ]
          Milind Bhandarkar added a comment -

          +1

          We have started seeing this on more and more jobs. I am changing this to a blocker for 0.15.1.

          Runping Qi added a comment -

          +1

          Speculative execution should start if the original execution is a lot slower than the average execution speed.

          Koji Noguchi created issue -

            People

            • Assignee:
              Andy Konwinski
            • Reporter:
              Koji Noguchi
            • Votes:
              0
            • Watchers:
              25
