Hadoop Common
HADOOP-248

locating map outputs via random probing is inefficient

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.2.1
    • Fix Version/s: 0.12.0
    • Component/s: None
    • Labels: None

      Description

      Currently the ReduceTaskRunner polls the JobTracker with a random list of map tasks, asking for their output locations. It would be better if the JobTracker kept an ordered log and the interface were changed to:

      class MapLocationResults {
        public int getTimestamp();
        public MapOutputLocation[] getLocations();
      }

      interface InterTrackerProtocol {
        ...
        MapLocationResults locateMapOutputs(int prevTimestamp);
      }

      with the intention that each time a ReduceTaskRunner calls locateMapOutputs, it passes back the "timestamp" that it got from the previous result. That way, reduces can easily find the new MapOutputs. This should help the "ramp up" when the maps first start finishing.
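      The timestamp-cursor idea above can be sketched as follows. This is a minimal illustration, not the actual Hadoop code: only the names MapLocationResults and locateMapOutputs come from the proposal; the in-memory log, String locations, and method names on the JobTracker side are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class MapOutputLog {
  // JobTracker-side ordered log of finished map outputs (illustrative).
  private final List<String> locations = new ArrayList<>();

  // JobTracker side: append a finished map's output location.
  public synchronized void recordMapFinished(String location) {
    locations.add(location);
  }

  // Reducer side: fetch only entries added since prevTimestamp.
  // The "timestamp" is simply the log index already consumed.
  public synchronized MapLocationResults locateMapOutputs(int prevTimestamp) {
    List<String> fresh = locations.subList(prevTimestamp, locations.size());
    return new MapLocationResults(locations.size(), fresh.toArray(new String[0]));
  }

  public static final class MapLocationResults {
    private final int timestamp;
    private final String[] locs;
    MapLocationResults(int timestamp, String[] locs) {
      this.timestamp = timestamp;
      this.locs = locs;
    }
    public int getTimestamp() { return timestamp; }
    public String[] getLocations() { return locs; }
  }
}
```

      Each poll returns only the completions that arrived since the last poll, so a freshly finished burst of maps is picked up in one round trip instead of being discovered by repeated random probing.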

      1. 248-initial8.patch
        22 kB
        Devaraj Das
      2. 248-initial7.patch
        21 kB
        Devaraj Das
      3. 248-fixed1.patch
        23 kB
        Devaraj Das
      4. 248-9.patch
        21 kB
        Owen O'Malley

        Issue Links

          Activity

          Nigel Daley added a comment -

          Sorry, ignore Hadoop QA's -1.
          Hadoop QA added a comment -

          -1, because the patch command could not apply the latest attachment (http://issues.apache.org/jira/secure/attachment/12351812/248-fixed1.patch) as a patch to trunk revision r510644. Please note that this message is automatically generated and may represent a problem with the automation system and not the patch. Results are at http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch
          Doug Cutting added a comment -

          I just committed this. Thanks, Devaraj!
          Devaraj Das added a comment -

          This had a problem introduced unintentionally in the last submission (by Owen, when he corrected the spelling of OBSOLETE, etc.). The problem was a variable called fromEventId, which tracks the eventId from which a tasktracker should fetch events from the jobtracker. This was earlier an IntWritable object, so that set(<somenumber>) could be called on the object and the new value of the 'int' within it would still be visible after the method invocation returned. This variable was changed to an int, and "fromEventId += <somenumber>" was done instead. Unfortunately, that update is not visible once the method invocation returns, so the TaskTracker would get stuck at a particular eventId and make no forward progress.
          Attached is a new patch which puts the IntWritable back, and also modifies JobClient.listEvents to take two extra args - fromEventId and numEvents (this method didn't exist when I was earlier working on this issue). The JobSubmissionProtocol version has also been changed to reflect the change in the getTaskCompletionEvents protocol method (missed in the earlier patch).
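          The bug described above is plain Java pass-by-value semantics. A minimal standalone illustration (IntHolder below is a hypothetical stand-in for Hadoop's IntWritable; the method names are invented for the example):

```java
public class EventCursorDemo {
  // Hypothetical stand-in for org.apache.hadoop.io.IntWritable.
  static final class IntHolder {
    private int value;
    IntHolder(int value) { this.value = value; }
    int get() { return value; }
    void set(int value) { this.value = value; }
  }

  // Broken variant: ints are passed by value, so the increment
  // updates only the callee's local copy and is lost on return.
  static void fetchEventsBroken(int fromEventId, int numFetched) {
    fromEventId += numFetched;
  }

  // Working variant: the caller and callee share one holder object,
  // so the advanced cursor is visible after the call returns.
  static void fetchEventsFixed(IntHolder fromEventId, int numFetched) {
    fromEventId.set(fromEventId.get() + numFetched);
  }

  public static void main(String[] args) {
    int cursor = 0;
    fetchEventsBroken(cursor, 10);
    System.out.println("broken cursor = " + cursor);      // still 0

    IntHolder holder = new IntHolder(0);
    fetchEventsFixed(holder, 10);
    System.out.println("fixed cursor = " + holder.get()); // 10
  }
}
```

          With the int version, the TaskTracker's cursor never advances, which is exactly the stuck-at-one-eventId hang reported here.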
          Doug Cutting added a comment -

          I just reverted this, since it was causing things to hang.
          Doug Cutting added a comment -

          I just committed this. Thanks, Devaraj!
          Hadoop QA added a comment -

          +1, because http://issues.apache.org/jira/secure/attachment/12349653/248-9.patch applied and successfully tested against trunk revision r500023.
          Owen O'Malley added a comment -

          This is a minor modification of Devaraj's patch that fixes a spelling mistake (OBSELETE) and uses TaskInProgress.partition instead of parsing the task id.
          Hadoop QA added a comment -

          +1, because http://issues.apache.org/jira/secure/attachment/12349617/248-initial8.patch applied and successfully tested against trunk revision r499156.
          Devaraj Das added a comment -

          This patch does better failure handling. It saves the location of a failed map output fetch in a hashmap for a later retry, and it prefers a newer location for the same map over the saved one.
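          The retry bookkeeping described above might look like the following sketch. All names here are illustrative, not the actual Hadoop classes; the comment only states that failed fetch locations go into a hashmap and that a newer location wins over a saved one.

```java
import java.util.HashMap;
import java.util.Map;

public class FetchRetryTable {
  public static final class Location {
    public final String host;
    public final int attempt; // higher attempt = newer location (assumed ordering)
    public Location(String host, int attempt) {
      this.host = host;
      this.attempt = attempt;
    }
  }

  // Failed fetches parked for retry, keyed by map-task id.
  private final Map<Integer, Location> failed = new HashMap<>();

  // Record a failed fetch, keeping only the newest known location.
  public void recordFailure(int mapId, Location loc) {
    Location old = failed.get(mapId);
    if (old == null || loc.attempt >= old.attempt) {
      failed.put(mapId, loc);
    }
  }

  // Take the saved location for a retry, removing it from the table.
  public Location takeForRetry(int mapId) {
    return failed.remove(mapId);
  }

  public int pending() { return failed.size(); }
}
```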
          Devaraj Das added a comment -

          The patch does the following:
          1) Modifies the protocol version for InterTrackerProtocol, since there is a major change in the way map output is fetched. The method locateMapOutputs has been removed.
          2) The getTaskCompletionEvents method in both InterTrackerProtocol and JobSubmissionProtocol has been changed to take an extra argument - the max number of events we want to fetch from the JobTracker.
          3) Two more fields are added to TaskCompletionEvent - the real-ID portion of the taskId string (e.g., if the taskId is task_0001_m_000003_0, the real-ID is 3 within the job), and a boolean field indicating whether the task is a map. These could have been derived on the Reduce side by parsing the taskId string, but I think this is a more general way of doing it, and it is also in line with the thought of having "ID objects" in the future.
          4) Only 10 events are fetched at a time by the JobClient.
          5) A new value OBSELETE has been added to TaskCompletionEvent.Status to signify lost tasks (which were earlier reported as SUCCEEDED). Whenever a FAILED/KILLED TaskStatus is received, it is checked whether a SUCCEEDED event was earlier reported for the same taskId, and if so that event is marked OBSELETE.
          6) The number of events probed by the ReduceTaskRunner at any time is equal to max(5*numCopiers, 50).

          Feedback appreciated.
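          Item 5 above can be sketched as follows. This is an illustrative model, not the actual JobTracker code: the event-log class, Event type, and String taskIds are assumptions; the status names (including the OBSELETE spelling used in the patch) come from the comment.

```java
import java.util.ArrayList;
import java.util.List;

public class CompletionEventLog {
  public enum Status { SUCCEEDED, FAILED, KILLED, OBSELETE } // spelling as in the patch

  public static final class Event {
    public final String taskId;
    public Status status;
    public Event(String taskId, Status status) {
      this.taskId = taskId;
      this.status = status;
    }
  }

  private final List<Event> events = new ArrayList<>();

  public void add(String taskId, Status status) {
    if (status == Status.FAILED || status == Status.KILLED) {
      // A task that had already reported SUCCEEDED is now lost:
      // mark its earlier success event obsolete so reducers that
      // see the log don't try to fetch its (gone) output.
      for (Event e : events) {
        if (e.taskId.equals(taskId) && e.status == Status.SUCCEEDED) {
          e.status = Status.OBSELETE;
        }
      }
    }
    events.add(new Event(taskId, status));
  }

  public List<Event> all() { return events; }
}
```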
          Owen O'Malley added a comment -

          Sounds good, Devaraj.

          The events are per taskid, not per tipid, so different attempts to run "map 0" would result in different events.

          That said, we probably should make another event, "lost" or something, for tasks that are lost because their output had problems or the task tracker was lost.

          We may also want to flag the "complete" events of lost tasks as obsolete so that reduces don't see them and try to fetch their outputs.
          Sameer Paranjpye added a comment -

          Sounds good. How do we deal with map failures? Do we get multiple completion events when there are re-tries?
          Devaraj Das added a comment -

          Propose the following:

          1) Modify the "TaskCompletionEvent[] getTaskCompletionEvents(String jobid, int fromEventId)" defined in InterTrackerProtocol and JobSubmissionProtocol to take a new argument that signifies how many events we want to fetch. We may get back a smaller number, depending on how many events got registered for the job.
          So it becomes: TaskCompletionEvent[] getTaskCompletionEvents(String jobid, int fromEventId, int maxEvents)
          This will generally be more scalable. In the case of map-output fetches, it lets us do the same thing as we do today, except that the randomness is gone and the TaskTracker knows exactly which maps finished.

          2) Since the event IDs are numbered in a monotonically increasing sequence for a job, we don't need to maintain timestamps (as the original description of this issue suggests).

          3) Add a "boolean isMapTask()" method to the TaskCompletionEvent class that returns true if the event is from a map task, false otherwise.

          Comments?
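          A sketch of the bounded, cursor-based fetch proposed in item 1. Only the (fromEventId, maxEvents) contract comes from the proposal; the log class, String events, and the simplified single-job signature are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskCompletionEventLog {
  // Completion events in the order the JobTracker registered them.
  private final List<String> events = new ArrayList<>();

  public synchronized void record(String event) {
    events.add(event);
  }

  // Return at most maxEvents events starting at fromEventId; the
  // caller advances its cursor by the number of events returned,
  // and an empty result means nothing new has been registered yet.
  public synchronized String[] getTaskCompletionEvents(int fromEventId, int maxEvents) {
    int from = Math.min(fromEventId, events.size());
    int to = Math.min(from + maxEvents, events.size());
    return events.subList(from, to).toArray(new String[0]);
  }
}
```

          Because event IDs are consecutive log indices, the cap on maxEvents bounds each RPC's size without any risk of skipping events: whatever isn't returned this round is returned from the same cursor next round.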

            People

            • Assignee:
              Devaraj Das
              Reporter:
              Owen O'Malley
            • Votes: 0
              Watchers: 0

              Dates

              • Created:
                Updated:
                Resolved:

                Development