Issue Details

Key: HADOOP-4714
Type: Bug
Status: Closed
Resolution: Fixed
Priority: Major
Assignee: Jothi Padmanabhan
Reporter: Christian Kunz
Votes: 0
Watchers: 2
Hadoop Common

map tasks timing out during merge phase

Created: 24/Nov/08 06:38 AM   Updated: 08/Jul/09 04:53 PM
Component/s: None
Affects Version/s: 0.18.1
Fix Version/s: 0.18.3

Time Tracking:
Not Specified

File Attachments (all licensed for inclusion in ASF works):
  File                   Date                 Author             Size
  hadoop-4714-18.patch   2008-12-02 02:29 PM  Jothi Padmanabhan  4 kB
  hadoop-4714-19.patch   2008-12-02 02:29 PM  Jothi Padmanabhan  4 kB
  hadoop-4714-v1.patch   2008-12-01 07:16 AM  Jothi Padmanabhan  4 kB
  hadoop-4714.patch      2008-11-28 02:07 PM  Jothi Padmanabhan  13 kB

Hadoop Flags: Reviewed
Resolution Date: 02/Dec/08 07:12 PM


Description
With compression of transient data turned on, some parts of the merge phase do not seem to report progress often enough.

We see a lot of task failures during the merge phase, most of them timing out (even with a 20-minute timeout).



Christian Kunz added a comment - 24/Nov/08 06:42 AM
The reported time of failure for these tasks falls at a point when messages are still being logged to the syslog file, indicating that the cause might be a lack of progress reporting.

For example:
attempt_200811221852_0001_m_093617_0 FAILED 0.00% 24-Nov-2008 06:26:23 (1hrs, 6mins, 20sec)
Task attempt_200811221852_0001_m_093617_0 failed to report status for 1218 seconds. Killing!

syslog:
...
2008-11-24 06:26:21,607 INFO org.apache.hadoop.mapred.MapTask: Index: (1358423777, 193912, 94564)
2008-11-24 06:26:21,636 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 06:26:21,654 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 101764 bytes
2008-11-24 06:26:21,680 INFO org.apache.hadoop.mapred.MapTask: Index: (1358518341, 173266, 89638)
2008-11-24 06:26:21,709 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 06:26:21,749 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 121477 bytes
2008-11-24 06:26:21,780 INFO org.apache.hadoop.mapred.MapTask: Index: (1358607979, 219875, 102296)
2008-11-24 06:26:21,809 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 06:26:21,827 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 110955 bytes
2008-11-24 06:26:21,855 INFO org.apache.hadoop.mapred.MapTask: Index: (1358710275, 191382, 97152)
2008-11-24 06:26:22,023 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 06:26:22,178 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 105632 bytes


Jothi Padmanabhan added a comment - 24/Nov/08 06:57 AM
Progress is reported in Merger.writeFile, once every 10,000 records. Maybe we should report every 5,000 records instead?
if ((++recordCtr % PROGRESS_BAR) == 0) {
  progressable.progress();
}
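A simplified model of the quoted check shows why the threshold matters. This is a sketch, not the real Merger.writeFile: record writing is elided, a Runnable stands in for the Progressable callback, the class and method names are hypothetical, and it assumes the counter is local to each call, i.e. it starts at zero for every partition.

```java
class PreIncrementMergeSketch {
    static final int PROGRESS_BAR = 10000; // threshold quoted from Merger.writeFile

    /**
     * Models writing one partition of `records` records with the quoted check.
     * The counter is local (assumption), so it starts at zero for every partition.
     * Returns how many times progress was reported.
     */
    static int writePartition(int records, Runnable progress) {
        int recordCtr = 0;
        int calls = 0;
        for (int i = 0; i < records; i++) {
            // ... the real code appends the key/value pair to the writer here ...
            if ((++recordCtr % PROGRESS_BAR) == 0) {
                progress.run();
                calls++;
            }
        }
        return calls;
    }
}
```

In this model a partition of 9,999 records triggers no progress report at all, while 25,000 records trigger two. Lowering the threshold to 5,000 would only help partitions that hold at least 5,000 records.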

Jothi Padmanabhan added a comment - 24/Nov/08 07:28 AM
It is also possible that each spill file holds fewer records because each record is larger. Since spill files are limited by size (io.sort.mb) rather than by record count, perhaps we never reach the 10,000 mark at all?

Christian, could you tell us the approximate average record size of the intermediate map output and the value of io.sort.mb in this case?


Christian Kunz added a comment - 24/Nov/08 06:10 PM
io.sort.mb=500
The average record size is 276 B. There are some bad outliers of up to 3 MB, but they are too infrequent to explain the failure to report progress.

I checked the full syslog of one of the tasks. The last merge started exactly 20 minutes (the configured timeout) before the time of failure, i.e. there was no progress reported at all. I am not familiar with progress reporting, but does progress() in writeFile() just set a flag with maybe no consequences?
When checking the log of a successful task, I noticed that its final merge lasted longer than 20 minutes, so this task did report progress. However, the TaskTracker log shows no progress reported for the first 18 minutes of the merge phase (before the merge, progress was reported every few seconds). With the default timeout of 10 minutes, this task attempt would have failed as well.

2008-11-24 08:39:13,142 INFO org.apache.hadoop.mapred.MapTask: Finished spill 12
2008-11-24 08:39:16,383 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:16,681 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:16,832 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:17,020 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:17,302 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:17,995 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:18,109 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:18,360 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:18,487 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:18,844 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:19,016 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:19,081 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:19,119 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2008-11-24 08:39:19,350 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 08:39:20,240 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 111126 bytes
2008-11-24 08:39:20,338 INFO org.apache.hadoop.mapred.MapTask: Index: (0, 194236, 96533)
2008-11-24 08:39:20,989 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 08:39:21,343 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 115642 bytes
2008-11-24 08:39:21,381 INFO org.apache.hadoop.mapred.MapTask: Index: (96533, 199588, 100312)
2008-11-24 08:39:21,427 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 08:39:21,864 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 126500 bytes
...
2008-11-24 08:59:10,877 INFO org.apache.hadoop.mapred.MapTask: Index: (1318384976, 240135, 108120)
2008-11-24 08:59:10,899 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 08:59:11,057 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 109385 bytes
2008-11-24 08:59:11,798 WARN org.apache.hadoop.mapred.TaskRunner: Parent died. Exiting attempt_200811221852_0001_m_099999_0


Jothi Padmanabhan added a comment - 27/Nov/08 09:39 AM

does progress() in writeFile() just set a flag with maybe no consequences

Yes, progress() does set a flag, but a separate thread checks this flag and propagates the progress.

Could you also tell us the value of io.sort.factor and the total number of reduces for this application?

From the above logs, the total number of spills is 13. It is also apparent that your io.sort.factor is > 13, so there is no multi-level merge; there is only a single merge of all 13 files.

In the above log, reducer 0's total intermediate output size is 111126 bytes, which implies roughly 111126/256 ≈ 434 records. Since this is fewer than 10,000, no progress will be reported for this reducer.
If all the reducers have similar record counts, no progress will be reported at all.
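The estimate in this comment is simple division; the sketch below reproduces it. It assumes, as the comment does, that the logged segment size divided by an average record size approximates the record count. With the 276 B average Christian reported, the estimate drops to about 402 records, still far below the threshold. The class and method names are hypothetical.

```java
class PartitionRecordEstimate {
    static final long PROGRESS_BAR = 10000; // threshold quoted from Merger.writeFile

    /** Rough record count: partition bytes / average record size (integer division). */
    static long estimateRecords(long partitionBytes, long avgRecordBytes) {
        return partitionBytes / avgRecordBytes;
    }

    /** True if a partition with this many records would ever hit the pre-increment threshold. */
    static boolean reachesThreshold(long records) {
        return records >= PROGRESS_BAR;
    }

    public static void main(String[] args) {
        long at256 = estimateRecords(111126, 256);
        long at276 = estimateRecords(111126, 276);
        System.out.println(at256 + " " + at276 + " " + reachesThreshold(at256));
        // prints "434 402 false"
    }
}
```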

A simple fix to try would be to report progress right at the beginning, by changing

if ((++recordCtr % PROGRESS_BAR) == 0) {
  progressable.progress();
}

to

if ((recordCtr++ % PROGRESS_BAR) == 0) {
  progressable.progress();
}
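The effect of moving the increment can be checked with a small model. This hypothetical sketch (class and method names are illustrative) counts how often each condition fires for a partition of a given size, assuming a counter that starts at zero for each partition.

```java
class IncrementOrderSketch {
    static final int PROGRESS_BAR = 10000;

    /** Original check, ++recordCtr: fires on records 10000, 20000, ... */
    static int preIncrementCalls(int records) {
        int recordCtr = 0, calls = 0;
        for (int i = 0; i < records; i++) {
            if ((++recordCtr % PROGRESS_BAR) == 0) calls++;
        }
        return calls;
    }

    /** Proposed check, recordCtr++: fires on the 1st record, then the 10001st, ... */
    static int postIncrementCalls(int records) {
        int recordCtr = 0, calls = 0;
        for (int i = 0; i < records; i++) {
            if ((recordCtr++ % PROGRESS_BAR) == 0) calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        // For the 434-record partition estimated in this thread:
        System.out.println(preIncrementCalls(434) + " vs " + postIncrementCalls(434)); // prints "0 vs 1"
    }
}
```

For the 434-record partition, the original check never fires while the proposed one fires once, on the first record. An empty partition still produces no report, because the loop body never runs.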

Christian Kunz added a comment - 27/Nov/08 05:44 PM
io.sort.factor is 100, number of reducers 18,000.

From your comment I would conclude that recordCtr is reset for every reducer, making progress reporting dependent on the number of reducers. That would not be good.

In my opinion, progress should be reported at least once per fixed time period (e.g. 1 minute). The number of processed records could be used for convenience (although decompression/compression slows processing down a lot), but it would have to be a counter independent of the number of reducers.
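Reporting at least once per time period, combined with a record count that is not reset per reducer, could look roughly like this. It is a hypothetical sketch, not Hadoop code: TimeOrCountReporter is an illustrative name, a Runnable stands in for the progress callback, and the clock is injected as a LongSupplier so the behaviour can be tested deterministically.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical progress throttle: reports when either `recordsPerReport`
 * records have been processed or `maxIntervalMillis` have elapsed since the
 * last report. The count lives in the reporter, so it is not reset per reducer.
 */
class TimeOrCountReporter {
    private final Runnable progress;
    private final long recordsPerReport;
    private final long maxIntervalMillis;
    private final LongSupplier clockMillis; // e.g. System::currentTimeMillis
    private long recordsSinceReport = 0;
    private long lastReportMillis;

    TimeOrCountReporter(Runnable progress, long recordsPerReport,
                        long maxIntervalMillis, LongSupplier clockMillis) {
        this.progress = progress;
        this.recordsPerReport = recordsPerReport;
        this.maxIntervalMillis = maxIntervalMillis;
        this.clockMillis = clockMillis;
        this.lastReportMillis = clockMillis.getAsLong();
    }

    /** Call once per record written during the merge. */
    void onRecord() {
        recordsSinceReport++;
        long now = clockMillis.getAsLong();
        if (recordsSinceReport >= recordsPerReport
                || now - lastReportMillis >= maxIntervalMillis) {
            progress.run();
            recordsSinceReport = 0;
            lastReportMillis = now;
        }
    }
}
```

Constructed with, say, 10,000 records and 60,000 ms, it reports after 10,000 records or after a minute without a report, whichever comes first. Reading the clock on every record has a cost; a real implementation might sample it only every few hundred records.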


Jothi Padmanabhan added a comment - 28/Nov/08 03:37 AM
Yes, I agree that the reporting should not be reset for every reducer.

Jothi Padmanabhan added a comment - 28/Nov/08 10:13 AM
One way to do this would be to pass an AtomicInteger as an additional argument to Merger.writeFile. This would be a running counter and would also be passed to the merge function. Thoughts?
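A running counter shared across partitions might look like the sketch below. It is illustrative only: the method names are hypothetical, it uses an AtomicLong rather than an AtomicInteger to leave headroom for very large tasks, and in this single-threaded sketch the atomic type simply serves as a mutable counter that can be passed into each call.

```java
import java.util.concurrent.atomic.AtomicLong;

class SharedCounterMergeSketch {
    static final long PROGRESS_BAR = 10000; // existing threshold

    /**
     * Hypothetical writeFile variant: takes the task-wide counter as an extra
     * argument instead of keeping a local one, so the count carries over
     * from one partition to the next. Returns the number of progress calls.
     */
    static int writePartition(int records, AtomicLong taskRecordCtr, Runnable progress) {
        int calls = 0;
        for (int i = 0; i < records; i++) {
            // ... write the record here ...
            if (taskRecordCtr.incrementAndGet() % PROGRESS_BAR == 0) {
                progress.run();
                calls++;
            }
        }
        return calls;
    }

    /** Runs `partitions` partitions of `recordsEach` records through one shared counter. */
    static int writeAllPartitions(int partitions, int recordsEach) {
        AtomicLong taskRecordCtr = new AtomicLong(); // created once per task, never reset
        int total = 0;
        for (int p = 0; p < partitions; p++) {
            total += writePartition(recordsEach, taskRecordCtr, () -> { });
        }
        return total;
    }
}
```

With 18,000 partitions of about 434 records each (the figures from this thread), the shared counter reaches 7,812,000 records and fires 781 times, whereas a counter reset per partition would never fire.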

Jothi Padmanabhan added a comment - 28/Nov/08 02:07 PM
Attaching a patch that uses a single recordCounter per task. This counter is used while merging and is not reset per reducer

Chris Douglas added a comment - 30/Nov/08 02:22 AM

Attaching a patch that uses a single recordCounter per task. This counter is used while merging and is not reset per reducer

Passing yet another counter to the merge seems unnecessary; sharing an AtomicLong between counters to maintain the arbitrary, 10k threshold is pressing a heuristic into service as an API.

Why not simply report progress at least once for each partition during the merge?
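Reporting once per partition needs no shared counter at all. A minimal sketch, assuming a hypothetical outer loop that merges one partition at a time; mergePartition is a placeholder, not a Hadoop method, and a Runnable stands in for the progress callback.

```java
import java.util.List;

class PerPartitionProgressSketch {
    /**
     * Hypothetical outer merge loop: after each partition is merged, report
     * progress once, however few records the partition held.
     * Returns the number of progress calls.
     */
    static int mergeAllPartitions(List<Integer> recordsPerPartition, Runnable progress) {
        int calls = 0;
        for (int records : recordsPerPartition) {
            mergePartition(records);
            progress.run(); // at least one report per partition
            calls++;
        }
        return calls;
    }

    /** Placeholder for the real per-partition merge; does nothing here. */
    private static void mergePartition(int records) {
    }
}
```

Every partition, including an empty one, produces exactly one report, so the number of reports no longer depends on how many records each partition holds.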


Chris Douglas added a comment - 30/Nov/08 02:28 AM
Such a change would also be much easier to port to older branches...

Christian Kunz added a comment - 30/Nov/08 02:56 AM
Although this would solve the issue for our particular case, I can imagine a situation (e.g. single reducer with highly aggregated huge records) where this would not help, i.e. the time component needs to be factored into the progress reporting. Progress should always be reported at smaller intervals than the timeout which is configurable and could be a small number.

Chris Douglas added a comment - 30/Nov/08 04:59 AM

Although this would solve the issue for our particular case, I can imagine a situation (e.g. single reducer with highly aggregated huge records) where this would not help, i.e. the time component needs to be factored into the progress reporting. Progress should always be reported at smaller intervals than the timeout which is configurable and could be a small number.

Fair point. Still, it's far from the only profile that makes unobserved progress, and the current approach isn't guaranteed to work in this hypothetical case either. Balancing performance against accuracy in the current model admits the possibility of spurious timeouts by definition. The heuristics we use (every N records out of the merge, every partition, etc.) are, as you point out, not guaranteed to fit each job's profile, but they're usually good enough without being too expensive. Continuing to refine them is, admittedly, a stop-gap.

The TaskTracker could try to look for external signs of progress in tasks it suspects are stuck (e.g. spills/merges generated between heartbeats), but that mistakes MapTask side-effects for task health. Adding a thread to poll for progress within the task adds modest benefits, but the cost in complexity (and likely performance) is discouraging. It also separates the checks for progress from the code effecting it, making it harder to maintain. While it's possible to imagine an adaptive system that would sample the frequency of status updates generated from each component and tune each threshold to the particular job, that's a long, long way from what is currently in place.

For the scope of this JIRA, I think it's sufficient to flag progress after each partition. Doing more reporting out of the merge would be OK, but I see no reason to enshrine the N records/progress update heuristic.


Jothi Padmanabhan added a comment - 30/Nov/08 11:32 AM
I had an offline discussion with Chris. It looks like we could do the following to address the issue without complicating the code much:
  • Change Merger.writeFile from

    if ((++recordCtr % PROGRESS_BAR) == 0) {
      progressable.progress();
    }

    to

    if ((recordCtr++ % PROGRESS_BAR) == 0) {
      progressable.progress();
    }

    so that progress is sent at the beginning for each reducer.

  • Make the threshold (the number of records after which progress is sent, PROGRESS_BAR in the current code) configurable by adding it as a parameter in hadoop-default.xml, with the existing 10,000 as the default. That way, users can easily arrive at the optimal number for their specific application or typical data set, if they so wish.

Admittedly, this will not address all use cases, but it should be sufficient for most. Thoughts?
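The two changes proposed here can be combined in one sketch. It uses java.util.Properties as a stand-in for Hadoop's Configuration; the key name mapred.merge.recordsBeforeProgress is an assumption for illustration, not something stated in this thread, and the class and method names are hypothetical.

```java
import java.util.Properties;

class ConfigurableProgressSketch {
    // Assumed key name for illustration; treat as hypothetical.
    static final String RECORDS_BEFORE_PROGRESS_KEY = "mapred.merge.recordsBeforeProgress";
    static final long DEFAULT_RECORDS_BEFORE_PROGRESS = 10000; // the existing PROGRESS_BAR

    /** Reads the threshold, falling back to the existing default of 10,000. */
    static long progressBar(Properties conf) {
        String value = conf.getProperty(RECORDS_BEFORE_PROGRESS_KEY);
        long bar = (value == null) ? DEFAULT_RECORDS_BEFORE_PROGRESS : Long.parseLong(value.trim());
        if (bar <= 0) {
            throw new IllegalArgumentException(RECORDS_BEFORE_PROGRESS_KEY + " must be positive: " + bar);
        }
        return bar;
    }

    /**
     * Post-increment check: reports on the first record of every partition
     * and then every `progressBar` records. Returns the number of reports.
     */
    static int writePartition(long records, Properties conf, Runnable progress) {
        long progressBar = progressBar(conf);
        long recordCtr = 0;
        int calls = 0;
        for (long i = 0; i < records; i++) {
            // ... write the record here ...
            if ((recordCtr++ % progressBar) == 0) {
                progress.run();
                calls++;
            }
        }
        return calls;
    }
}
```

With the default of 10,000, a 434-record partition now reports once, on its first record; setting the threshold to 100 yields five reports for the same partition. The sketch rejects non-positive thresholds, since a threshold of zero would make the modulo check throw an ArithmeticException.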


Jothi Padmanabhan added a comment - 01/Dec/08 07:16 AM
Attaching a patch that introduces a new parameter to control the number of records to process before sending progress. The patch also modifies writeFile to send progress for each reducer at the beginning as well.

Chris Douglas added a comment - 02/Dec/08 08:24 AM
+1

Jothi Padmanabhan added a comment - 02/Dec/08 02:29 PM
Attaching patches for the 18 and 19 branches

Jothi Padmanabhan added a comment - 02/Dec/08 02:46 PM
[exec]
[exec] -1 overall.
[exec]
[exec] +1 @author. The patch does not contain any @author tags.
[exec]
[exec] -1 tests included. The patch doesn't appear to include any new or modified tests.
[exec] Please justify why no tests are needed for this patch.
[exec]
[exec] +1 javadoc. The javadoc tool did not generate any warning messages.
[exec]
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec]
[exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
[exec]
[exec]
[exec]

ant test-patch and ant test passed for all three (the 18 and 19 branches and trunk).


Chris Douglas added a comment - 02/Dec/08 07:12 PM
I just committed this. Thanks, Jothi

Hudson added a comment - 03/Dec/08 02:32 PM
Integrated in Hadoop-trunk #677 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/677/)
Report status between merges and make the number of records
between progress reports configurable. Contributed by Jothi Padmanabhan.