Hadoop Common / HADOOP-2119

JobTracker becomes non-responsive if the task trackers finish tasks too fast

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.16.0
    • Fix Version/s: 0.17.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      This removes many inefficiencies in task placement and scheduling logic. The JobTracker would perform linear scans of the list of submitted tasks in cases where it did not find an obvious candidate task for a node. With better data structures for managing job state, all task placement operations now run in constant time (in most cases). Also, the task output promotions are batched.

      Description

      I ran a job with 0 reducers on a cluster with 390 nodes.
      The mappers ran very fast.
      The JobTracker lagged behind in committing completed map tasks.
      The number of running mappers displayed on the web UI kept growing.
      The JobTracker eventually stopped responding to the web UI.

      No progress was reported afterwards.

      The JobTracker was running on a separate node.
      The JobTracker process consumed 100% CPU, with a VM size of 1.01g (reaching the heap space limit).

      1. hadoop-jobtracker-thread-dump.txt
        60 kB
        Christian Kunz
      2. HADOOP-2119-v5.2.patch
        51 kB
        Amar Kamat
      3. HADOOP-2119-v5.1.patch
        52 kB
        Amar Kamat
      4. HADOOP-2119-v5.1.patch
        55 kB
        Amar Kamat
      5. HADOOP-2119-v4.1.patch
        52 kB
        Amar Kamat
      6. hadoop-2119.patch
        11 kB
        Srikanth Kakani


          Activity

          Hudson added a comment -

          Integrated in Hadoop-trunk #443 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/443/ )
          Owen O'Malley added a comment -

          Ok, now I committed it. Thanks, Amar!

          Devaraj Das added a comment -

          +1

          Amar Kamat added a comment -

          The findbugs warning is due to the call to System.exit(-1) in the JobTracker. In case of misconfiguration (w.r.t. cache levels), the JobTracker will try to shut down, and if the shutdown throws an exception, a System.exit(-1) in the catch block stops/kills the JobTracker forcefully. Hence I think this should not be a problem.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12378413/HADOOP-2119-v5.2.patch
          against trunk revision 619744.

          @author +1. The patch does not contain any @author tags.

          tests included -1. The patch doesn't appear to include any new or modified tests.
          Please justify why no tests are needed for this patch.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new javac compiler warnings.

          release audit +1. The applied patch does not generate any new release audit warnings.

          findbugs -1. The patch appears to introduce 1 new Findbugs warnings.

          core tests +1. The patch passed core unit tests.

          contrib tests +1. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2028/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2028/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2028/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2028/console

          This message is automatically generated.

          Amar Kamat added a comment -

          Btw, in case of misconfiguration, the JT will try to shut down, and if the shutdown throws an exception there is a System.exit(-1). This will be flagged by findbugs, but AFAIK this is the only way.

          Amar Kamat added a comment -

          I have taken Owen's comments into consideration. Here is what was done:

          I really wish that the synchronization changes could be done in another patch ...

          +1. Removed all the synchronization changes. Will open another issue regarding the same.

          siblingSetAtLevel seems really arcane. I would propose that instead you add getChildren to the ...

          Maintaining this information at Node level might involve more complexity and will require more testing. A concept of children is already there in NodeBase but looking at the code it is not very clear what they are for and how to use them. Now there is just a single set of nodes at maxlevel maintained at the JobTracker. For now this seems to be a simpler solution.

          why is there yet another map from hostname to Node? This is already done in the node mapping.

          This is done to incur less penalty during job execution. While the job is running, the only penalty incurred is for resolving datanodes and newly joining trackers, whereas trackers are resolved (before the job is submitted) as part of the heartbeat (in a separate thread). Without this mapping there is no way to find the Node for a given hostname. Also, I have renamed the variable trackerNameToNodeMap that exists in trunk, and I now use it to store the mapping for datanodes too.

          I'm really concerned that we are adding 5 new fields holding collections to the JobInProgress

          As I said, this is required to get rid of the array, and the total space is roughly bounded by the total number of TIPs. A TIP is either local or non-local, and it is either running or not running; mostly TIPs just move from one list to another. Hence
          local-maps-non-running + local-maps-running + non-local-maps-non-running + non-local-maps-running ~ total-map-tips
          and
          non-running-reduces + running-reduces ~ total-reduce-tips.

          reducers is a really bad name.

          Fixed.

          nodesToMaps should be runnableMaps

          runnable means !failed && not-completed. Running and non-running both belong to the runnable category. But I have used a different name for this variable.

          Don't use assignment in a parameter to a method in initTasks

          Fixed.

          I'm bothered by all of the checks for null Nodes that just skip the location.

          Fixed. Now there are no null checks.

          Shouldn't we remove the node from the nodesToMaps regardless of the level?

          Consider a case where tip1 fails on host1. host1 belongs to rack1. Now host1 runs out of cached tips and queries rack1's cache. In such a case it should not remove the tip since some other tracker in the same rack can schedule it.

          nodesToMaps being null should be a fatal error

          Fixed.

          nodesToMaps being null should be a fatal error

          Done. In case of misconfiguration (i.e nodesToMaps = null) the JobTracker will give a fatal error and shutdown.

          Amar Kamat added a comment -

          siblingSetAtLevel seems really arcane.

          Node is also used elsewhere. So changing Node might prove risky. So for now we thought we will have a JT level mapping and address this as a separate issue.

          why is there yet another map from hostname to Node?

          There is no extra mapping in the JobTracker. The variable is just renamed. Earlier the map was from tracker-name to tracker-node; now the datanodes are mapped as well.

          I'm really concerned that we are adding 5 new fields holding collections to the JobInProgress

          This will be fine once we remove the array (maps/reduces)

          I'm bothered by all of the checks for null Nodes ....

          If nodes are null (anywhere in the cache topology) then there won't be any cache. The cache (as per trunk) is created only if the configuration is correct. The only place a node can be null is when a tracker has just joined. In that case we iterate over all the parent nodes and schedule a task. I agree that there should be sufficient logging.

          Shouldn't we remove the node from the nodesToMaps regardless of the level?

          We can't remove it from the runnable cache since that would be a costly operation. It's a list!

          nodesToMaps being null should be a fatal error

          With the latest patch there will be a NullPointerException.

          nodesToMaps should be a Map<Node,Set<Tip>> ...

          It could be a LinkedHashSet, whose iteration order is insertion order, which is exactly what we want. But then we would not be able to add failed TIPs at the front. We could maintain a separate cache for failed TIPs.
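A minimal Java sketch of the LinkedHashSet idea, assuming TIPs are represented by plain String ids and using a hypothetical NodeTipCache class (not the patch's code): the set keeps insertion order for scheduling and supports fast removal, while a separate deque holds failed TIPs so they can still be tried first.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical per-node cache; TIPs are plain String ids for illustration.
class NodeTipCache {
  // LinkedHashSet iterates in insertion order and removes in O(1) on average.
  private final Set<String> runnable = new LinkedHashSet<>();
  // Failed TIPs live in a separate deque because a LinkedHashSet
  // cannot insert an element at the front.
  private final Deque<String> failed = new ArrayDeque<>();

  void add(String tip) {
    runnable.add(tip);
  }

  // Moves a failed TIP to the failed queue so it is scheduled first.
  void addFailed(String tip) {
    runnable.remove(tip);
    failed.addFirst(tip);
  }

  // Fast removal from the ordered set (the scan of the failed deque is linear).
  void remove(String tip) {
    runnable.remove(tip);
    failed.remove(tip);
  }

  // Next TIP to schedule: failed TIPs first, then insertion order.
  String next() {
    if (!failed.isEmpty()) {
      return failed.pollFirst();
    }
    Iterator<String> it = runnable.iterator();
    if (!it.hasNext()) {
      return null;
    }
    String tip = it.next();
    it.remove();
    return tip;
  }
}
```

The price of keeping failed TIPs apart is one extra lookup per scheduling decision, which seems cheap compared with the linear list removals discussed above.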

          Owen O'Malley added a comment -

          Ah, we can't use Map<Node,Set<TIP>> because iteration would be unordered. Darn. :)

          Owen O'Malley added a comment -

          In the JobTracker:

          • siblingSetAtLevel seems really arcane. I would propose that instead you add getChildren to the Node interface.
          • why is there yet another map from hostname to Node? This is already done in the node mapping.

          In the JobInProgress:

          • I'm really concerned that we are adding 5 new fields holding collections to the JobInProgress
          • reducers is a really bad name. I'd suggest runnableReduces or something.
          • nodesToMaps should be runnableMaps
          • Don't use assignment in a parameter to a method in initTasks
          • I'm bothered by all of the checks for null Nodes that just skip the location. I think it should be a warning in the JobTracker logs so that admins can find the problem, and the location should fall back to the default node/default rack.
          • Shouldn't we remove the node from the nodesToMaps regardless of the level? Since if it is running, it is also in the runningMaps list and we can speculate out of there.
          • nodesToMaps being null should be a fatal error
          • nodesToMaps should be a Map<Node,Set<Tip>> rather than a list of tips, so that we can remove things reasonably fast

          I'll look some more tonight.

          Owen O'Malley added a comment -

          I really wish that the synchronization changes could be done in another patch. Without a very careful design of the locking protocols, there are bound to be problems that will take us a long time to discover. The last time someone changed the synchronization it took a couple weeks before everyone could agree there weren't new race conditions.

          Amar Kamat added a comment -

          Note that the staleness will be for a very short time. It will be visible at the client (JobClient/WebUI) side only for the current request.

          Amar Kamat added a comment -

          Some comments about the synchronization changes:
          1) The synchronization changes are done to avoid locking the JobTracker wherever possible.
          2) At the JobTracker, the following APIs can be unsynchronized w.r.t. the JobTracker:

            a) getMapTaskReports
            b) getReduceTaskReports
            c) getTaskDiagnostics
            d) getTaskCompletionEvents
          

          3) a, b and c are APIs for the JobClient, while d is for the reduce tasks.
          4) a and b basically lock the JobTracker (then the JobInProgress and then the TaskInProgress) so that they get the correct values of completes (via isComplete()), while c locks for diagnostic information (taskDiagnosticData) (via taskDiagnosticData(), generateSingleReport() and addDiagnosticInfo()).
          5) I made completes an AtomicInteger. Updates to taskDiagnosticData are done only after synchronizing on taskDiagnosticData, i.e. the object itself.
          6) Also, the patch makes sure that the data is always correct, though it might be stale. For example, when a task Task1 completes the TaskInProgress (via TaskInProgress.setSuccessfulTaskid(Task1)), there will never be a case where isComplete() is true while completes(Task1) is false.
          7) d actually need not lock the JobTracker; JobInProgress locking seems sufficient. Removing that synchronization has no ill effect.

          Owen O'Malley added a comment -

          I really wish the removing of synchronization had been done in a different patch. It makes me very nervous...

          Owen O'Malley added a comment -

          You cannot assume that i++, i--, i=1, etc. are atomic operations.

          You must use the AtomicInteger class to get the semantics that you are looking for.
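For illustration only, a small Java sketch of the AtomicInteger approach; CompletionCounter and runConcurrently are hypothetical names, not JobTracker code. Each increment is a single atomic read-modify-write, so concurrent updates are never lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counter of completed tasks, shared by several threads.
class CompletionCounter {
  private final AtomicInteger completes = new AtomicInteger();

  // incrementAndGet() performs the read-modify-write as one atomic step;
  // a plain "completes++" on an int field could lose updates under contention.
  int markComplete() {
    return completes.incrementAndGet();
  }

  int get() {
    return completes.get();
  }

  // Starts `threads` threads that each call markComplete() `perThread` times
  // and returns the final count once all of them have finished.
  static int runConcurrently(int threads, int perThread) {
    CompletionCounter counter = new CompletionCounter();
    List<Thread> workers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      Thread worker = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          counter.markComplete();
        }
      });
      workers.add(worker);
      worker.start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for workers", e);
      }
    }
    return counter.get();
  }
}
```

Because every increment is atomic and join() makes the workers' writes visible, runConcurrently(8, 10000) returns 80000 on every run; the same loop over a plain int field could return less.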

          Amar Kamat added a comment -

          Submitting a patch after incorporating Devaraj's comments.

          Devaraj Das added a comment -

          Some comments.
          1) Remove default-node --> use a separate list for non-local running/non-running maps. So instead of falling back to the array on a cache miss, you hit the list, which you can update as well (remove items, add them to an equivalent list for running, etc.).
          2) Maintain a mapping from the level to the set of nodes in that level (except level 0). You should look at the TIPs in the topmost level cache (in case max cache level is 2, that will mean all racks) when you look for something to run on a cache miss.
          3) Change the JobInProgress code to use proper terminology (caches, lists, etc.).
          4) TIPs that don't have locations get added to a special list instead of the default-node cache (point 1).
          5) Change the signature of findNewCachedTask to take the level instead of a boolean. Also, I think it'd be better to call the method findTaskFromList, since it caters to both maps and reduces, and reduces really don't have a cache.
          6) getCurrentTime should be moved out to a place where it is called exactly once per findTask
          7) I don't think it is that important to move tasks to the back of the list in case of speculative tasks.
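A rough Java sketch of points 1 and 2, assuming String node names, a hypothetical LevelIndex class, and that maxLevel counts cache levels with level 0 being the hosts (so the topmost level is maxLevel - 1). It is not the patch's implementation.

```java
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical index from topology level to the node names at that level.
// Assumes level 0 is the host level (not indexed), level 1 the rack, etc.
class LevelIndex {
  private final Map<Integer, Set<String>> nodesAtLevel = new HashMap<>();

  // Registers a host's ancestors; ancestors.get(0) sits at level 1.
  void addHost(List<String> ancestors) {
    for (int i = 0; i < ancestors.size(); i++) {
      nodesAtLevel.computeIfAbsent(i + 1, k -> new LinkedHashSet<>())
          .add(ancestors.get(i));
    }
  }

  Set<String> nodesAt(int level) {
    return nodesAtLevel.getOrDefault(level, Collections.emptySet());
  }

  // On a cache miss, scan the caches of every node at the topmost level
  // (maxLevel - 1, i.e. the racks when maxLevel == 2), then fall back to
  // the separate list of non-local TIPs.
  static String findOnMiss(LevelIndex index, int maxLevel,
                           Map<String, Deque<String>> cacheByNode,
                           Deque<String> nonLocal) {
    for (String node : index.nodesAt(maxLevel - 1)) {
      Deque<String> tips = cacheByNode.get(node);
      if (tips != null && !tips.isEmpty()) {
        return tips.pollFirst();
      }
    }
    return nonLocal.pollFirst();
  }
}
```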

          Amar Kamat added a comment -

          Some comments on the attached patch
          1) It uses the hostname, rather than the tracker name, to detect whether the TIP failed on a machine. This becomes an issue if there are two trackers on the same node, e.g. in the ant tests. This is why some of the tests failed.
          2) The list of ancestors maintained at the JT can be incomplete leading to stuck jobs. This can happen if the nodes have just the datanodes and no trackers.
          3) isJobComplete logic is broken. It should also consider failed TIPs.


          Also, JobInProgress.isJobComplete() now depends on failedMapTIPs and failedReduceTIPs. The patch fixes the update to failedMapTIPs/failedReduceTIPs in failedTask since it was broken (in cases where a TIP has a speculative task).
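A minimal sketch of a count-based isJobComplete() that also accounts for failed TIPs; JobCompletionTracker and its methods are hypothetical, and the sketch assumes each TIP reports exactly one final state (it does not model the speculative-task fix itself).

```java
// Hypothetical job-level bookkeeping that answers isJobComplete() from
// counters instead of scanning every TIP.
class JobCompletionTracker {
  private final int numMapTasks;
  private final int numReduceTasks;
  private int finishedMapTIPs;
  private int finishedReduceTIPs;
  private int failedMapTIPs;
  private int failedReduceTIPs;

  JobCompletionTracker(int numMapTasks, int numReduceTasks) {
    this.numMapTasks = numMapTasks;
    this.numReduceTasks = numReduceTasks;
  }

  // Each method is meant to be called once per TIP when it reaches a final state.
  synchronized void mapSucceeded() { finishedMapTIPs++; }
  synchronized void mapFailed() { failedMapTIPs++; }
  synchronized void reduceSucceeded() { finishedReduceTIPs++; }
  synchronized void reduceFailed() { failedReduceTIPs++; }

  // O(1): the job is complete once every TIP has either succeeded or failed.
  synchronized boolean isJobComplete() {
    return finishedMapTIPs + failedMapTIPs == numMapTasks
        && finishedReduceTIPs + failedReduceTIPs == numReduceTasks;
  }
}
```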

          Amar Kamat added a comment -

          The attached patch does the following
          Maps:
          1) Replaces ArrayList with LinkedList for the currently used caches (call them NR caches).
          2) Failed TIPs are added (where possible) at the front of the NR caches. [for fail-early]
          3) Removal of a TIP from the NR caches is on demand, i.e. running/non-runnable TIPs are removed while searching for a new TIP.
          4) Maintains a new set of caches, called R caches, for running TIPs. These caches are similar to the NR caches but provide faster removal. Additions to the caches are appends. Removal is one-shot, i.e. a TIP that is no longer running is removed from all the R caches at once. [for speculation]
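A sketch of the NR cache behaviour described in points 1-3 above, assuming String TIP ids and a hypothetical NonRunningCache class; the running and non-runnable sets stand in for whatever state the JobInProgress actually consults.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;

// Hypothetical NR (non-running) cache for a single node; TIPs are String ids.
class NonRunningCache {
  private final LinkedList<String> tips = new LinkedList<>();

  void add(String tip) {
    tips.addLast(tip);
  }

  // Fail-early: a failed TIP goes to the front so it is retried first.
  void addFailed(String tip) {
    tips.addFirst(tip);
  }

  // Lazy removal: running or non-runnable TIPs are dropped only when the
  // search walks past them; the TIP that is returned is removed as well.
  String findNew(Set<String> running, Set<String> nonRunnable) {
    Iterator<String> it = tips.iterator();
    while (it.hasNext()) {
      String tip = it.next();
      it.remove();
      if (!running.contains(tip) && !nonRunnable.contains(tip)) {
        return tip;
      }
    }
    return null;
  }

  int size() {
    return tips.size();
  }
}
```

Removing through the iterator keeps each step O(1) on a LinkedList, which is the main gain over ArrayList here.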

          Reduces:
          1) Maintains a LinkedList of non-running reducers i.e NR cache. [for non-running tasks]
          2) Failed reducers are added to the front of NR cache. [for fail-early]
          3) Maintains a set of running reducers with faster removal capability. [for speculation]
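A possible shape for the R caches (and, with a single key, for the set of running reducers), assuming String ids and a hypothetical RunningCaches class: a LinkedHashSet per node gives ordered iteration with O(1) average removal, and a reverse map allows one-shot removal from every cache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical R (running) caches, used to find candidates for speculation.
class RunningCaches {
  // node -> TIPs running locally to that node, in start order
  private final Map<String, Set<String>> byNode = new HashMap<>();
  // TIP -> nodes whose R cache holds it, so removal can be one-shot
  private final Map<String, List<String>> nodesOfTip = new HashMap<>();

  // Appends the TIP to the R cache of every node it is local to.
  void started(String tip, List<String> nodes) {
    for (String node : nodes) {
      byNode.computeIfAbsent(node, k -> new LinkedHashSet<>()).add(tip);
    }
    nodesOfTip.put(tip, new ArrayList<>(nodes));
  }

  // Removes the TIP from all R caches at once when it stops running.
  void stopped(String tip) {
    List<String> nodes = nodesOfTip.remove(tip);
    if (nodes == null) {
      return;
    }
    for (String node : nodes) {
      Set<String> tips = byNode.get(node);
      if (tips != null) {
        tips.remove(tip);
      }
    }
  }

  // The earliest-started TIP still running on this node, or null if none.
  String oldestOn(String node) {
    Set<String> tips = byNode.get(node);
    if (tips == null || tips.isEmpty()) {
      return null;
    }
    return tips.iterator().next();
  }
}
```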


          Also,
          1) Search preference is as follows FAILED, NON-RUNNING, RUNNING
          2) Search order is as follows

              1. Search local cache i.e strong locality
              2. Search bottom-up (i.e from the node's parent to the node's top level ancestor) for a TIP i.e weak locality.
              3. Search breadth wise across top-level ancestors for a TIP i.e for a non local TIP.
          

          3) Introduces a default-node. TIPs that are not local to any node are local to the default-node. This node takes care of random-writer-like cases, i.e. it adapts such cases to the cache structure. The default-node belongs to the default-rack, and hence all nodes share the non-local TIPs through the default-rack.
          4) The JobTracker need not be synchronized to provide reports to the JobClient, and hence these APIs don't lock the JT. Some staleness is okay.
          5) Commits are now batched, with a fixed number of tasks per batch. The default is 5000, so 5000 tasks are committed at a time. The reason for this fixed-size batching is that committing too many TIPs in one go locks the JobTracker for a very long time, causing lost RPC/tracker issues.
          6) TIPs use the tracker's hostname instead of the tracker name to maintain the list of machines where the TIP failed.
          7) One major bottleneck we observed was JobInProgress.isJobComplete(), which scanned all the TIPs. This is costly because isJobComplete() is called once for every completed/failed task (via the TaskCommit thread), which hurts with a large number of maps. Now this check is done using the counts of finished TIPs.
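A simplified Java sketch of the search order in point 2, assuming a hypothetical SchedNode with parent pointers and per-node TIP queues keyed by node name; it ignores the FAILED/NON-RUNNING/RUNNING preference and the default-node, and is not the patch's code.

```java
import java.util.Collection;
import java.util.Deque;
import java.util.Map;

// Hypothetical topology node: a name and a parent (null at the top level).
class SchedNode {
  final String name;
  final SchedNode parent;

  SchedNode(String name, SchedNode parent) {
    this.name = name;
    this.parent = parent;
  }
}

class TaskSearch {
  // cache maps a node name to the queue of TIPs local to that node.
  static String find(SchedNode host, int maxLevel,
                     Map<String, Deque<String>> cache,
                     Collection<SchedNode> topLevelNodes) {
    // 1. the host itself (strong locality), then 2. its ancestors
    // bottom-up (weak locality), for at most maxLevel levels.
    SchedNode node = host;
    SchedNode highestSearched = host;
    for (int level = 0; level < maxLevel && node != null; level++) {
      String tip = poll(cache, node.name);
      if (tip != null) {
        return tip;
      }
      highestSearched = node;
      node = node.parent;
    }
    // 3. breadth-wise across the other top-level nodes (non-local TIP).
    for (SchedNode top : topLevelNodes) {
      if (top == highestSearched) {
        continue; // already searched above
      }
      String tip = poll(cache, top.name);
      if (tip != null) {
        return tip;
      }
    }
    return null;
  }

  private static String poll(Map<String, Deque<String>> cache, String name) {
    Deque<String> tips = cache.get(name);
    return tips == null ? null : tips.pollFirst();
  }
}
```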

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12377817/HADOOP-2119-v4.1.patch
          against trunk revision 619744.

          @author +1. The patch does not contain any @author tags.

          tests included -1. The patch doesn't appear to include any new or modified tests.
          Please justify why no tests are needed for this patch.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new javac compiler warnings.

          release audit +1. The applied patch does not generate any new release audit warnings.

          findbugs +1. The patch does not introduce any new Findbugs warnings.

          core tests -1. The patch failed core unit tests.

          contrib tests -1. The patch failed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1978/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1978/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1978/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/1978/console

          This message is automatically generated.

          Amar Kamat added a comment -

          Attaching a patch for review.

          Amar Kamat added a comment -

          True. My main concern there was speeding up the reducers, which for now can be done by starting the reducers early and speeding up the shuffle. In the benchmarks reported, the maps take ~1 hr while shuffling takes ~4 hrs, so the reducers are hit hardest by the slow shuffle. Fixing the load logic will require detailed analysis, whereas improving the shuffle might not (parameter tweaks might make it better). Hence the load logic should scale.

          ... should be done as a separate Jira IMO

          +1

          Devaraj Das added a comment -

          The only problem is that of reducer scheduling by the JT. The maps finish so fast that the map load is always low, and the reducers always start after the maps are done. Simple tricks such as increasing the number of task completion events, Jetty threads, etc. might help but won't provide a scalable solution. So it seems that tweaking the load logic in the JT, i.e., getNewTaskForTaskTracker(), is the only way.

          The load logic seems to be there by design and exists in the current codebase too. Since the maps are really small and complete really fast (even before the next scheduled tasktracker heartbeat), the tasktracker always reports countMapTasks() = 0, and thus always gets a map task. Increasing the number of task completion events or Jetty threads will not help here, since the reducers are not even launched. If we decide to tweak the load logic, it should be done as a separate Jira IMO.
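          The starvation described above can be illustrated with a toy model of a single tracker, assuming a simplified rule: one task per heartbeat, and a map whenever the tracker's reported map count is below a limit. `HeartbeatModel` and its parameters are hypothetical; this is not the code of getNewTaskForTaskTracker().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical), not the JobTracker's real scheduling code.
class HeartbeatModel {
  // Returns what one tracker receives on each heartbeat: "map", "reduce" or "idle".
  static List<String> simulate(int pendingMaps, int pendingReduces, int maxMapLoad,
                               int heartbeats, boolean mapsFinishBeforeHeartbeat) {
    List<String> assigned = new ArrayList<>();
    int runningMaps = 0;
    for (int hb = 0; hb < heartbeats; hb++) {
      if (mapsFinishBeforeHeartbeat) {
        // Every map handed out last time is already done at report time.
        runningMaps = 0;
      }
      // Otherwise maps are assumed to outlive the simulated window.
      // Simplified rule: while the reported map load is below the limit, a map wins.
      if (runningMaps < maxMapLoad && pendingMaps > 0) {
        pendingMaps--;
        runningMaps++;
        assigned.add("map");
      } else if (pendingReduces > 0) {
        pendingReduces--;
        assigned.add("reduce");
      } else {
        assigned.add("idle");
      }
    }
    return assigned;
  }
}
```

          With `simulate(5, 2, 2, 7, true)` the first reduce is handed out only after all 5 maps; with `mapsFinishBeforeHeartbeat = false` it appears as soon as the tracker reaches the map limit (heartbeat index 2).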

          Amar Kamat added a comment - edited

          With an approach similar to the one discussed above and some optimizations (one of which is that task commit batching now happens in stages, i.e., batch-size TIPs from the queue get batch committed in one go), we were able to process a large number of maps successfully.
          The job description is as follows:
          1) 250 nodes
          2) random-writer modified so that map output goes to the local filesystem and the reducers do nothing
          3) num maps: 320,000
          4) num reducers: 450
          5) bytes per map: 8 MB
          6) total data: 2.5 TB
          7) batch commit size = 5000, i.e., only 5000 TIPs are committed at a time
          The map phase took approx. 40 min.
          The only problem is that of reducer scheduling by the JT. The maps finish so fast that the map load is always low, and the reducers always start after the maps are done. Simple tricks such as increasing the number of task completion events, Jetty threads, etc. might help but won't provide a scalable solution. So it seems that tweaking the load logic in the JT, i.e., getNewTaskForTaskTracker(), is the only way. We are currently trying many optimizations and will post a stable/final version of the approach along with a patch soon.
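          A sketch of fixed-size batch commits, assuming a queue of completed tasks and a single lock standing in for the JobTracker monitor (`BatchCommitter` and `jobTrackerLock` are hypothetical). Each call commits at most `batchSize` tasks, which bounds how long the lock is held per batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of fixed-size batch commits.
class BatchCommitter<T> {
  private final Queue<T> completed = new ArrayDeque<>();
  private final int batchSize;
  private final Object jobTrackerLock = new Object(); // stands in for the JT monitor

  BatchCommitter(int batchSize) {
    this.batchSize = batchSize;
  }

  synchronized void enqueue(T task) {
    completed.add(task);
  }

  // Takes at most batchSize tasks off the queue.
  private synchronized List<T> nextBatch() {
    List<T> batch = new ArrayList<>();
    while (batch.size() < batchSize && !completed.isEmpty()) {
      batch.add(completed.poll());
    }
    return batch;
  }

  // Commits one bounded batch under the (simulated) JobTracker lock and
  // returns how many tasks were committed.
  int commitOneBatch(List<T> committedSink) {
    List<T> batch = nextBatch();
    synchronized (jobTrackerLock) {
      committedSink.addAll(batch); // placeholder for promoting task output
    }
    return batch.size();
  }
}
```

          With the benchmark's numbers, 320,000 maps at a batch size of 5000 would be committed in 64 batches, with the lock released between batches.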

          Runping Qi added a comment -

          Devaraj,

          Chris D. has a simple test job that can reproduce this problem.
          You may use that job to validate your patch.

          Runping Qi added a comment -

          I saw a lot of warning messages like this in hadoop.log:

          2008-02-27 14:24:16,366 WARN org.apache.hadoop.ipc.Server: Call queue overflow discarding oldest call getProtocolVersion(org.apache.hadoop.mapred.InterTrackerProtocol, 5) from xxxxxxxxxxxx
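          The warning names a discard-oldest policy on a bounded call queue. The following is a simplified model of that policy only, not Hadoop's `ipc.Server` implementation (`DiscardOldestQueue` is a hypothetical name): when handlers cannot drain calls fast enough, each new call pushes out the oldest queued one, which is then never processed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of a bounded queue that drops the oldest entry on overflow.
class DiscardOldestQueue<T> {
  private final Deque<T> calls = new ArrayDeque<>();
  private final int capacity;
  private int discarded = 0;

  DiscardOldestQueue(int capacity) {
    this.capacity = capacity;
  }

  synchronized void offer(T call) {
    if (calls.size() == capacity) {
      calls.pollFirst(); // the oldest call is dropped and never handled
      discarded++;
    }
    calls.addLast(call);
  }

  synchronized T take() {
    return calls.pollFirst();
  }

  synchronized int discardedCount() {
    return discarded;
  }

  synchronized int size() {
    return calls.size();
  }
}
```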

          Runping Qi added a comment -

          This problem occurred again when I ran a large map/reduce job.
          The job tracker process was unresponsive while consuming 100% CPU.
          The process reached its heap space limit.

          Runping

          Devaraj Das added a comment -

          Ok, after some offline discussions I am now tending to go with Vivek's recommendation due to reasons outlined in his last comment. We can open a new jira issue for the sparse matrix approach.

          Owen O'Malley added a comment -

          +1 for the matrix with the failed, pending, and potentially speculative together in a single data structure.

          Note that reduces don't have locality information, so the pending tasks can be stored as a list. One option would be to add next/prev pointers into the TIP itself to enable removing tasks from the pending list in O(1).
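          Owen's suggestion of next/prev pointers inside the TIP amounts to an intrusive doubly linked list. A minimal sketch for the reduce pending list, with a hypothetical nested `Tip` class standing in for TaskInProgress:

```java
// Hypothetical sketch: the TIP carries its own links, so it can be unlinked
// from the pending list in O(1) without searching for it.
class IntrusivePendingList {
  static final class Tip {
    final int id;
    Tip prev, next;  // neighbours in the pending list
    boolean linked;  // true while the TIP is on the list
    Tip(int id) { this.id = id; }
  }

  private Tip head, tail;
  private int size;

  // Append a newly submitted pending reduce.
  void addLast(Tip t) {
    if (t.linked) return;
    t.prev = tail;
    t.next = null;
    if (tail != null) tail.next = t; else head = t;
    tail = t;
    t.linked = true;
    size++;
  }

  // Put a failed reduce at the front so it is retried first.
  void addFirst(Tip t) {
    if (t.linked) return;
    t.next = head;
    t.prev = null;
    if (head != null) head.prev = t; else tail = t;
    head = t;
    t.linked = true;
    size++;
  }

  // O(1) unlink using the TIP's own pointers.
  void remove(Tip t) {
    if (!t.linked) return;
    if (t.prev != null) t.prev.next = t.next; else head = t.next;
    if (t.next != null) t.next.prev = t.prev; else tail = t.prev;
    t.prev = t.next = null;
    t.linked = false;
    size--;
  }

  Tip peekFirst() { return head; }

  int size() { return size; }
}
```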

          Vivek Ratan added a comment -

          The only difference between 1a (we need to give that a name - let's call it the 'cache map', as it's mostly based on how the cache is implemented) and a sparse matrix is that in the latter, each TIP is linked to the TIP below it in the same column, and the linked list for a row in a sparse matrix is doubly linked (you need that to efficiently delete tasks in the running list), while in 1a the runnable list is singly linked. Given that, I would vote for the cache map, for the following reasons:

          • My big concern with implementing a sparse matrix now is that you're implementing a brand new data structure. Given how core this functionality is, and given time constraints, it's riskier to introduce new code here.
          • You already have most of the code in place for cache map, it's been there for a while and tested in production. That gives me a lot more comfort than putting in brand new code.
          • In terms of performance, the only difference between a linked list for running tasks (2a) and a sparse matrix is for speculative tasks, where the latter performs better. However, it's not clear to me how much this will be reflected in overall performance. The effect of a linked list's lower performance may be minimal in the overall scheme of things, so why throw in new code? It's better, IMO, to see whether this difference is indeed significant before making big changes.
          • As I mentioned earlier, the cache map and the sparse matrix are almost identical. I don't see a sparse matrix being any simpler or more elegant than a cache map; I see both as fairly simple and elegant structures.

          I agree that a sparse matrix is the better option for speculative tasks, and that it may also be useful in the future for more complex scheduling decisions, as Arun points out. However, because it requires new code in such a central/core piece of functionality, I'd recommend a more cautious approach: use the tried-and-tested code you already have to solve most, if not all, of the problems you're facing today, and look at a sparse matrix implementation if the need is great. New code always brings its own problems with testing, implementation, and potential side effects.

          Arun C Murthy added a comment -

          I think we should take approach 1(b). I was talking to Amar offline, and he said he has his head around the implementation already.

          +1

          I think it's a simple, elegant way to ensure we have the most up-to-date scheduling information (1a introduces a lag since deletes don't happen synchronously in findNewTask) which we could potentially leverage for more complex scheduling decisions later (e.g. how many tasks can potentially be scheduled on this node/rack etc.). I believe this is a good long-term approach.

          Amar Kamat added a comment -

          Yeah. Things are pretty clear/simple now. I am waiting for the HADOOP-1985 patch so that I will get a complete picture of what is happening. I should be able to submit some benchmark results soon but first I will put up the pseudo code.

          Devaraj Das added a comment -

          I think we should take approach 1(b). I was talking to Amar offline, and he said he has his head around the implementation already.

          Devaraj Das added a comment -

          BTW, if we take the sparse matrix approach, we really don't need the other datastructure for RUNNING tasks.
          In the sparse matrix proposal, note that if the first TIP in a location is running, all TIPs in that location are running, since we always move TIP columns to the back whenever we choose a TIP to run. And we don't consider tasks for speculative execution unless we run out of virgin tasks. So when we do want to consider tasks for speculative execution, we go in the order local, rack-local, off-rack. We hit all the locations in O(1), and the time to find a speculative task in a particular row is determined by the position of the first slow task in the row. We also move the corresponding TIP column to the back, exactly as we do for virgin tasks. This way, speculative execution also happens in the order of split sizes.
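          The move-to-back invariant can be sketched with one insertion-ordered `LinkedHashSet` per location, assuming string TIP ids and ignoring failed TIPs: choosing a TIP moves it to the back of every row it belongs to, so non-running TIPs always precede running ones, and a running row head means the row has no virgin TIPs left. `MoveToBackRows` and its methods are hypothetical names, and `isSlow` is a placeholder for the real speculation test.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of "move the chosen TIP's column to the back".
class MoveToBackRows {
  private final Map<String, LinkedHashSet<String>> rows = new HashMap<>();
  private final Map<String, List<String>> locationsOf = new HashMap<>();
  private final Set<String> running = new HashSet<>();

  // Registers a virgin TIP at the back of each of its locations' rows.
  void add(String tip, List<String> locations) {
    locationsOf.put(tip, new ArrayList<>(locations));
    for (String loc : locations) {
      rows.computeIfAbsent(loc, k -> new LinkedHashSet<>()).add(tip);
    }
  }

  // Removing and re-adding moves an element to the end of a LinkedHashSet.
  private void moveColumnToBack(String tip) {
    for (String loc : locationsOf.get(tip)) {
      LinkedHashSet<String> row = rows.get(loc);
      row.remove(tip);
      row.add(tip);
    }
  }

  // Virgin task: the row head, unless the head is running (then all are).
  String pickVirgin(String location) {
    LinkedHashSet<String> row = rows.get(location);
    if (row == null || row.isEmpty()) return null;
    String head = row.iterator().next();
    if (running.contains(head)) return null;
    running.add(head);
    moveColumnToBack(head);
    return head;
  }

  // Speculative task: first running TIP in the row that the caller deems slow.
  String pickSpeculative(String location, Predicate<String> isSlow) {
    LinkedHashSet<String> row = rows.get(location);
    if (row == null) return null;
    String found = null;
    for (String tip : row) {
      if (running.contains(tip) && isSlow.test(tip)) {
        found = tip;
        break;
      }
    }
    if (found != null) moveColumnToBack(found); // after the loop: no concurrent modification
    return found;
  }

  // A completed TIP leaves every row it was in.
  void finished(String tip) {
    List<String> locs = locationsOf.remove(tip);
    if (locs == null) return;
    running.remove(tip);
    for (String loc : locs) rows.get(loc).remove(tip);
  }
}
```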

          Vivek Ratan added a comment -

          We need to clarify what assumptions are made by the various proposals, how they're different and similar, and figure out the best approach. After discussions between Owen, Arun and myself, we feel the following assumptions are valid:

          • Failed tasks (tasks and TIPs are used interchangeably in this discussion) should be considered before virgin tasks where possible.
          • Failed tasks need not be sorted on input size. In fact, there isn't any one obvious way in which they should be sorted.
          • Similarly, there isn't any one obvious way in which running tasks should be sorted. Maybe the tasks that have run the longest are better candidates for speculation, but that's not obvious.

          After looking at the various data structures that have been suggested, the following are recommended:

          1. A single data structure for virgin and failed tasks is recommended (call this the runnable list). There are two options:

          1a. We use a structure similar to that for the cache. Have a hashmap of hostnames to linked lists, where each list contains the runnable tasks for the host.

          • Each list is sorted in decreasing order of input size at the beginning, as all tasks are virgin.
          • A task can occur in a list for more than one host.
          • When a task is selected to run (this is the first task in the list, unless that task is marked as running on another host, in which case it is removed from the list and the next task is considered), it is removed from the runnable list. This is an O(1) operation.
          • A task can be inserted into a runnable list when it fails, in which case it is inserted into the beginning of the list (O(1)).
          • A runnable list can end up such that failed tasks occur before virgin tasks, and the failed tasks are sorted by when they failed (the ones that failed later occur earlier in the list), and not by input size. Thus the highest priority is for a failed task that failed most recently.
          • This list can be singly linked, as we insert and delete at the head.
          • We can have a single hashmap that keeps track of lists for hosts or racks. The key for the hashmap is a unique name of the host or rack. By using names similar to absolute paths of files in directories, this structure can support multiple hierarchies of racks.
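          A sketch of approach 1a under stated assumptions: TIPs are plain objects with a `running` flag, lists are keyed by path-like names such as "/rack1/host1", and entries for TIPs already running elsewhere are dropped lazily when they reach a list head. `CacheMap` and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "cache map": one runnable list per host or rack.
class CacheMap {
  static final class Tip {
    final String id;
    // Set when the TIP is handed out; left true after completion in this sketch,
    // so finished TIPs are also skipped when they reach a list head.
    boolean running;
    Tip(String id) { this.id = id; }
  }

  private final Map<String, Deque<Tip>> runnable = new HashMap<>();

  // Initial population: callers add virgin TIPs in decreasing input-size order.
  void addVirgin(String location, Tip t) {
    runnable.computeIfAbsent(location, k -> new ArrayDeque<>()).addLast(t);
  }

  // A failed TIP goes to the front of each of its lists: O(1) per list.
  void addFailed(Collection<String> locations, Tip t) {
    t.running = false;
    for (String loc : locations) {
      runnable.computeIfAbsent(loc, k -> new ArrayDeque<>()).addFirst(t);
    }
  }

  // Pop from the head, discarding entries for TIPs already running elsewhere.
  Tip select(String location) {
    Deque<Tip> list = runnable.get(location);
    while (list != null && !list.isEmpty()) {
      Tip t = list.pollFirst();
      if (!t.running) {
        t.running = true;
        return t;
      }
    }
    return null;
  }
}
```

          The lazy deletion is what keeps selection cheap here, but it also means a list can hold stale entries until they surface; this is the lag Arun mentions for 1a.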

          1b. Another implementation option is a 2-D sparse matrix as Owen suggested earlier. The only difference between this structure and the one proposed in 1a is that tasks are additionally connected to other tasks in their column. This makes it easy to delete all tasks related to the same data input. When a task is selected to run, we can remove all other related tasks (tasks in the same column) very easily.
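          Approach 1b, sketched as an explicit sparse matrix under the assumption that TIPs are identified by strings: rows are locations (doubly linked), and each TIP's cells are chained into a column. Selecting a TIP unlinks its whole column, so it disappears from every host and rack list in time proportional to its number of locations. `SparseMatrix` and `Cell` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the sparse-matrix runnable structure.
class SparseMatrix {
  static final class Cell {
    final String tip;       // TIP id
    final String location;  // row key: host or rack name
    Cell rowPrev, rowNext;  // doubly linked within the location's row
    Cell colNext;           // next cell of the same TIP (its column)
    Cell(String tip, String location) { this.tip = tip; this.location = location; }
  }

  private final Map<String, Cell> rowHead = new HashMap<>();
  private final Map<String, Cell> rowTail = new HashMap<>();
  private final Map<String, Cell> colHead = new HashMap<>(); // TIP id -> first cell

  // Appends the TIP to each location's row and chains its cells into a column.
  void add(String tip, List<String> locations) {
    Cell prevInCol = null;
    for (String loc : locations) {
      Cell c = new Cell(tip, loc);
      Cell tail = rowTail.get(loc);
      if (tail == null) {
        rowHead.put(loc, c);
      } else {
        tail.rowNext = c;
        c.rowPrev = tail;
      }
      rowTail.put(loc, c);
      if (prevInCol == null) colHead.put(tip, c); else prevInCol.colNext = c;
      prevInCol = c;
    }
  }

  // Picks the first TIP in the row and unlinks its whole column.
  String select(String location) {
    Cell first = rowHead.get(location);
    if (first == null) return null;
    for (Cell c = colHead.remove(first.tip); c != null; c = c.colNext) {
      unlinkFromRow(c);
    }
    return first.tip;
  }

  // O(1): doubly linked rows need no search.
  private void unlinkFromRow(Cell c) {
    if (c.rowPrev != null) c.rowPrev.rowNext = c.rowNext; else setOrRemove(rowHead, c.location, c.rowNext);
    if (c.rowNext != null) c.rowNext.rowPrev = c.rowPrev; else setOrRemove(rowTail, c.location, c.rowPrev);
    c.rowPrev = null;
    c.rowNext = null;
  }

  private static void setOrRemove(Map<String, Cell> m, String key, Cell value) {
    if (value == null) m.remove(key); else m.put(key, value);
  }
}
```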

          2. For running tasks (call this the running list), we have the following options:

          2a. Keep a global list of running tasks and walk through it to pick a speculative task, much as we do today (the only difference being that currently, we walk through an array of all tasks, whereas with this approach, we walk through a list of only running tasks).

          • When a task starts to run, it is moved from the runnable list to the running list, and can be inserted at the head or tail. This is O(1).
          • When a task completes (or fails), we need to locate it in this list and remove it.
          • Perhaps the best way to implement this is for each TIP object to have a prev and next pointer. That makes locating a task in this list O(1), and removal is O(1) too, as the list is doubly linked.
          • To find a speculative task, we walk through this list as before and pick the first task that works (we have to add rack awareness to the current algorithm). This is not O(1), but it is no worse than what we have today.
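          For 2a, `java.util.LinkedHashSet` offers the same O(1) insert and remove that per-TIP prev/next pointers would, while iteration still follows start order. This is a swapped-in substitute for hand-written pointers, not what the proposal specifies; `RunningList` is hypothetical and the `shouldSpeculate` predicate is a placeholder for the real speculation test.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of a global running list with a linear speculative walk.
class RunningList<T> {
  private final Set<T> running = new LinkedHashSet<>();

  void started(T tip) { running.add(tip); }      // moved here from the runnable list
  void finished(T tip) { running.remove(tip); }  // completed or failed: O(1)

  // Linear walk, as today: the first running TIP that passes the test.
  T findSpeculative(Predicate<T> shouldSpeculate) {
    for (T tip : running) {
      if (shouldSpeculate.test(tip)) return tip;
    }
    return null;
  }

  int size() { return running.size(); }
}
```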

          2b. Use a 2-D sparse matrix, as described earlier, to represent possible speculative tasks.

          • When a task is selected to run from the runnable list, copies of it are inserted into the running list for each host where the task can be speculated (one for each host where the input is replicated, at the very least). Insertion into the matrix is O(1) if you have doubly linked lists for rows, and if you have an array, indexed by task ID, that points to the head of the column.
          • When a task completes (or fails), it is removed from this list, as are all tasks linked to it in its column. This takes constant time.
          • Finding a speculative task is O(1), as you pick the first task in the running list for the host, or for its rack. If nothing matches, pick a task from any arbitrary host.
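          A sketch of the 2b lookup order (host, then rack, then any host), assuming string TIP ids and that a TIP's candidate locations are known when it starts. Each location keeps a `LinkedHashSet` instead of a hand-linked matrix row, and a global set serves the "arbitrary host" fallback. `SpeculativeIndex` is hypothetical and treats every running TIP as a candidate, omitting the actual slowness check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of per-location running lists for speculation.
class SpeculativeIndex {
  private final Map<String, LinkedHashSet<String>> byLocation = new HashMap<>();
  private final Map<String, List<String>> locationsOf = new HashMap<>();
  private final LinkedHashSet<String> all = new LinkedHashSet<>();

  // Insert the TIP into the running list of every location where it could be speculated.
  void started(String tip, List<String> locations) {
    locationsOf.put(tip, new ArrayList<>(locations));
    all.add(tip);
    for (String loc : locations) {
      byLocation.computeIfAbsent(loc, k -> new LinkedHashSet<>()).add(tip);
    }
  }

  // Remove the TIP from all its locations (the "column" removal).
  void finished(String tip) {
    List<String> locs = locationsOf.remove(tip);
    if (locs == null) return;
    all.remove(tip);
    for (String loc : locs) {
      Set<String> s = byLocation.get(loc);
      if (s == null) continue;
      s.remove(tip);
      if (s.isEmpty()) byLocation.remove(loc);
    }
  }

  // Host-local first, then rack-local, then any running TIP.
  String pick(String host, String rack) {
    String t = first(byLocation.get(host));
    if (t == null) t = first(byLocation.get(rack));
    if (t == null) t = first(all);
    return t;
  }

  private static String first(Set<String> s) {
    return (s == null || s.isEmpty()) ? null : s.iterator().next();
  }
}
```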

          As far as implementation recommendations go, it makes sense to implement 1a and 2a now. Both use existing code and do not require many changes. Picking a speculative task will be slow and not locality aware, but it will be no worse than what we have today, and implementation will be quick. At some point, depending on need, we can implement 1b and 2b, both of which refer to the same data structure. This requires more coding and testing, but will make the process of finding a speculative task faster.

          Devaraj Das added a comment -

          Owen, I like your approach of sparse matrix. I propose we build on that.

          Arun C Murthy added a comment -

          I guess you meant 'sparse matrix' conceptually. AFAIK Java doesn't have a built-in sparse matrix implementation.

          No, I believe it is a concrete sparse matrix. My original proposal (alluded to here: http://issues.apache.org/jira/browse/HADOOP-2119?focusedCommentId=12566309#action_12566309) had something very similar; I haven't had a chance to sync up with Owen on his proposal...

          Originally:

          class TaskInProgress {
            // ...
            // Per-Location links: a TIP sits on several host and rack lists at once.
            Map<Location, TaskInProgress> nextHostMap;
            Map<Location, TaskInProgress> prevHostMap;
            Map<Location, TaskInProgress> nextRackMap;
            Map<Location, TaskInProgress> prevRackMap;
            // ...
          }
          
          // Heads of the per-host and per-rack lists.
          Map<Location, TaskInProgress> hostToTaskCache;
          Map<Location, TaskInProgress> rackToTaskCache;
          

          1. Location is either a node or a rack.
          2. The algorithm to find a new task to run is exactly the same as above.
          3. The next/prev links are Maps because every TIP is node-local/rack-local to multiple Locations.

          The central idea is to keep all related scheduling information together and to have an efficient way to delete a TIP from all of its Locations given only one of them (remember that with the usual replication factor of 3, a TIP can be scheduled on up to 6 locations, i.e. 3 nodes and 3 racks, in the worst case). This is precisely the reason for having the back-links (prev).
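          The back-link idea can be sketched as follows. This is a minimal, hypothetical sketch rather than the proposed patch: `Tip` stands in for `TaskInProgress`, locations are plain strings instead of a `Location` type, and a single head map plays the role of `hostToTaskCache`/`rackToTaskCache`. Because each TIP carries its own prev/next pointer per location, unlinking it from all of its lists touches only its own (at most 6) locations, independent of how many TIPs the job has.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TaskInProgress: one prev/next link per location
// (host or rack) that this TIP is local to.
class Tip {
    final String id;
    final String[] locations;
    final Map<String, Tip> next = new HashMap<>();
    final Map<String, Tip> prev = new HashMap<>();

    Tip(String id, String... locations) {
        this.id = id;
        this.locations = locations;
    }
}

// Per-location list heads (the role of hostToTaskCache / rackToTaskCache).
class LocationCache {
    private final Map<String, Tip> head = new HashMap<>();

    // Insert-front on every location list of the TIP: O(#locations).
    void addFront(Tip tip) {
        for (String loc : tip.locations) {
            Tip oldHead = head.get(loc);
            tip.prev.remove(loc);
            tip.next.put(loc, oldHead);
            if (oldHead != null) oldHead.prev.put(loc, tip);
            head.put(loc, tip);
        }
    }

    // Unlink the TIP from all of its lists using the back-links: O(#locations).
    void remove(Tip tip) {
        for (String loc : tip.locations) {
            Tip p = tip.prev.remove(loc);
            Tip n = tip.next.remove(loc);
            if (p != null) {
                p.next.put(loc, n);
            } else if (head.get(loc) == tip) {
                if (n == null) head.remove(loc); else head.put(loc, n);
            }
            if (n != null) n.prev.put(loc, p);
        }
    }

    Tip peek(String loc) { return head.get(loc); }

    // Take the first TIP at one location; it disappears from its other lists too.
    Tip pop(String loc) {
        Tip t = head.get(loc);
        if (t != null) remove(t);
        return t;
    }
}
```

          Popping a TIP from its host list also removes it from its rack list, which is the "delete from multiple Locations given only one Location" operation described above.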

          >> 1) I am not sure how easy it would be to insert into this structure.

          Insert is always insert-front or insert-back, which is O(1).

          >> 2) I am also not sure how easily we can maintain sorted order. This would be required for failed/running tasks.

          This approach is designed to eliminate any sorting at all. When a task fails we just insert-front the TIP on all Location lists, so O(1).

          >> 3) The implementation and space complexity will also be HIGH.

          Overall, this is far more efficient time-wise (every operation is a constant-time operation) at the cost of increased memory. However, I believe it isn't prohibitively expensive; it's the classic memory vs. performance tradeoff, and the memory is well spent here:
          1. It needs roughly 2x the current memory: each TIP is already on multiple lists (in cachedTasks), and the only addition is the prev links.
          2. All of these are per-job data structures that are cleaned up when the job completes, and we aren't currently running into any memory-related issues at the JobTracker. Of course, with HoD this is moot.

          Note on the implementation complexity: IMO it isn't insanely complicated; in fact it's fairly simple/obvious, but one man's food is another man's poison... there, I managed to sound sage and philosophical. :)

          Amar Kamat added a comment -

          I guess you meant 'sparse matrix' conceptually. AFAIK Java doesn't have any built-in sparse matrix implementation. This data structure will be optimized for non-running tasks (virgin + failed) since
          1) For virgin tasks there are no insertions.
          2) For failed tasks we just insert at the front.

          I am wondering about the following points:
          1) I am not sure how easy it would be to insert into this structure.
          2) I am also not sure how easily we can maintain sorted order. This would be required for failed/running tasks.
          3) The implementation and space complexity will also be HIGH.

          For HADOOP-2014, any of the designs mentioned in the design table would work fine with an extra count (in the TIP) of the number of trackers that have the split locally. I think the strategy mentioned here, along with (8) from the design table for running tasks, would be efficient in terms of memory and implementation. Comments?

          Owen O'Malley added a comment -

          I think we can do better than that, by using a special data structure that isn't that complicated.

          I propose that we use a 2-d sparse matrix, where each row is a location (node or rack) and each column corresponds to a task in progress (TIP) that is currently runnable but not running. I'd make the rows doubly linked circular lists and the columns singly linked circular lists. So let's say the operations are:

          interface LocationTable {
            // add the TIP to the front of the lists for all of its locations
            void addToFront(TaskInProgress tip, String[] locations);
            // add the TIP to the back of the lists for all of its locations
            void addToBack(TaskInProgress tip, String[] locations);
            // get the first TIP at the given location and remove it from all of its lists
            TaskInProgress getFront(String location);
          }
          

          All of the operations involve a lookup to find the list and an O(1) operation to modify all of the lists. Deleting a node from a doubly linked list is O(1). If we use a hash table from the location name to the front of the list for that location, then the lookup is also O(1).
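          One way the table could be implemented, as a hedged sketch: the class name `SparseLocationTable`, the generic `T` in place of `TaskInProgress`, and the sentinel header per row are assumptions made here, not part of the proposal. Each row is a doubly linked circular list with a sentinel, and the cells belonging to one TIP form a singly linked circular ring (the column), so `getFront` walks that ring and unlinks each cell from its row in O(1).

```java
import java.util.HashMap;
import java.util.Map;

// Sparse matrix: rows = locations, columns = runnable-but-not-running TIPs.
class SparseLocationTable<T> {
    private static final class Cell<E> {
        final E tip;
        Cell<E> left, right;  // row links: neighbours at the same location
        Cell<E> down;         // column link: same TIP, next location (circular)
        Cell(E tip) { this.tip = tip; }
    }

    // location name -> sentinel header of its row (O(1) average lookup)
    private final Map<String, Cell<T>> rows = new HashMap<>();

    private Cell<T> header(String location) {
        return rows.computeIfAbsent(location, k -> {
            Cell<T> h = new Cell<>(null);
            h.left = h;
            h.right = h;
            return h;
        });
    }

    private void add(T tip, String[] locations, boolean front) {
        Cell<T> first = null, last = null;
        for (String loc : locations) {
            Cell<T> h = header(loc);
            Cell<T> c = new Cell<>(tip);
            // splice in right after the header (front) or right before it (back)
            Cell<T> l = front ? h : h.left;
            Cell<T> r = l.right;
            c.left = l;
            c.right = r;
            l.right = c;
            r.left = c;
            // chain the TIP's cells into its column ring
            if (first == null) first = c; else last.down = c;
            last = c;
        }
        if (last != null) last.down = first;  // close the ring
    }

    public void addToFront(T tip, String[] locations) { add(tip, locations, true); }

    public void addToBack(T tip, String[] locations) { add(tip, locations, false); }

    // Remove the first TIP at this location from every row it appears in.
    public T getFront(String location) {
        Cell<T> h = rows.get(location);
        if (h == null || h.right == h) return null;
        Cell<T> start = h.right;
        Cell<T> c = start;
        do {
            c.left.right = c.right;  // O(1) unlink from this cell's row
            c.right.left = c.left;
            c = c.down;
        } while (c != start);
        return start.tip;
    }
}
```

          The sentinel header means splicing never has to special-case an empty row; the cost of `getFront` is proportional to the number of locations of one TIP, which is bounded by the replication factor.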

          I think we should solve HADOOP-2014 at the same time http://issues.apache.org/jira/browse/HADOOP-2014?focusedCommentId=12566814#action_12566814

          So the order would be:
          1. Look at the node local list O(1)
          2. Look at the rack local list O(1)
          3. Look at the most overloaded rack from HADOOP-2014 O(# racks)

          Between the 3 of them, you'll always find a task if there are any to run. Update for all of the lists is O(1), regardless of how you found it.
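          The three-step lookup order can be sketched with standard collections instead of a hand-rolled matrix: a `LinkedHashSet` per location gives O(1) access to its first element and O(1) removal. Everything here is hypothetical: TIPs and locations are strings, "most overloaded rack" is interpreted as the rack with the most pending TIPs (an assumption, not taken from HADOOP-2014), and every TIP is assumed to have at least one rack, which is what lets step 3 find a task whenever one is left.

```java
import java.util.*;

class LocalityScheduler {
    // location (host or rack) -> pending TIPs local to it, in insertion order
    private final Map<String, LinkedHashSet<String>> pending = new HashMap<>();
    // TIP -> every location list it is on, so it can be removed from all of them
    private final Map<String, List<String>> locationsOf = new HashMap<>();
    private final Set<String> racks = new HashSet<>();

    void addTip(String tip, String[] hosts, String[] tipRacks) {
        List<String> locs = new ArrayList<>(Arrays.asList(hosts));
        locs.addAll(Arrays.asList(tipRacks));
        locationsOf.put(tip, locs);
        racks.addAll(Arrays.asList(tipRacks));
        for (String loc : locs) {
            pending.computeIfAbsent(loc, k -> new LinkedHashSet<>()).add(tip);
        }
    }

    // Take the first TIP at a location and drop it from all of its lists:
    // O(#locations per TIP), i.e. O(1) for a bounded replication factor.
    private String takeFrom(String loc) {
        LinkedHashSet<String> set = pending.get(loc);
        if (set == null || set.isEmpty()) return null;
        String tip = set.iterator().next();
        for (String l : locationsOf.remove(tip)) {
            pending.get(l).remove(tip);
        }
        return tip;
    }

    String obtainTask(String host, String rack) {
        String tip = takeFrom(host);            // 1. node-local, O(1)
        if (tip == null) tip = takeFrom(rack);  // 2. rack-local, O(1)
        if (tip != null) return tip;
        // 3. assumed meaning of "most overloaded": most pending TIPs, O(#racks)
        String busiest = null;
        int most = 0;
        for (String r : racks) {
            LinkedHashSet<String> set = pending.get(r);
            int n = (set == null) ? 0 : set.size();
            if (n > most) { most = n; busiest = r; }
        }
        return busiest == null ? null : takeFrom(busiest);
    }
}
```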

          When tasks fail, you put them back at the front of all of the relevant lists.

          Which leaves the question of speculative execution... I suspect a LocationTable with the currently running tasks would work pretty well.

          Devaraj Das added a comment -

          I think we should prioritize FAILED tasks over VIRGIN tasks when we miss the task-cache. That way Owen's concern will be addressed. Regarding options (5) and (6), one thing to note is that tasks should be removed from the running-tasks data structure as soon as they reach the COMMIT_PENDING state. This ensures that the running-tasks data structure doesn't grow indefinitely (since the JT would handle COMMIT_PENDING tasks in the background).

          Also, do we care whether speculative tasks are executed in the order of split sizes?

          Overall, I think (1) + (3) + (5) looks like an approach worth trying out and benchmarking. The other thing that might help is to not delete from the data structure in (5) until we do a scan looking for speculative tasks (batch deletes). In general, the percentage of speculative tasks is very small, so we might hit the O(n) worst case for the scan towards the end of the map/reduce phases. But it should be okay to have slightly degraded performance when looking for speculative tasks if the most frequent operations (looking for virgin/failed tasks) are efficient. Thoughts?
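          The batch-delete idea can be sketched as below, under stated assumptions: entries are unique attempt IDs (so a rescheduled TIP gets a fresh entry), and `needsBackup` is a hypothetical stand-in for whatever progress test decides that a task deserves a speculative copy. Marking a task as COMMIT_PENDING or finished is O(1); the actual removals are paid for in one linear pass at the start of the next speculative scan.

```java
import java.util.*;
import java.util.function.Predicate;

class RunningTasks {
    private final List<String> running = new ArrayList<>();
    // attempts that reached COMMIT_PENDING (or finished) but are not yet purged
    private final Set<String> stale = new HashSet<>();

    void started(String attempt) { running.add(attempt); }

    // O(1): just remember the attempt; the real removal is deferred.
    void leftRunning(String attempt) { stale.add(attempt); }

    // Purge all stale entries in one O(k) pass, then scan for a candidate.
    String findSpeculative(Predicate<String> needsBackup) {
        if (!stale.isEmpty()) {
            running.removeIf(stale::contains);
            stale.clear();
        }
        for (String attempt : running) {
            if (needsBackup.test(attempt)) return attempt;
        }
        return null;
    }

    // Includes entries whose deletion is still pending.
    int size() { return running.size(); }
}
```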

          Amar Kamat added a comment - edited

          I will summarize the designs here.
          Algorithm:

          1 Get a TIP that is Runnable && ~Running && NotFailedOn(tracker)
             1.1 Scan the cache
             1.2 Scan all the TIPs
          2 Get a speculative TIP
          3 Get a TIP that has failed on all the machines
          

          Notations

          n            : number of TIPs
          master array : an array of TIPs sorted in some order.
          c            : number of TIPs that can potentially be executed locally.
          k            : maximum number of tasks that can be run simultaneously (<<n)
          f            : number of failed tasks (<<r)
          Pop          : get the first TIP
          b            : number of bins of TIPs = n/s, where s is the maximum number of TIPs per bin (s < k)
          

          The following analysis considers the worst case scenarios.

          (1) Virgin tasks: maintain a running pointer rather than starting from 0; the pointer only ever moves forward.
              Requirements: fast Pop and Delete. Time: Pop O(c) (worst case, but happens at most once); no deletions required. Space: O(1). Implementation: Low.
          (2) Virgin tasks: maintain a doubly linked list using an array.
              Requirements: fast Pop and Delete. Time: Pop O(1), Delete O(1). Space: O(n) [actually 2*n]. Implementation: High.
          (3) Virgin tasks: extend TIPs to have previous and next references and maintain a doubly linked list of these TIPs. The master array holds references to these TIPs, acting as a 'back reference' into the linked list.
              Requirements: fast Pop and Delete. Time: Pop O(1), Delete O(1). Space: O(n) [actually 2*n]. Implementation: Highest.
          (4) Virgin tasks: same as (3), but with no master array.
              Requirements: fast Pop and Delete. Time: Pop O(1), Delete O(n). Space: O(n) [actually n]. Implementation: Highest.
          (5) Failed tasks: maintain a sorted list of TIPs that have failed.
              Requirements: small size, infrequent Insert/Pop/Delete. Time: all operations O(log(f)). Space: O(f). Implementation: Low.
          (6) Failed tasks: same as (3).
              Requirements: small size, infrequent Insert/Pop/Delete. Time: Insert O(f), Delete O(1), Pop O(1). Space: O(n) [actually 2*n]. Implementation: Highest.
          (7) Failed tasks: same as (4).
              Requirements: small size, infrequent Insert/Pop/Delete. Time: Insert O(f), Delete O(f), Pop O(1). Space: O(n) [actually n]. Implementation: Highest.
          (8) Running tasks: maintain an array of sorted lists (the array has size b). A TIP's bin is determined by TIP.getPartition()/s.
              Requirements: large size, frequent Insert/Pop/Delete. Time: Insert O(log(s)), Delete O(log(s)), Scan O(k). Space: O(k + b). Implementation: Medium.
          (9) Running tasks: maintain an array of unsorted lists (the array has size b). A TIP's bin is determined by TIP.getPartition()/s.
              Requirements: large size, frequent Insert/Delete. Time: Insert O(1), Delete O(s), Scan O(k). Space: O(k + b). Implementation: Medium.
          (10) Running tasks: maintain one sorted list.
              Requirements: large size, frequent Insert/Delete. Time: Insert O(log(k)), Delete O(log(k)), Scan O(k). Space: O(k). Implementation: Simple.
          (11) Running tasks: same as (3).
              Requirements: large size, frequent Insert/Delete. Time: Insert O(k), Delete O(1). Space: O(n) [actually 2*n]. Implementation: Highest.
          (12) Running tasks: same as (4).
              Requirements: large size, frequent Insert/Delete. Time: Insert O(k), Delete O(k). Space: O(n) [actually n]. Implementation: Highest.
          Easiest to implement    :  (1) + (5) + (10)
          Most space efficient    :  (1) + (5) + (10)
          Most time efficient     :  (2)/(3) + (6) + (8) (least efficient w.r.t. space and implementation complexity)
          

          Now that we can efficiently find both virgin and failed TIPs, should we prefer failed TIPs over virgin ones in case of a cache miss? The reason is the same one Owen gave: in case of deterministic failures, we fail early.
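          A minimal sketch of the "easiest to implement" combination, options (1) and (5), with the cache-miss preference for failed TIPs proposed above. Class and method names are hypothetical; the master array is a plain `String[]`, failed TIPs are kept as master-array indexes in a `TreeSet` (so they stay in master-array order), and `markScheduled` is an assumed hook for TIPs the cache path hands out ahead of the pointer. Skipping TIPs that already failed on the asking tracker makes picking a failed TIP O(f) in the worst case rather than O(log(f)).

```java
import java.util.*;

class CacheMissPicker {
    private final String[] master;      // TIPs in master-array order
    private final boolean[] scheduled;  // true once a TIP has been handed out
    private int nextVirgin = 0;         // only ever moves forward
    private final TreeSet<Integer> failed = new TreeSet<>();
    private final Map<Integer, Set<String>> failedOn = new HashMap<>();

    CacheMissPicker(String[] master) {
        this.master = master;
        this.scheduled = new boolean[master.length];
    }

    // Hypothetical hook: the cache path scheduled this TIP out of order.
    void markScheduled(int idx) { scheduled[idx] = true; }

    void taskFailed(int idx, String tracker) {
        failedOn.computeIfAbsent(idx, k -> new HashSet<>()).add(tracker);
        failed.add(idx);  // O(log f)
    }

    String pick(String tracker) {
        // Failed TIPs first, skipping ones that already failed on this tracker.
        for (Iterator<Integer> it = failed.iterator(); it.hasNext(); ) {
            int idx = it.next();
            if (!failedOn.get(idx).contains(tracker)) {
                it.remove();
                return master[idx];
            }
        }
        // Virgin TIPs: advance past anything already scheduled via the cache.
        while (nextVirgin < master.length && scheduled[nextVirgin]) nextVirgin++;
        if (nextVirgin == master.length) return null;
        scheduled[nextVirgin] = true;
        return master[nextVirgin++];
    }
}
```

          Because the virgin pointer never moves backwards, it advances at most n times over the lifetime of the job, so the O(c) scan in row (1) is paid only once in total.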

          Amar Kamat added a comment -

          I guess the reason for not prioritizing failed tasks over virgin tasks was that the first not-running task is much more likely to be a failed task than a virgin one, hence the simpler approach.


          I will explain some of the things in detail here.
          1) There is one more data structure that can find a virgin task in O(1) time, but it requires holding 2*num-tips integers: an array implementation of a doubly linked list, plus a pointer to the first element in the list. Delete/Top/Pop are all O(1), assuming no dynamic insertions. This is similar to C/C++ implementations of doubly linked lists, with the benefit that indexing the list like an array gives O(1) deletions. If required, I will post the implementation of this data structure.
          2) Since there will be a lot of insertions and deletions for running TIPs, the data structure should be optimized for insertions/deletions. Finding a TIP for speculation requires a scan over the running TIPs (sorted on size), since progress changes dynamically. One good structure here is an array of b SortedSets, where b = max-possible-running-TIPs / k' and k' is a compile-time constant, set to 100 by default. We then map the TIP's partition to one such SortedSet and insert/delete locally in that set. Something like:

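          // compile-time constant: number of elements per bin (k')
          final int BIN_SIZE = 100;
          // number of bins (b)
          int numBins = (numTips + BIN_SIZE - 1) / BIN_SIZE;
          // a TIP's bin is partition / k', so each bin holds at most k' TIPs
          SortedSet<TaskInProgress> bin = bins[tip.getPartition() / BIN_SIZE];
          // operate on this TIP: additions/deletions happen on a smaller set of TIPs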
          

          Note that for finding the TIP for speculation we will iterate over all the bins in order.
          If we decide not to care about the sortedness then use a simple ArrayList instead of a SortedSet.
          Other option would be to use a single list/structure for all the running tasks.
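          For point 1) above, a hedged sketch of the array-based doubly linked list: TIPs are identified by their index in the master array, `next`/`prev` are the 2*num-tips integers, and the `REMOVED` marker is an assumption added here so that deleting an already-deleted index is a harmless no-op. The class name `IndexList` is hypothetical.

```java
// Array-backed doubly linked list over TIP indexes 0..n-1: two int arrays of
// size n plus a head pointer. No insertions after construction.
class IndexList {
    private static final int REMOVED = -2;  // marks an index no longer in the list
    private final int[] next;
    private final int[] prev;
    private int head;
    private int size;

    IndexList(int n) {
        next = new int[n];
        prev = new int[n];
        for (int i = 0; i < n; i++) {
            next[i] = (i + 1 < n) ? i + 1 : -1;  // -1 = end of list
            prev[i] = i - 1;                     // -1 for the first element
        }
        head = (n > 0) ? 0 : -1;
        size = n;
    }

    int top() { return head; }  // -1 when empty

    int size() { return size; }

    boolean contains(int i) { return prev[i] != REMOVED; }

    // O(1) unlink, e.g. when the TIP at index i is scheduled from the cache.
    void delete(int i) {
        if (!contains(i)) return;
        if (prev[i] != -1) next[prev[i]] = next[i]; else head = next[i];
        if (next[i] != -1) prev[next[i]] = prev[i];
        next[i] = REMOVED;
        prev[i] = REMOVED;
        size--;
    }

    int pop() {
        int h = head;
        if (h != -1) delete(h);
        return h;
    }
}
```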
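          For point 2), a sketch of the binned running-TIP structure. The `RunningTip` record, the descending-split-size ordering, and the `candidate` predicate are assumptions for illustration; the bin of a TIP is `partition / k'`, matching the TIP.getPartition()/s rule in the design table. Each insert or delete touches only one `TreeSet` of at most k' entries, while the speculation scan walks the bins in order.

```java
import java.util.*;
import java.util.function.Predicate;

// Hypothetical running-TIP record: partition number plus input split size.
final class RunningTip {
    final int partition;
    final long splitSize;

    RunningTip(int partition, long splitSize) {
        this.partition = partition;
        this.splitSize = splitSize;
    }
}

class BinnedRunningTips {
    static final int BIN_SIZE = 100;  // k' in the text
    // Within a bin: larger splits first, ties broken by partition for a total order.
    private static final Comparator<RunningTip> BY_SIZE =
        Comparator.comparingLong((RunningTip t) -> t.splitSize).reversed()
                  .thenComparingInt(t -> t.partition);
    private final List<TreeSet<RunningTip>> bins = new ArrayList<>();

    BinnedRunningTips(int numTips) {
        int numBins = (numTips + BIN_SIZE - 1) / BIN_SIZE;  // b
        for (int i = 0; i < numBins; i++) bins.add(new TreeSet<>(BY_SIZE));
    }

    private TreeSet<RunningTip> binOf(RunningTip t) {
        return bins.get(t.partition / BIN_SIZE);
    }

    void add(RunningTip t) { binOf(t).add(t); }        // O(log k')

    void remove(RunningTip t) { binOf(t).remove(t); }  // O(log k')

    // Speculation scan: iterate the bins in order, O(running TIPs + b).
    RunningTip findSpeculative(Predicate<RunningTip> candidate) {
        for (TreeSet<RunningTip> bin : bins) {
            for (RunningTip t : bin) {
                if (candidate.test(t)) return t;
            }
        }
        return null;
    }
}
```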


          Comments?

          Vivek Ratan added a comment -

          >> This is unacceptable. We used to have this behavior and it was very bad. If map 0 fails deterministically, you absolutely do not want all 200,000 maps to fail 4 times each before the job fails.

          That's a valid point. However, today's code doesn't handle this situation well either. If you treat virgin and failed tasks with the same priority, then it's quite possible that you run a number of virgin tasks before you get to a failed task. You should really prioritize failed tasks over virgin tasks if you want to avoid the situation you've mentioned, though such a policy may cause unfairness in other situations.

          I think what we really need is what I mentioned for Options 3 and 4 - separate data structures to hold virgin tasks, failed tasks, and running tasks.

          • The data structure for virgin tasks does not need dynamic insertion capabilities (i.e., once created, you will never add any task to it). It also needs to be sorted by size. This structure needs to support deletion (once a task runs, it will be removed from this structure), and it also needs to handle large sizes (up to the number of tasks).
          • The data structure for failed tasks needs to support quick insertions and deletions, but will be smaller in size (as the number of failed tasks is typically low).
          • The data structure for running tasks needs to support quick insertions and deletions, and its size is bounded (by the number of task trackers), though not necessarily small. Note that when a task is running and its status changes, we'll need to find it quickly in the running list so we can move it to another list. The data structure should make lookups as fast as possible.

          Given all this, Devaraj, Amar, and I feel that the following data structure choices are the most appropriate:

          • We leave the cache as is.
          • For virgin tasks, we keep the tasks array, and have a running index to it (as suggested in Option 1). We could alternately have a doubly linked list structure with some sort of indexing (a list implemented as an array).
          • For failed tasks, we can keep a linked list or a sorted structure such as a Java sorted set. If we're OK not needing to keep failed tasks sorted by size (we could sort them by when they ran), then we can just keep a linked list and insert at the end.
          • For running tasks, given that the structure size is potentially larger than that for failed tasks, we can still keep a sorted map, or have some sort of a composite structure (which Devaraj/Amar have discussed, and can describe). Again, if we're OK not needing to keep running tasks sorted by size (we could sort them by when they ran), then we can just keep a linked list and insert at the end.

          By having multiple lists and keeping the array for virgin tasks, we're using more memory, but if that becomes a problem, we can use some other structure to represent virgin tasks.
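          The "keep a linked list and insert at the end" variant still needs the fast lookup required when a running task's status changes. A `LinkedHashMap` provides both insertion-ordered iteration and O(1) lookup/removal by key. The sketch assumes TIPs are identified by string IDs and maps each running TIP to its tracker; all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;

class TaskLists {
    // insertion order = start order; O(1) lookup/removal by TIP ID
    private final LinkedHashMap<String, String> running = new LinkedHashMap<>();
    // failed TIPs, appended at the end (ordered by when they failed)
    private final ArrayDeque<String> failed = new ArrayDeque<>();

    void started(String tip, String tracker) { running.put(tip, tracker); }

    String trackerOf(String tip) { return running.get(tip); }  // O(1) lookup

    // Status change: find the running TIP in O(1) and move it to the failed list.
    void markFailed(String tip) {
        if (running.remove(tip) != null) failed.addLast(tip);
    }

    void markSucceeded(String tip) { running.remove(tip); }

    String nextFailed() { return failed.pollFirst(); }  // null when empty

    // Earliest-started running TIP, e.g. as a speculation candidate.
    String oldestRunning() {
        return running.isEmpty() ? null : running.keySet().iterator().next();
    }
}
```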

          Vivek Ratan added a comment -

          >> However, I do believe this is a good time to start thinking about a better overall approach - especially given that HADOOP-1985 (rack-aware Map-Reduce scheduling) is almost upon us ..

          I don't think Approaches 1 and 2 are necessarily short-term. At least that's not the way we've been thinking about them (i.e., my recommendations are not based on short-term or long-term considerations). My view is that the first two approaches are simple enough and probably solve most of the problem, so it's worth trying them out and then measuring performance. The other approaches require much more coding, and the performance gains may not be worth the added complexity, even in the longer term. We can determine this by implementing 1 or 2 and then seeing where the bottlenecks are and how often they occur.

          Also, we've gone through all these approaches with the rack-awareness implementation in mind. They handle rack-awareness just fine.

          You should definitely put down your design as well.

          Owen O'Malley added a comment -

          >> On a cache miss, if you don't mind returning a virgin task ahead of a failed task, ...

          This is unacceptable. We used to have this behavior and it was very bad. If map 0 fails deterministically, you absolutely do not want all 200,000 maps to fail 4 times each before the job fails.

          Arun C Murthy added a comment -

          >> I think it makes most sense to go with Option 1 for now, as it's the easiest to implement and makes the most common case run much faster. Options 3 and 4 need a fair bit of refactoring and may be an overkill for now, since you can get the most bang for the buck by just making sure that you don't scan the array from the beginning for virgin tasks.

          Vivek, it's a fair analysis and I agree it will help in the short-run.

          However, I do believe this is a good time to start thinking about a better overall approach - especially given that HADOOP-1985 (rack-aware Map-Reduce scheduling) is almost upon us ...

          I've had a brief chat with Owen about this and we both seem to have different approaches - I'll try and put up my thoughts about a completely revamped design for the scheduling data-structures in the next few days for consideration.

          Vivek Ratan added a comment -

          Regarding finding the next task to run:

          The problem is that we scan the array of TaskInProgress (TIP) objects (the 'tasks' array) from the beginning, each time we need to find a task to run and we haven't found one in the cache. When you have lots and lots of tasks, after some time the tasks array will predominantly be filled with running or completed tasks, especially at lower indexes. So it takes longer and longer to scan the array for virgin tasks (tasks that haven't run yet) or failed tasks if we always start from the beginning. There are a number of ways to get around this:

          1. On a cache miss, if you don't mind returning a virgin task ahead of a failed task, even though the failed task occurs earlier in the array than the virgin task, then you can do the following. Keep an index (a pointer) into the tasks array, which points to the first task in the array that is a virgin task. Every TIP to the left of this index will have been run at least once. Every time we need to scan the array, we look for runnable tasks from this index onwards. This index will move across the length of the array just once during the running of all tasks, so it averages out to O(1) for finding the first virgin task. This is fairly easy to code, and should provide significant savings in the cases where there are lots and lots of tasks. If a virgin task is not found, you can use the same code to look for a failed task or a speculative task.

          2. If you want to keep the same algorithm for finding the next task, i.e., if you want to return either a virgin or failed task first on a cache miss, then you can keep an index into the array which represents the first failed or virgin task. This approach won't be as fast as the first one, since this index can move back and forth across the array, but it should still be better than what we do today. In both options, the index may have to be updated whenever a task's status changes.
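          Options 1 and 2 can be sketched as follows. This is a minimal illustration under stated assumptions, not the JobTracker's code: a hypothetical `TaskState` enum in a plain array stands in for the real `tasks` array of TIPs, callers mark a task RUNNING by writing into that shared array, and the check that a failed TIP did not previously fail on the requesting host is omitted. `VirginCursor` only ever moves right; `RunnableCursor` is pulled back when a task fails.

```java
// Hypothetical task states standing in for TaskInProgress bookkeeping.
enum TaskState { VIRGIN, RUNNING, FAILED, COMPLETED }

// Option 1: a cursor to the first virgin task. Tasks never return to VIRGIN,
// so the cursor only moves right and crosses the array once per job:
// amortized O(1) per lookup.
final class VirginCursor {
    private final TaskState[] tasks;
    private int firstVirgin = 0;   // every slot to the left has run at least once

    VirginCursor(TaskState[] tasks) { this.tasks = tasks; }

    int nextVirgin() {             // index of the next virgin task, or -1
        while (firstVirgin < tasks.length && tasks[firstVirgin] != TaskState.VIRGIN) {
            firstVirgin++;
        }
        return firstVirgin < tasks.length ? firstVirgin : -1;
    }
}

// Option 2: a cursor to the first task that is VIRGIN or FAILED (FAILED here
// means "failed an attempt, may be retried"). A failure can reopen a slot to
// the left, so the cursor can move back and the amortized O(1) bound is lost.
final class RunnableCursor {
    private final TaskState[] tasks;
    private int firstRunnable = 0; // no VIRGIN or FAILED task lies to the left

    RunnableCursor(TaskState[] tasks) { this.tasks = tasks; }

    int next() {                   // lowest-index VIRGIN or FAILED task, or -1
        while (firstRunnable < tasks.length
                && tasks[firstRunnable] != TaskState.VIRGIN
                && tasks[firstRunnable] != TaskState.FAILED) {
            firstRunnable++;
        }
        return firstRunnable < tasks.length ? firstRunnable : -1;
    }

    void onFailed(int idx) {       // status-change hook that keeps the invariant
        tasks[idx] = TaskState.FAILED;
        firstRunnable = Math.min(firstRunnable, idx);
    }
}
```

          Option 1 needs no update hook because, in this sketch, a task never goes back to VIRGIN. Option 2 needs something like the hypothetical `onFailed` wired into every status change.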

          A more detailed approach requires maintaining separate lists for tasks in different states: one for virgin tasks, one for failed, one for completed, and one for running. As a task's status changes, it moves from one list to another. These lists can be sorted (the list of virgin tasks should be sorted in decreasing order of input size; the other lists can be sorted the same way, or perhaps in order of when the tasks ran, which simplifies things). Picking the next task is as simple as walking down the list (in many cases, just picking the first element of a list). Care must be taken that moving an element from one list to another is as efficient as possible. The array of TIPS is then no longer needed. I can think of at least a couple of ways of doing this:

          3. Modify a TIP object to have a reference to a 'previous' TIP object and a 'next' TIP object, so that we have linked lists of TIPS. Then a list is just a reference to the first (and last) TIP object, and a TIP belongs to just one list. If objects are added into the Running, Completed, and Failed lists at the tail only, handling a TIP's status change is O(1), but the lists are no longer sorted by size. Rather, they are sorted by when the task ran. Finding the next task is pretty much constant-time, as we either pick the first TIP in the Virgin list, or the first TIP in the Failed list (such that the TIP didn't fail on the host), or we walk through the Running list to find the first speculative task. If you need the lists sorted by size, then insertion is O(n). This is a fair bit of change, as far as coding is concerned.

          4. Lists can also be implemented as arrays of TIPS. This takes more space but moving a TIP from one list to another is faster than in the linked-list case. Insertion into a list sorted by size can be O(log n).
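          The two list representations in Options 3 and 4 can be sketched as below, using hypothetical `LinkedTip`, `TipList`, `SizedTip` and `SortedTipArray` classes rather than the real `TaskInProgress`. One caveat on Option 4 as sketched here: with a plain array, the O(log n) figure covers locating the slot by binary search; `ArrayList.add(int, E)` and `remove(int)` still shift the later elements, which is a linear memory move.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Option 3: a TIP carrying its own prev/next links (hypothetical stand-in).
final class LinkedTip {
    final int id;
    LinkedTip prev, next;
    TipList owner;                       // a TIP sits on at most one list
    LinkedTip(int id) { this.id = id; }
}

final class TipList {
    private LinkedTip head, tail;
    private int size;

    LinkedTip first() { return head; }
    int size() { return size; }

    // Tail-only appends are O(1); the list ends up ordered by when tasks moved.
    void addLast(LinkedTip t) {
        if (t.owner != null) t.owner.unlink(t);
        t.prev = tail;
        t.next = null;
        if (tail == null) head = t; else tail.next = t;
        tail = t;
        t.owner = this;
        size++;
    }

    // O(1): the TIP's own pointers locate its neighbours, no search needed.
    void unlink(LinkedTip t) {
        if (t.prev == null) head = t.next; else t.prev.next = t.next;
        if (t.next == null) tail = t.prev; else t.next.prev = t.prev;
        t.prev = t.next = null;
        t.owner = null;
        size--;
    }
}

// Option 4: a list kept as an array sorted by input size, largest first.
final class SizedTip {
    final int id;
    final long inputSize;
    SizedTip(int id, long inputSize) { this.id = id; this.inputSize = inputSize; }
}

final class SortedTipArray {
    // Ties broken by id so the order is total and binarySearch can find a TIP.
    private static final Comparator<SizedTip> ORDER =
        Comparator.comparingLong((SizedTip t) -> t.inputSize).reversed()
                  .thenComparingInt(t -> t.id);
    private final List<SizedTip> tips = new ArrayList<>();

    // O(log n) comparisons to find the slot; add() then shifts the tail.
    void insert(SizedTip t) {
        int pos = Collections.binarySearch(tips, t, ORDER);
        tips.add(pos < 0 ? -(pos + 1) : pos, t);
    }

    boolean remove(SizedTip t) {
        int pos = Collections.binarySearch(tips, t, ORDER);
        if (pos < 0) return false;
        tips.remove(pos);                // remove(int): shifts later elements left
        return true;
    }

    SizedTip first() { return tips.isEmpty() ? null : tips.get(0); }
    int size() { return tips.size(); }
}
```

          A status change in Option 3 is a single `addLast` on the destination list, which unlinks the TIP from its old list first.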

          You can also have hybrid approaches where you just keep lists for virgin tasks, for example.

          I think it makes most sense to go with Option 1 for now, as it's the easiest to implement and makes the most common case run much faster. Options 3 and 4 need a fair bit of refactoring and may be an overkill for now, since you can get the most bang for the buck by just making sure that you don't scan the array from the beginning for virgin tasks.

          Amar Kamat added a comment -

          Since this issue deals with changing the cached TIPs and failed/speculative executions.

          Amar Kamat added a comment -

          Some doubts,

          + synchronized (JobTracker.this) {
          + for(count=0;count<jlist.size();count++) {

          This indicates that the array is scanned with the lock held. Is that scalable, given that the array could be huge? Locking the JobTracker for that long could be a performance hit, or am I missing something? Currently only one task is considered at a time, so the lock is released in between and the JobTracker can make progress.
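          The concern is about lock scope. A minimal sketch contrasts holding one lock across the whole loop with reacquiring it per element; a generic `Predicate` stands in for whatever per-task work the patch does, and `LockScopeDemo`, `countCoarse` and `countFine` are illustrative names, not from the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class LockScopeDemo {
    private final Object lock = new Object();   // stands in for JobTracker.this

    // Coarse-grained: the lock is held for the whole scan, so every other
    // thread that needs it waits for all jlist.size() iterations.
    <T> int countCoarse(List<T> jlist, Predicate<? super T> check) {
        int hits = 0;
        synchronized (lock) {
            for (int count = 0; count < jlist.size(); count++) {
                if (check.test(jlist.get(count))) hits++;
            }
        }
        return hits;
    }

    // Fine-grained: copy the list under the lock (a plain array copy), then
    // re-acquire the lock per element so other threads can get in between
    // iterations. The price: an element's state may change after the copy.
    <T> int countFine(List<T> jlist, Predicate<? super T> check) {
        List<T> snapshot;
        synchronized (lock) { snapshot = new ArrayList<>(jlist); }
        int hits = 0;
        for (T item : snapshot) {
            synchronized (lock) {
                if (check.test(item)) hits++;
            }
        }
        return hits;
    }
}
```

          Both methods compute the same result; they differ only in how long other threads can be kept waiting.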

          Christian Kunz added a comment -

          We ran jobs successfully with Srikanth's patch, without stuck reducers; i.e., the conclusion would be that the simple addition of '!isOnlyCommitPending()' to the condition in 'isRunnable()' in TaskInProgress.java might have undesired consequences.

          Christian Kunz added a comment -

          Because of the time required to produce a comprehensive patch, I changed the blocker to 0.16.

          Sameer Paranjpye added a comment -

          Do we need this in 0.15.2? The right way to fix this appears to involve a good amount of JobTracker refactoring, namely implementing more efficient data structures for task assignment. The fix proposed here is just a band-aid. Srikanth also tells me that they have an application-level workaround.

          I suggest pushing this to 0.16 or beyond.

          Christian Kunz added a comment -

          To be precise: to find out why reduces get stuck without receiving map output from a certain number of mappers, besides rolling back Srikanth's patch we also rolled back the trivial-looking patch suggested by Devaraj:

          Index: src/java/org/apache/hadoop/mapred/TaskInProgress.java
          ===================================================================
          --- src/java/org/apache/hadoop/mapred/TaskInProgress.java (revision 598581)
          +++ src/java/org/apache/hadoop/mapred/TaskInProgress.java (working copy)
          @@ -663,7 +663,7 @@
              * Return whether this TIP still needs to run
              */
             boolean isRunnable() {
          -    return !failed && (completes == 0);
          +    return !isOnlyCommitPending() && !failed && (completes == 0);
             }
           
             /**

          Srikanth Kakani added a comment -

          For some reason this patch causes reducers to hang when fetching data from the mappers.

          Christian Kunz added a comment -

          This issue has become important to us - upgrading to blocker for 0.15.2

          Devaraj Das added a comment -

          The approach for batching updates looks good. The code needs to be fixed for indentation and the outermost try-catch block should not be removed. I am cancelling the patch until we have those fixes and also the fix for findNewTask...

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12370493/hadoop-2119.patch
          against trunk revision r599223.

          @author +1. The patch does not contain any @author tags.

          javadoc +1. The javadoc tool did not generate any warning messages.

          javac +1. The applied patch does not generate any new compiler warnings.

          findbugs +1. The patch does not introduce any new Findbugs warnings.

          core tests +1. The patch passed core unit tests.

          contrib tests -1. The patch failed contrib unit tests.

          Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1196/testReport/
          Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1196/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1196/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1196/console

          This message is automatically generated.

          Srikanth Kakani added a comment - - edited

          This patch batches up commits in the commit thread, which greatly improves the commit rate. It does not completely fix the problem; to do that we also need to: 1. make findnewmaptasks O(1), and 2. fix locking in the JobTracker.

          I tested the patch and it works for a job with 100,000 mappers, with almost zero delay between the last map tasks completing and the job completing.

          Srikanth Kakani added a comment -

          After a lot of analysis, we think that most threads in the JobTracker block on the JobTracker object's lock, so much so that lock acquisition takes 1.8 s or more.

          Looking at the commit thread, it has two synchronized(this) blocks and makes one promotion call for every task it promotes.

          So instead of doing much useful work, the commit thread mostly waits to acquire locks.

          The fix is for the commit thread to greedily commit as many tasks as possible while it holds the lock. This works well both for slower maps and for faster-completing maps.
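          A minimal sketch of the greedy batching idea, assuming a queue of completed-task IDs fed as completions are reported. `BatchingCommitter`, `taskCompleted` and `commitBatch` are hypothetical names, and the actual output promotion is reduced to a placeholder list append. The point is that N queued completions cost one lock acquisition rather than one or more per task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BatchingCommitter {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final Object lock = new Object();          // stands in for the JobTracker lock
    private final List<String> committed = new ArrayList<>();
    private int lockAcquisitions = 0;

    // Called as completed tasks are reported; no JobTracker lock needed here.
    void taskCompleted(String taskId) { pending.add(taskId); }

    // One iteration of the commit thread's loop.
    int commitBatch() {
        List<String> batch = new ArrayList<>();
        try {
            batch.add(pending.take());                 // block until one task is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();        // let the caller's loop exit
            return 0;
        }
        pending.drainTo(batch);                        // greedily take everything else queued
        synchronized (lock) {                          // one acquisition for the whole batch
            lockAcquisitions++;
            committed.addAll(batch);                   // placeholder for promoting each task
        }
        return batch.size();
    }

    int lockAcquisitions() { synchronized (lock) { return lockAcquisitions; } }
    int committedCount() { synchronized (lock) { return committed.size(); } }
}
```

          A large batch holds the lock longer, so a real implementation might bound it; `BlockingQueue.drainTo(collection, maxElements)` is one way to do that.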

          I will be submitting a patch.

          Christian Kunz added a comment -

          Two quick hacks brought the execution time down from 100 hrs to 6 hrs (the actual execution time for all 100,000 mappers was a little under 1 hr):
          1) Changed HEARTBEAT_INTERVAL to 60 secs
          2) Changed the number of RPC handlers from 80 to 20
          Each change reduced the time by about a factor of 4.
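          Reading the factor of 4 as applying to each change, and assuming the two effects compound multiplicatively (the comment does not say whether they were measured in isolation), the numbers are consistent with the reported 6 hours:

```latex
\[
  4 \times 4 = 16, \qquad \frac{100\ \text{h}}{16} = 6.25\ \text{h}
\]
```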

          Christian Kunz added a comment -

          I ran into a similar problem: 100,000 mappers with a single reducer on a 1400-node cluster with 2 tasks per node. All the waves of mappers finished in 2 hours, but the tasks are stuck in COMMIT_PENDING. The JobTracker is using 100% CPU (distributed across 4 CPUs) and seems to take 3-4 secs to process a completed task, so the job will finish in about 100 hours.

          Thread dump attached
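          As a rough check on the 100-hour estimate, assuming completed tasks are processed one at a time at the quoted 3-4 s each:

```latex
\[
  10^{5} \times 3\ \text{s} = 3 \times 10^{5}\ \text{s} \approx 83\ \text{h},
  \qquad
  10^{5} \times 4\ \text{s} = 4 \times 10^{5}\ \text{s} \approx 111\ \text{h}
\]
```

          The midpoint of 3.5 s per task gives about 97 hours.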

          Devaraj Das added a comment -

          >> Yes, the #running-mappers kept on going up and went beyond the actual number of RUNNING mappers on the task trackers

          Ok, maybe the task commit thread was slow in processing the task completion events, and one reason for that could be that the namenode was slow...

          >> I've killed the jobtracker, thus cannot do "kill -3" anymore.

          Since it is easy to reproduce the problem, could you please have one run and post the traces of the jobtracker's thread stacks (kill -3, when the JT is at 100% CPU).

          Runping Qi added a comment -

          Yes, the #running-mappers kept on going up and went beyond the actual number of RUNNING mappers on the task trackers (which is roughly 3 * NUMBER OF TASK TRACKERS).

          The problem happens consistently and is easy (for me) to reproduce.
          The job has 0 reducers; it just scans through a large input data set, and the mappers do not write anything out.

          I've killed the jobtracker, thus cannot do "kill -3" anymore.

          Devaraj Das added a comment -

          Runping, do you mean to say that the #running-mappers kept on going up and went beyond the actual number of mappers? Is the problem happening consistently? Are the maps creating anything on the DFS? Could you please do "kill -3 <jobtracker-pid>" to get a state of the threads when the JobTracker goes into the state where it consumes 100% CPU, and post the output on JIRA?

          Doug Cutting added a comment -

          I don't think this is a blocker for 0.15.0.


            People

            • Assignee:
              Amar Kamat
              Reporter:
              Runping Qi
            • Votes:
              0
              Watchers:
              4

              Dates

              • Created:
                Updated:
                Resolved:

                Development