Based on the above comments, I am summarizing what IMO could be an overall approach:
There could be the following error scenarios:
A) The framework catches the exception thrown by the map/reduce function (only applicable to Java tasks). The framework can decide to keep moving after skipping the record, or, if the exception seems fatal (like OutOfMemoryError, making the task process unstable), it can decide to forgo that task execution. Re-execution should skip that record.
B) The task process crashes due to a bad record.
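To make scenario A concrete, here is a minimal sketch of how the framework's task runner could wrap calls to the user's map function: recoverable exceptions skip the record and continue, while a fatal error like OutOfMemoryError makes the runner forgo the rest of the execution. The class and method names (`SkippingRunner`, `run`) are hypothetical, not actual Hadoop APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class SkippingRunner {
    // Stand-in for the user's map function (hypothetical interface).
    interface Mapper { void map(String record) throws Exception; }

    final List<String> skippedRecords = new ArrayList<>();

    /** Returns true if the task ran to completion, false if it gave up on a fatal error. */
    boolean run(Mapper mapper, List<String> records) {
        for (String record : records) {
            try {
                mapper.map(record);
            } catch (OutOfMemoryError fatal) {
                // Process may be unstable: forgo this execution.
                // Re-execution should skip this record.
                skippedRecords.add(record);
                return false;
            } catch (Exception recoverable) {
                // Skip the bad record and keep moving.
                skippedRecords.add(record);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SkippingRunner runner = new SkippingRunner();
        boolean ok = runner.run(r -> {
            if (r.contains("bad")) throw new RuntimeException("bad record");
        }, List.of("a", "bad1", "b"));
        System.out.println(ok + " skipped=" + runner.skippedRecords);
    }
}
```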
To support the above error scenarios, here is what could be done:
1. Each task would have 2 lists: SKIPPED_RECORDS and SKIPPED_RANGES. Perhaps these could be maintained in DFS.
2. Have the task periodically send to the TaskTracker the record range which it is going to process next: Range -> R(i, j). If the task crashes, then the last received range is written to SKIPPED_RANGES.
3. Have the task periodically send the skipped records, i.e. the records for which it has caught an exception and skipped since the last send. The TaskTracker will write these to the SKIPPED_RECORDS list.
4. Whenever a task starts, it will look for SKIPPED_RECORDS and SKIPPED_RANGES from any previous run and will skip those while executing. At the end, the task will try to run the SKIPPED_RANGES on a best-effort basis.
5. Have some job-level thresholds like TOLERABLE_SKIPPED_PERCENT. When all the remaining tasks are trying to execute SKIPPED_RANGES, check this threshold; if the cumulative SKIPPED_RANGES across all remaining tasks fall below it, finish the job gracefully.
6. Executing SKIPPED_RANGES: execute by dividing a range into halves. If a particular range succeeds, then move on to another range. In the first pass, all the ranges are trimmed to half; in the second pass, all ranges are trimmed to 1/4, and so on. This continues until TOLERABLE_SKIPPED_PERCENT is met.
7. Pipes: to identify the record range, the protocol has to be modified to report the processed records.
8. Streaming: the streaming process can write the processed record number to stderr as a framework counter. See
For streaming processes which do not support this feature, we can fall back to a mechanism in which the record range sent always starts from the beginning (as we are not sure which records have been processed yet): Range -> R(0, j). This range is then tried at the end on a best-effort basis, as described in 6.
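The range-narrowing in step 6 could be sketched roughly as follows: each pass retries both halves of every skipped range, keeps only the halves that still fail, and stops once the cumulative skipped fraction meets the threshold. This is a simplified illustration under assumed names (`RangeNarrower`, `narrow`); it ignores details like partially processed halves and task re-launch costs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

public class RangeNarrower {
    /** A half-open record range [start, end). */
    record Range(long start, long end) {
        long size() { return end - start; }
    }

    /** One pass: retry each half of every range; keep only the halves that still fail. */
    static List<Range> narrowOnce(List<Range> ranges, LongPredicate processesOk) {
        List<Range> stillSkipped = new ArrayList<>();
        for (Range r : ranges) {
            if (r.size() <= 1) { stillSkipped.add(r); continue; }
            long mid = r.start + r.size() / 2;
            for (Range half : List.of(new Range(r.start, mid), new Range(mid, r.end))) {
                boolean failed = false;
                for (long i = half.start; i < half.end && !failed; i++) {
                    failed = !processesOk.test(i);
                }
                if (failed) stillSkipped.add(half);
            }
        }
        return stillSkipped;
    }

    static long totalSkipped(List<Range> ranges) {
        return ranges.stream().mapToLong(Range::size).sum();
    }

    /** Narrow pass by pass until the skipped fraction meets TOLERABLE_SKIPPED_PERCENT. */
    static List<Range> narrow(List<Range> ranges, LongPredicate processesOk,
                              long totalRecords, double tolerablePercent) {
        while (totalSkipped(ranges) * 100.0 / totalRecords > tolerablePercent) {
            List<Range> next = narrowOnce(ranges, processesOk);
            if (totalSkipped(next) == totalSkipped(ranges)) break; // no progress
            ranges = next;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 100 records total; the skipped range is [40, 48) but only record 42 is bad.
        List<Range> result = narrow(List.of(new Range(40, 48)), i -> i != 42, 100, 2.0);
        System.out.println(result + " skipped=" + totalSkipped(result));
    }
}
```

In the example, pass 1 trims [40, 48) to [40, 44) and pass 2 trims it to [42, 44), at which point the 2% threshold is satisfied.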
Some optimizations could be done to this approach: instead of starting from the beginning, have it start based on some job-configured number N, e.g. Range -> R(i-N, j). N is the expected number of records in the streaming process's stomach before they are processed. Users can define N for their jobs based on the buffers used in their process. The framework then tries to tune the value of N based on the crashes it encounters in further executions. The algorithm for this can become a little complex, and there may not be that much payoff. So I think initially let's have it always skip from the start, and optimize this behavior later.
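The fallback and its N-based optimization could be sketched like this, where i and j are the endpoints of the range reported per step 2 and N is the hypothetical job-configured buffer estimate (a negative N stands for "feature unsupported"):

```java
public class StreamingRangeReporter {
    /**
     * Returns the {start, end} of the range to record in SKIPPED_RANGES
     * after a streaming-process crash.
     */
    static long[] rangeToReport(long i, long j, long n) {
        if (n < 0) {
            // Feature unsupported: be conservative and start from
            // the beginning, i.e. R(0, j).
            return new long[] {0, j};
        }
        // Account for up to N buffered-but-unprocessed records: R(i-N, j).
        return new long[] {Math.max(0, i - n), j};
    }

    public static void main(String[] args) {
        long[] conservative = rangeToReport(500, 520, -1);
        long[] tuned = rangeToReport(500, 520, 30);
        System.out.println(conservative[0] + ".." + conservative[1]); // 0..520
        System.out.println(tuned[0] + ".." + tuned[1]);               // 470..520
    }
}
```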