Progress is reported in Merger.writeFile, once every 10,000 records. Maybe we should report every 5,000 records instead?
if ((++recordCtr % PROGRESS_BAR) == 0) {
  progressable.progress();
}
It is also possible that each spill file holds fewer records because each record is large. Since spill files are bounded by size (io.sort.mb) rather than by record count, perhaps we never reach the 10,000 mark at all?
Christian, could you tell us the approximate average record size of the intermediate map output, and the value of io.sort.mb in this case?
io.sort.mb=500
Average record size is 276 B. There are some bad outliers of up to 3 MB, but they are too rare to explain the failure to report progress. I checked the full syslog of one of the tasks. The last merge started exactly 20 minutes (the configured timeout) before the failure, i.e. no progress was reported at all. I am not familiar with progress reporting, but does progress() in writeFile() just set a flag, possibly with no consequences?
2008-11-24 08:39:13,142 INFO org.apache.hadoop.mapred.MapTask: Finished spill 12
Yes, progress() does set a flag, but a separate thread checks this flag and propagates the progress. Could you also tell us the value of io.sort.factor and the total number of reduces for this application? From the above logs, the total number of spills is 13, and since all 13 files are merged in a single pass, your io.sort.factor must be at least 13; there is no multi-level merge. In the above log, reducer 0's total intermediate output size is 111126 bytes, which at roughly 276 bytes per record is only about 400 records (111126/276 ≈ 403). Since this number is < 10,000, no progress will be reported for this reducer. A simple fix to try would be to send progress right at the beginning, by changing
if ((++recordCtr % PROGRESS_BAR) == 0) {
  progressable.progress();
}
to
if ((recordCtr++ % PROGRESS_BAR) == 0) {
  progressable.progress();
}
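To illustrate why moving the increment matters, here is a minimal, self-contained sketch that counts how often each form of the check fires within one partition when the counter starts at zero for every partition. The class and the countReports helper are hypothetical; PROGRESS_BAR mirrors the 10,000-record constant quoted above, and the record count uses the 276-byte average reported in this thread.

```java
public class ProgressCheckDemo {
    // Mirrors the 10,000-record threshold quoted from Merger.writeFile.
    static final int PROGRESS_BAR = 10000;

    // Counts would-be progress() calls for one partition whose counter starts at 0.
    static int countReports(int records, boolean postIncrement) {
        int recordCtr = 0;
        int reports = 0;
        for (int i = 0; i < records; i++) {
            boolean fire = postIncrement
                ? (recordCtr++ % PROGRESS_BAR) == 0   // fires on records 1, 10001, 20001, ...
                : (++recordCtr % PROGRESS_BAR) == 0;  // fires on records 10000, 20000, ...
            if (fire) {
                reports++; // where progressable.progress() would be called
            }
        }
        return reports;
    }

    public static void main(String[] args) {
        int records = 111126 / 276; // integer division: 402 records of ~276 bytes
        System.out.println("pre-increment:  " + countReports(records, false));
        System.out.println("post-increment: " + countReports(records, true));
    }
}
```

Running main prints 0 reports for the pre-increment form and 1 for the post-increment form; the same holds for any partition with fewer than 10,000 records.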
io.sort.factor is 100, and the number of reducers is 18,000.
From your comment I conclude that recordCtr is reset for every reducer, which makes progress reporting depend on the number of reducers. That would not be good. In my opinion, progress reporting should be configured to happen at least once per fixed time period (e.g. 1 minute). Counting processed records could be used for convenience (although decompression/compression slows processing down a lot), but the counter would have to be independent of the number of reducers.
Yes, I agree that the reporting should not be reset for every reducer.
One way to do this would be to pass an AtomicInteger as an additional argument to Merger.writeFile. It would act as a running counter and would be passed to the merge method as well. Thoughts?
Attaching a patch that uses a single recordCounter per task. This counter is used while merging and is not reset per reducer.
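A minimal sketch of the task-wide counter idea, assuming it works as described above: the caller owns one counter and passes it to every partition write, so it is never reset between reducers. This is not the attached patch; writePartition is a hypothetical stand-in for Merger.writeFile, and the numbers (18,000 partitions of about 402 records each) come from this thread. The proposal mentioned an AtomicInteger; an AtomicLong is used here to avoid overflow on very large outputs. Since partitions are written sequentially, a plain long would also work.

```java
import java.util.concurrent.atomic.AtomicLong;

public class SharedCounterDemo {
    static final long PROGRESS_BAR = 10000;

    // Hypothetical stand-in for writeFile: the counter is shared across all
    // partitions of the task, so small partitions still add up to progress.
    static int writePartition(int records, AtomicLong recordCtr) {
        int reports = 0;
        for (int i = 0; i < records; i++) {
            if (recordCtr.incrementAndGet() % PROGRESS_BAR == 0) {
                reports++; // where progressable.progress() would be called
            }
        }
        return reports;
    }

    public static void main(String[] args) {
        AtomicLong taskCounter = new AtomicLong(); // one counter per task
        int total = 0;
        for (int p = 0; p < 18000; p++) {
            total += writePartition(402, taskCounter);
        }
        System.out.println("progress calls: " + total);
    }
}
```

With 18,000 × 402 = 7,236,000 records in total, main prints "progress calls: 723", i.e. one report roughly every 25 partitions, whereas a counter reset per partition would report nothing.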
Passing yet another counter to the merge seems unnecessary; sharing an AtomicLong between counters to maintain the arbitrary 10k threshold is pressing a heuristic into service as an API. Why not simply report progress at least once for each partition during the merge? Such a change would also be much easier to port to older branches...
Although this would solve the issue for our particular case, I can imagine a situation (e.g. a single reducer with highly aggregated, huge records) where it would not help, i.e. the time component needs to be factored into progress reporting. Progress should always be reported at intervals shorter than the timeout, which is configurable and could be small.
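The time-based policy suggested above could be sketched as a small throttle that calls progress only once a minimum interval has elapsed since the last report. TimedProgress and its injectable clock are hypothetical and not part of Hadoop; in a real task the clock would be System::nanoTime and the callback would wrap progressable.progress().

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class TimedProgress {
    private final Runnable progress;        // callback, e.g. a wrapper around progressable.progress()
    private final LongSupplier clockNanos;  // injectable clock; System::nanoTime in production
    private final long intervalNanos;       // minimum time between two reports
    private long lastReportNanos;

    TimedProgress(Runnable progress, LongSupplier clockNanos, long interval, TimeUnit unit) {
        this.progress = progress;
        this.clockNanos = clockNanos;
        this.intervalNanos = unit.toNanos(interval);
        this.lastReportNanos = clockNanos.getAsLong();
    }

    // Call once per processed record; reports only when the interval has elapsed.
    void recordProcessed() {
        long now = clockNanos.getAsLong();
        if (now - lastReportNanos >= intervalNanos) {
            progress.run();
            lastReportNanos = now;
        }
    }
}
```

Reading the clock on every record is not free, so a practical variant might only consult the clock every few hundred records, combining the record-count and time heuristics. The interval should be chosen well below the task timeout.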
Fair point. Still, it's far from the only profile that makes unobserved progress, and the current approach isn't guaranteed to work in this hypothetical case either. Balancing performance against accuracy in the current model admits the possibility of spurious timeouts by definition. The heuristics we use (every N records out of the merge, every partition, etc.) are, as you point out, not guaranteed to fit each job's profile, but they're usually good enough without being too expensive. Continuing to refine them is, admittedly, a stop-gap. The TaskTracker could try to look for external signs of progress in tasks it suspects are stuck (e.g. spills/merges generated between heartbeats), but that mistakes MapTask side-effects for task health. Adding a thread to poll for progress within the task offers modest benefits, but the cost in complexity (and likely performance) is discouraging. It also separates the checks for progress from the code effecting it, making it harder to maintain. While it's possible to imagine an adaptive system that samples the frequency of status updates generated by each component and tunes each threshold to the particular job, that's a long, long way from what is currently in place. For the scope of this JIRA, I think it's sufficient to flag progress after each partition. Doing more reporting out of the merge would be OK, but I see no reason to enshrine the N-records-per-progress-update heuristic.
Had an offline discussion with Chris. It looks like we could do the following to address the issue without complicating the code much:
so that progress is sent at the beginning for each reducer
Agreed that this will not address all the use cases, but it should be sufficient for most. Thoughts?
Attaching a patch that introduces a new parameter to control the number of records to process before sending progress. The patch also modifies writeFile to send progress at the beginning for each reducer.
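The approach described above (progress at the start of every partition, plus a configurable record threshold) might look roughly like the sketch below. The class name, the recordsBeforeProgress field, and the Consumer-based writer are hypothetical; the actual configuration key and writeFile signature live in the committed patch, which is not reproduced here.

```java
import java.util.Iterator;
import java.util.function.Consumer;

public class PartitionWriterSketch {
    private final Runnable progress;          // stands in for progressable.progress()
    private final long recordsBeforeProgress; // hypothetical name for the configurable threshold

    PartitionWriterSketch(Runnable progress, long recordsBeforeProgress) {
        if (recordsBeforeProgress <= 0) {
            throw new IllegalArgumentException("recordsBeforeProgress must be positive");
        }
        this.progress = progress;
        this.recordsBeforeProgress = recordsBeforeProgress;
    }

    // Writes one partition's merged records to the given writer.
    <T> void writePartition(Iterator<T> records, Consumer<? super T> writer) {
        progress.run(); // always report once at the start of every partition
        long recordCtr = 0;
        while (records.hasNext()) {
            writer.accept(records.next());
            if (++recordCtr % recordsBeforeProgress == 0) {
                progress.run(); // then again every recordsBeforeProgress records
            }
        }
    }
}
```

With this shape, even an empty or tiny partition produces one report, so a task with many small partitions (such as 18,000 reducers of ~400 records each) keeps reporting throughout the final merge.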
Attaching patches for the 18 and 19 branches
[exec]
[exec] -1 overall.
[exec]
[exec] +1 @author. The patch does not contain any @author tags.
[exec]
[exec] -1 tests included. The patch doesn't appear to include any new or modified tests.
[exec] Please justify why no tests are needed for this patch.
[exec]
[exec] +1 javadoc. The javadoc tool did not generate any warning messages.
[exec]
[exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
[exec]
[exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
[exec]
[exec]
[exec]
ant test-patch and ant test passed for all three (the 18 and 19 branches and trunk).
I just committed this. Thanks, Jothi
Integrated in Hadoop-trunk #677 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/677/)
Report status between merges and make the number of records between progress reports configurable. Contributed by Jothi Padmanabhan.
E.g.
attempt_200811221852_0001_m_093617_0 FAILED 0.00% 24-Nov-2008 06:26:23 (1hrs, 6mins, 20sec)
Task attempt_200811221852_0001_m_093617_0 failed to report status for 1218 seconds. Killing!
syslog:
...
2008-11-24 06:26:21,607 INFO org.apache.hadoop.mapred.MapTask: Index: (1358423777, 193912, 94564)
2008-11-24 06:26:21,636 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 06:26:21,654 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 101764 bytes
2008-11-24 06:26:21,680 INFO org.apache.hadoop.mapred.MapTask: Index: (1358518341, 173266, 89638)
2008-11-24 06:26:21,709 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 06:26:21,749 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 121477 bytes
2008-11-24 06:26:21,780 INFO org.apache.hadoop.mapred.MapTask: Index: (1358607979, 219875, 102296)
2008-11-24 06:26:21,809 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 06:26:21,827 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 110955 bytes
2008-11-24 06:26:21,855 INFO org.apache.hadoop.mapred.MapTask: Index: (1358710275, 191382, 97152)
2008-11-24 06:26:22,023 INFO org.apache.hadoop.mapred.Merger: Merging 13 sorted segments
2008-11-24 06:26:22,178 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 13 segments left of total size: 105632 bytes