+1
This would be a generalization of the checksum handler that tries to skip records when 'io.skip.checksum.errors' is set. In a recent note on the list, I speculated that there is a class of IOExceptions that should be treatable in the same manner, i.e., skipped if a flag is set (search for the subject line "'Corrupt GZIP trailer' and other irrecoverable IOEs.").
Sounds good. The acceptable % should probably be configurable. I'd be inclined to use something more like 1%. You could turn the feature off by configuring 0%, which should arguably be the default.
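A minimal sketch of the configurable limit suggested above, assuming the framework counts both processed and skipped records for each task and expresses the limit as a whole-number percentage. The class and method names are hypothetical, not an existing Hadoop API; a limit of 0% disables skipping, matching the suggested default.

```java
/**
 * Hypothetical helper: decides whether a task may keep skipping bad records
 * under a configurable "acceptable percent" limit. A limit of 0 turns the
 * feature off, which the discussion suggests as the default.
 */
final class SkipTolerance {
    private final int maxSkipPercent; // e.g. 1 for 1%; 0 disables skipping

    SkipTolerance(int maxSkipPercent) {
        if (maxSkipPercent < 0 || maxSkipPercent > 100) {
            throw new IllegalArgumentException("percent must be in [0, 100]");
        }
        this.maxSkipPercent = maxSkipPercent;
    }

    /** Returns true if skipping `skipped` out of `total` records is still acceptable. */
    boolean withinLimit(long skipped, long total) {
        if (skipped == 0) {
            return true;               // nothing skipped: always acceptable
        }
        if (maxSkipPercent == 0 || total <= 0) {
            return false;              // feature off, or no basis for a percentage
        }
        // Integer comparison of skipped/total <= p/100 avoids floating-point rounding.
        return skipped * 100 <= (long) maxSkipPercent * total;
    }
}
```

Comparing `skipped * 100` with `percent * total` keeps the boundary exact: with a 1% limit, 10 skipped records out of 1000 pass and 11 do not.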
+1 Exceptions in the map and reduce functions that are implemented by the user should be handled by the user within the functions.
I am starting to look at this issue. I am inclined to handle this by only keeping track of how RecordReader.next behaves. This would take care of all cases (Java, Streaming, and Pipes) in a generic manner. I am not very much in favor of providing hooks in the applications. Is that a requirement? Are there any other requirements? Thoughts welcome.
> I am tending to handle this by only keeping track of how RecordReader.next behaves.
I'm not sure what you mean by that. There are two kinds of places where user code might throw per-record exceptions:
Doug, I was just trying to summarize the high level requirement. Thanks for the detailed inputs.
A major use case is that the user implementation of map or reduce (or third-party libraries) can throw StackOverflowError or OutOfMemoryError. In the latter case there is no guarantee that the program will be stable even if it catches the OOM. So it would be good if the framework kept track of the records, terminated the execution of the task, and skipped those records in the rescheduled run.
Here is a proposal that tries to address the scenarios discussed in this jira:
0) Define the concept of a failed record number that is set by Tasks and propagated to the JobTracker on task failures. This becomes part of the TIP object at the JobTracker.
1) Define an API in the RecordReader for getting the record boundary. On getting an exception in RecordReader.next, the task scans forward from the last successfully read record to the next boundary and reads the next record from that point (ignoring the boundary bytes). Applies to maps.
2) Define an API in RecordWriter for writing a record boundary along with every write(k,v). The record boundary can default to the sync bytes. Tasks fail when they get an exception while writing a bad record. With (0), the records can be skipped in subsequent retries. This applies to outputs of maps and reduces.
3) Define an API in RecordReader for declaring whether we want recovery while reading records or not (useful, e.g., if the RecordReader has side effects in the next() method that would affect the reading of the subsequent record if there was an exception for the current record).
In cases of applications throwing exceptions in the map/reduce methods, the exception is caught by the Task method that invoked the map/reduce method. The task attempt is killed after noting the record number of the failed invocation. With point (0) above, these records are not fed to the m/r methods in subsequent retries.
The recovery/skip above is done on a best-effort basis; that is, the worst case is that tasks fail! The above strategies should at least allow configuring the max % of records that will keep the recovery/skip cycle going before the job is declared as having failed. Also, "skip records on re-execution" should probably be configurable by the user on a per-job basis (since the exceptions could have been caused by reasons other than incorrect input). Thoughts? I should have thought about Pipes and Streaming apps.
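Step 0 can be illustrated with a small simulation. This is a sketch under stated assumptions: a `Consumer` stands in for the user's map method, a thrown `RuntimeException` stands in for a crashed attempt, and a local `Set` stands in for the failed-record list kept in the TIP. All names are hypothetical.

```java
import java.util.*;
import java.util.function.Consumer;

/** Ends the attempt; carries the number of the record that was being processed. */
class FailedRecordException extends RuntimeException {
    final long recordNumber;
    FailedRecordException(long recordNumber, Throwable cause) {
        super("map failed on record " + recordNumber, cause);
        this.recordNumber = recordNumber;
    }
}

/** Hypothetical runner for "step 0": skip records that failed in earlier attempts. */
final class SkippingTaskRunner {
    /** One attempt; failedRecords holds the numbers reported by previous attempts. */
    static List<String> runAttempt(List<String> input, Set<Long> failedRecords, Consumer<String> map) {
        List<String> processed = new ArrayList<>();
        for (long n = 0; n < input.size(); n++) {
            if (failedRecords.contains(n)) {
                continue;                              // known-bad record: never fed to map()
            }
            String record = input.get((int) n);
            try {
                map.accept(record);
            } catch (RuntimeException e) {
                // In a real task an OOM or StackOverflowError would kill the process;
                // here the attempt simply ends and reports the record number.
                throw new FailedRecordException(n, e);
            }
            processed.add(record);
        }
        return processed;
    }

    /** Re-executes the task until it succeeds or maxAttempts is exhausted. */
    static List<String> runWithRetries(List<String> input, Consumer<String> map, int maxAttempts) {
        Set<Long> failed = new HashSet<>();            // stand-in for the failed-record list in the TIP
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return runAttempt(input, failed, map);
            } catch (FailedRecordException e) {
                failed.add(e.recordNumber);
            }
        }
        throw new IllegalStateException("task failed after " + maxAttempts + " attempts");
    }
}
```

Each failed attempt adds one record number, so in this model a task containing k bad records needs k + 1 attempts.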
In order to reliably re-execute Pipes map tasks, we probably need to change the corresponding protocol to report the sequence number of the record for which it is emitting the (key, value) pairs. It's not clear how to handle Streaming tasks, since the Streaming protocol can't be tweaked in a generic manner.
This sounds like roughly the right direction.
Maybe we should extend the diagnostic info to contain a record number (or offset?) and use that to record the error that was thrown? For errors in RecordReader.next, you are right that it probably makes sense to skip ahead to the next sync marker/newline, since the metadata is as much at risk as the record data. So having a "gotoSync" in RecordReader would help. Mappers and Reducers should be able to tolerate killer records, just like the reader. We should think about what is required for the RecordWriter to recover from a failed write... Of course, I suspect there aren't any good answers until HDFS has truncate and append.
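A "gotoSync" step for newline-delimited input might look like the following sketch. The `key=intValue` record format, the class name, and the skipped-record counter are assumptions for illustration; for binary formats the sync point would be a sync marker rather than a newline.

```java
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Hypothetical line-oriented reader illustrating a "gotoSync" step: when a
 * record cannot be parsed, the reader discards bytes up to and including the
 * next newline (the sync point here) and resumes with the following record.
 */
final class ResyncingLineReader {
    private final byte[] data;
    private int pos = 0;       // offset of the next unread byte
    private int skipped = 0;   // number of records dropped via gotoSync()

    ResyncingLineReader(byte[] data) { this.data = data; }

    /** Parses one "key=intValue" line at pos; advances pos only on success. */
    private Map.Entry<String, Integer> parseRecord() {
        int end = pos;
        while (end < data.length && data[end] != '\n') end++;
        String line = new String(data, pos, end - pos, StandardCharsets.UTF_8);
        int eq = line.indexOf('=');
        if (eq <= 0) throw new IllegalArgumentException("malformed record: " + line);
        int value = Integer.parseInt(line.substring(eq + 1)); // NumberFormatException if not an int
        pos = Math.min(end + 1, data.length);                 // consume the record and its newline
        return new AbstractMap.SimpleEntry<>(line.substring(0, eq), value);
    }

    /** Moves pos just past the next newline, or to the end of the input. */
    private void gotoSync() {
        while (pos < data.length && data[pos] != '\n') pos++;
        if (pos < data.length) pos++;
    }

    /** Returns the next well-formed record, or null at end of input. */
    Map.Entry<String, Integer> next() {
        while (pos < data.length) {
            try {
                return parseRecord();
            } catch (IllegalArgumentException e) { // also covers NumberFormatException
                gotoSync();
                skipped++;
            }
        }
        return null;
    }

    int skippedCount() { return skipped; }
}
```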
As I mentioned earlier, for the reader, I have an API in RecordReader for querying whether the RecordReader is side-effect free. If the RecordReader is not side-effect free (e.g., if the next method implementation cannot proceed further once reading the current record throws an exception), then the task is declared as having failed (and the offending record is skipped in the task retry).
Honestly, I think the extra complexity (especially the RecordReader API change, which will break lots of legacy code) is not justified to solve this issue. To be more explicit, the framework does not need to "continue from where it was" once a task throws an exception. I think we can just do step 0 (as defined above) and re-execute the task from the beginning, this time skipping the problematic record. Resuming execution from where we left off versus completely re-executing the task should not make a big difference, since the number of tasks is assumed to be high enough.
I propose
So this functionality will be completely transparent to the application code, except for setting the configuration. If the user code is not side-effect free, then the user has to deal with that, because tasks can already be executed more than once for various reasons (for example, speculative execution). Any thoughts?
The order of the number of exceptions that the framework intends to handle affects the design a great deal. What is the scope of the problem we intend to handle here? I see there being two cases:
Perhaps we should attempt to resolve case 1) with task re-execution for now since it represents a useful incremental step towards a more sophisticated solution. It may prove to be sufficient. One could in principle argue that if the number of exceptions is O(num tasks) the problem is better handled at the application level. The tricky bit will be identifying the failed record number, no? The naive approach would be to have the child report after each record has been processed, so that the parent can then know, when it crashes, which record it was on. But that would probably be too expensive.
So instead, we might have the child report in its TaskStatus the range of record numbers it intends to process before it next reports. Then, on failure, the parent knows that skipping that range will skip the offending record. The child can adaptively choose a range that keeps the status update time within reason. It would start by reporting range 0-1, then use the time required to process that entry to determine how big a range to send the next time while still providing timely updates. If a particular record takes a long time, then it can update status before the range is exhausted, providing a new range. Would it be acceptable to skip a few more records than just the offending one?
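The adaptive range reporting described above can be sketched as follows, assuming the child measures the wall-clock time of each announced range. `AdaptiveRangePlanner`, `targetMillis`, and `maxSize` are hypothetical names; the actual status reporting to the parent and the early re-announcement for slow records are left out.

```java
/**
 * Hypothetical sketch of adaptive range reporting. Before working, the child
 * announces the record range [start, end) it intends to process; after the
 * range completes, it sizes the next range so that processing it should take
 * roughly targetMillis, based on the per-record time it just measured.
 */
final class AdaptiveRangePlanner {
    private final long targetMillis;  // desired time between status reports
    private final long maxSize;       // upper bound on records per announced range
    private long nextStart = 0;
    private long nextSize = 1;        // the first report covers only record 0 (range 0-1)

    AdaptiveRangePlanner(long targetMillis, long maxSize) {
        this.targetMillis = targetMillis;
        this.maxSize = maxSize;
    }

    /** The range to report to the parent before processing it: {start, endExclusive}. */
    long[] announce() {
        return new long[] {nextStart, nextStart + nextSize};
    }

    /** Called when the announced range has been processed, with its elapsed time. */
    void completed(long elapsedMillis) {
        long done = nextSize;
        nextStart += done;
        double millisPerRecord = Math.max(elapsedMillis, 1) / (double) done; // avoid a zero rate
        long size = (long) (targetMillis / millisPerRecord);
        nextSize = Math.max(1, Math.min(size, maxSize));  // at least one record, at most maxSize
    }
}
```

If the child crashes, the parent skips the last announced range; a smaller target interval means fewer good records are skipped along with the bad one, at the cost of more frequent status updates.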
In the original MapReduce paper, the record number is kept in a global variable, which is then sent to the master in a UDP packet. The master decides that the record is malformed if it sees more than one report for the same record number. I think we may be able to catch the exception in the Child process and make an IPC to the TT (which in turn reports this to the JT). There may be situations in which the IPC will fail; for those tasks we can adopt the above strategy of reporting the record number in the subsequent re-execution of the task to find out exactly at which record the computation fails.
Enis, I agree that for the Java tasks case we could get the offending record immediately in the Child process. The problem is that with things like Pipes apps (where the Java task spawns another child process), the record number at which the exception happened is tricky to get, since the exception was really encountered in the Pipes process (this doesn't include exceptions we might encounter while reading the input, since that happens in the Java parent task and we can catch those immediately).
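The master-side rule from the MapReduce paper (treat a record as bad only after more than one failure report for it) can be sketched as below. The task-ID strings and the class name are hypothetical, and the reporting transport (UDP in the paper, IPC via the TT/JT in the discussion above) is left out.

```java
import java.util.*;

/**
 * Hypothetical master-side bookkeeping: a worker reports the record number it
 * was processing when it failed, and a record is only treated as bad once the
 * same record has been reported more than once, so a one-off failure does not
 * cause data to be skipped.
 */
final class BadRecordTracker {
    // taskId -> (record number -> number of failure reports)
    private final Map<String, Map<Long, Integer>> reports = new HashMap<>();

    /** Registers one failure report; returns true if the record is now considered bad. */
    boolean reportFailure(String taskId, long recordNumber) {
        Map<Long, Integer> counts = reports.computeIfAbsent(taskId, k -> new HashMap<>());
        return counts.merge(recordNumber, 1, Integer::sum) > 1;
    }

    /** Record numbers that a re-execution of taskId should skip. */
    SortedSet<Long> recordsToSkip(String taskId) {
        SortedSet<Long> skip = new TreeSet<>();
        Map<Long, Integer> counts = reports.getOrDefault(taskId, Collections.emptyMap());
        for (Map.Entry<Long, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) skip.add(e.getKey());
        }
        return skip;
    }
}
```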
Hey folks - we are having a discussion on a similar jira (covering a smaller subset of issues): 3144. We are actually hitting this problem (corrupted records causing OOM) and have a simple workaround specific to our problem.
But I am a little intrigued by the proposal here. For the recordreader issues, why not simply let the record reader skip the bad record(s)? As the discussion here mentions, there have to be additional APIs in the record reader to be able to skip problematic records. If the framework trusts record readers to be able to skip bad records, why bother re-executing? Why not allow them to detect and skip bad records on the very first try? If the TT/JT want to keep track of and impose a limit on the bad records skipped, they could ask the record reader to report them through an API. The exceptions from map/reduce functions are different: if they make the entire task unstable due to OOM issues, then re-execution makes sense. But if we separate the two issues, we may have a more lightweight way of tolerating pure data corruption/validity issues (as we are trying to do in 3144).
Yes, if you assume that record readers have the necessary logic to be able to skip bad records on the very first try, it would work. However, in this issue, we were trying to address the problem more from the framework's perspective. That is, even though recordreaders might not have the logic to determine whether a particular record is corrupt, the framework can do it with some help from the reader.
> why not, simply, let the record reader skip the bad record(s) [ ... ] ?
If the record reader can identify bad records and skip them, then the framework need not get involved: the RecordReader itself can catch exceptions that it knows might be thrown by bad records and then try to read the next.
> That is, even though recordreaders might not have the logic to determine whether a particular record is corrupt, the framework can do it with some help from the reader
> If the record reader can identify bad records and skip them, then the framework need not get involved
At the beginning of this jira, it was mentioned that problems from recordreader.next() were also covered here. I take it from these comments that this is no longer the case. It seems to me that if a recordreader can skip a record reliably (which is the support this jira requires from record readers), then it will also be able to avoid throwing exceptions (since, quite obviously, it can catch any exception and invoke the logic to skip to the next record within the next() body). I just wanted to make sure, since this jira was mentioned as something that might preempt 3144 (putting basic corruption detection code in the LineRecordReader), but that doesn't seem to be the case.
I didn't mean that (not sure about Doug). Today, the way map tasks work is that the recordreader's next method is invoked for each record. So if the implementation of next can handle corrupt records, then the framework doesn't need to get involved. However, if the implementation of next is not capable of handling bad records, then it is most likely going to throw an exception, and the proposal in this jira for handling bad input starts with that assumption. The framework catches that exception and the recordreader's new interface (look at points 1, 2, and 3 in http://issues.apache.org/jira/browse/HADOOP-153?focusedCommentId=12574404#action_12574404) At this point in time, since we don't have a patch for this jira, I don't see this jira preempting
I am missing something... whatever the framework can do using the recordreader's new interfaces, the recordreader can do itself. If the recordreader can define record boundaries and skip to the next record, then it can catch any and all exceptions that it might throw, ignore the bad record, and move to the next one without involving the framework.
Sorry if I am being totally obtuse here - just not getting it. Conversely, if the recordreader can avoid throwing exceptions, then you don't really need new APIs from the recordreader. You can just avoid calling map()/reduce() on specific records that had a problem.
One small implementation request: one of the things I have noticed (based on some profiling) is that a lot of processing overhead on the map side in Hadoop comes from function calls that are invoked for every processed record. It would be nice if this jira does not lead to more of the same. Considering that this is not the normal case, it might make sense to have different code paths for the first and subsequent retries (so that in the normal case we don't incur overhead looking for records to be skipped).
In our use case, when re-executing, we do not want to skip those killer records. Instead, in our user-supplied mapper we want to be able to identify them so that we can take a different code path to handle them specially. So, during re-execution, we would like the records to be marked by a flag indicating that they were killer records in previous runs. Any alternative that gives us the same information would also help.
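One way to honor the request for separate code paths is to choose the per-record loop once per attempt, as in this sketch. The class name, the use of a `Consumer` for map(), and a sorted set of bad record numbers are assumptions; the point is only that the first attempt's loop does no per-record skip lookup.

```java
import java.util.*;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: pick the loop once per attempt so that the common first
 * attempt pays no per-record cost for skip bookkeeping; only re-executions with
 * known bad records use the checking loop.
 */
final class MapLoops {
    /** Returns the number of records passed to map(). */
    static <R> long run(Iterator<R> records, Consumer<R> map, SortedSet<Long> badRecords) {
        return badRecords.isEmpty() ? runPlain(records, map) : runSkipping(records, map, badRecords);
    }

    /** Fast path: no lookups, every record goes to map(). */
    private static <R> long runPlain(Iterator<R> records, Consumer<R> map) {
        long n = 0;
        while (records.hasNext()) {
            map.accept(records.next());
            n++;
        }
        return n;
    }

    /** Slow path: walks the sorted bad-record numbers alongside the input. */
    private static <R> long runSkipping(Iterator<R> records, Consumer<R> map, SortedSet<Long> bad) {
        Iterator<Long> nextBad = bad.iterator();
        long skipAt = nextBad.next();          // set is non-empty, checked by run()
        long index = 0, processed = 0;
        while (records.hasNext()) {
            R record = records.next();
            if (index == skipAt) {
                skipAt = nextBad.hasNext() ? nextBad.next() : -1;  // -1: no more bad records
            } else {
                map.accept(record);
                processed++;
            }
            index++;
        }
        return processed;
    }
}
```

Because the bad-record set is sorted, the skipping loop compares each index against a single cursor rather than doing a set lookup per record.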
Yiping, the problem is that if the recordreader cannot read (deserialize) a record, we can't do much. We could write the offending records out to a well-known location (somewhere in the DFS, for example) that you can inspect later.
Joydeep, I buy your argument that all these can be done in the recordreader implementation. So, maybe we should re-scope this jira to only consider failed invocations of map/reduce methods. Devaraj,
After some discussion, we think that outputting the killer records somewhere and letting the good records pass is an acceptable solution. It would be better if they were collected onto DFS and put into a flat directory, so that we can easily run a job against them. We would also like to know what the roadmap for this ticket is.
I suggest a transactional emit() interface as a supplement to this proposal.
When an exception happens, some partial output may already have been emitted by the mapper. In some use cases, such partial output should be considered garbage and harmful. So I suggest a transactional emit mode, in which the Collector buffers the output records from the mapper and submits them only when the mapper calls a commit() operation. At the beginning of the map() function, the mapper should call a start() operation, which discards any records remaining in the Collector.
There is a problem with respect to Pipes, in that multiple input records can be in the subject process's stomach when the crash occurs, so there's no principled way to say how many of the records preceding the one that was read just before the exception should be regarded as suspect. I get that the plan includes that being a configuration variable, but perhaps there could be a way for the Pipes interface to say that a particular record is beyond reproach?
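The transactional emit mode proposed above (start(), collect, commit()) could be sketched like this. The class name and the in-memory lists are illustrative assumptions; a real collector would hand committed pairs to the framework's sort/spill machinery rather than keep them in a list.

```java
import java.util.*;

/**
 * Hypothetical sketch of a transactional collector: pairs collected during one
 * map() call are buffered and only become visible on commit(); start() drops
 * anything left over from a call that failed before committing.
 */
final class TransactionalCollector<K, V> {
    private final List<Map.Entry<K, V>> pending = new ArrayList<>();
    private final List<Map.Entry<K, V>> committed = new ArrayList<>();

    /** Called at the beginning of map(): discard partial output of a failed call. */
    void start() { pending.clear(); }

    /** Buffers one output pair; nothing is published yet. */
    void collect(K key, V value) {
        pending.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    }

    /** Publishes the buffered pairs of the current map() call. */
    void commit() {
        committed.addAll(pending);
        pending.clear();
    }

    /** Output that would be handed on to the framework. */
    List<Map.Entry<K, V>> committedOutput() { return Collections.unmodifiableList(committed); }
}
```

The cost of this design is that each map() call's output is held in memory until commit(), which matters for mappers that emit many pairs per input record.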
-dk
So here are some thoughts, after some discussion with others, on how to handle app-level faults. Comments welcome.
[Table: Java Maps | Streaming | Pipes]
Regarding skipping records wrt Streaming and Pipes:
Based on the above comments, I am summarizing what, IMO, could be an overall approach:
There could be the following error scenarios:
For supporting the above error scenarios, here is what could be done:
2. Have the Task periodically send the TaskTracker the record range that it is going to process next. Range -> R(i, j). If the Task crashes, then the last received range is written to SKIPPED_RANGES.
3. Have the Task periodically send the skipped records. This is the list of records for which it has caught an exception and skipped since the last send. The TaskTracker will write these to the SKIPPED_RECORDS list.
4. Whenever a Task starts, it will look for SKIPPED_RECORDS and SKIPPED_RANGES from any previous run and will skip those while executing. At the end, the Task will try to run SKIPPED_RANGES on a best-effort basis.
5. Have some job-level thresholds like TOLERABLE_SKIPPED_PERCENT. When all the remaining tasks are trying to execute SKIPPED_RANGES, check this threshold, and if the cumulative SKIPPED_RANGES for all remaining tasks are below it, finish the job gracefully.
6. Executing SKIPPED_RANGES: execute by dividing a range in half. If a particular range succeeds, then try another range. In the first pass, all the ranges are trimmed to half. In the second pass, all ranges are trimmed to 1/4, and so on. This continues until TOLERABLE_SKIPPED_PERCENT is met.
7. Pipes: For identifying the record range, the protocol has to be modified to report the processed records.
8. Streaming: The streaming process can write the processed record number to stderr as a framework counter. See
Thoughts?
I am attaching the patch I developed several weeks ago for this issue. It is just an illustrative example, but I guess the final patch might rely heavily on this.
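Step 6 (narrowing SKIPPED_RANGES by halving) could be sketched as follows. The `fails` predicate stands in for re-executing a sub-range and observing a crash, each half counts as one probe (a simplification, since one re-execution could test several ranges), `maxSkip` is the acceptable size of a skipped range, and `maxProbes` stands in for the remaining task attempts; all names are hypothetical. The sketch assumes failures are caused by individual records, not by combinations of records.

```java
import java.util.*;
import java.util.function.BiPredicate;

/**
 * Hypothetical sketch of narrowing a skipped range by repeated halving.
 * fails.test(s, e) models re-executing records [s, e) and observing a crash.
 * Halves that run cleanly are kept as good data; halves that fail are split
 * again until they are at most maxSkip records long or the probe budget runs out.
 */
final class RangeNarrower {
    static List<long[]> narrow(long start, long end, long maxSkip, int maxProbes,
                               BiPredicate<Long, Long> fails) {
        if (maxSkip < 1) throw new IllegalArgumentException("maxSkip must be >= 1");
        Deque<long[]> work = new ArrayDeque<>();
        List<long[]> stillSkipped = new ArrayList<>();
        work.push(new long[] {start, end});
        int probes = 0;
        while (!work.isEmpty()) {
            long[] r = work.pop();
            if (r[1] - r[0] <= maxSkip || probes + 2 > maxProbes) {
                stillSkipped.add(r);   // small enough, or no attempts left to split it
                continue;
            }
            long mid = r[0] + (r[1] - r[0]) / 2;
            for (long[] half : new long[][] {{r[0], mid}, {mid, r[1]}}) {
                probes++;
                if (fails.test(half[0], half[1])) {
                    work.push(half);   // still contains a bad record: narrow it further
                }                      // otherwise the half ran cleanly and is not skipped
            }
        }
        stillSkipped.sort(Comparator.comparingLong(a -> a[0]));
        return stillSkipped;
    }
}
```

With a single bad record at index 37 in [0, 100) and `maxSkip = 10`, the search ends with [37, 43) skipped after 8 probes and lets the other 94 records through.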
It captures several of the above items:
1. Tasks have an associated bad records list, which is kept in the JT's TaskInProgress objects. I do not think we should skip "ranges" of possibly valid records in favor of skipping bad ones.
2. Bad records are pushed/pulled via Reporter / UmbilicalProtocol / TaskStatus.
3. A configurable max number of skippable records per map and reduce. Percentages do not apply here, since we do not know the absolute number of records to compute the percentage.
4. No support for Pipes/Streaming.
For the Pipes case, I think we should first run the tasks normally, until an error occurs. On a subsequent run of the same task, the Pipes application should then run in "debug" mode. In debug mode, the application should send back the key (or record ID) before it is passed to the mapper/reducer. For Streaming, again, the application should run in debug mode on the second trial. In this mode, the framework should send the key/value pairs one by one, meaning that it will not send another pair over the pipe before the previous one is consumed by the application. But as far as I know, there is no mechanism to detect this. So we might opt to delay this to another issue.
Instead of keeping bad record IDs in the JobTracker's TIP object, I would prefer keeping them in the FileSystem along with other job files. This would allow us to be more scalable, as the number of bad records could potentially be high.
I think the number of bad records is supposed to be low. If the job fails on lots of records, then there is probably some problem with the implementation itself.
I assume there would be use cases in which a higher percentage of bad records could be acceptable, say jobs related to log mining, crawling, indexing, analysis for research purposes, etc.
That said, even 1% of acceptable bad records could turn out to be a big number, given that Hadoop jobs generally process huge amounts of data. Piggybacking on the JobTracker's memory for this may not be a good idea. Transferring the whole list over RPC would not be ideal. Also, the need to persist may arise due to
Had an offline discussion with Eric and Devaraj, and we came up with the following:
Sharad,
I would suggest writing bad records to DFS. The reason is not that we expect a high percentage of bad records, but debugging/analysis, as users would expect to collect those bad boys and analyze them.
Here is the initial patch, intended for early feedback. It does the core work of maintaining the failed ranges and skipping those on further executions. A few test cases have also been included.
Although the basic functionality is working, the patch is still incomplete. Things remaining are:
Attaching the patch. It adds the support for streaming and pipes. Also it includes a test case for streaming.
The work is in progress. Other items as mentioned in above comment are remaining. The basic skip functionality including streaming/pipes is working in the last submitted patch.
For the remaining items, had an offline discussion with Devaraj:
1. Writing the skipped records: Saving the offsets seems to be non-trivial, and it would make sense only for FileSplits and map tasks. So we are considering a strategy of writing the skipped records during Task execution itself. The Task would write the skipped records locally to disk while it executes. If the task attempt is the last one, it will flush the skipped records to the DFS after it passes through all the seen bad ranges. One drawback is that if there is a new bad range in the last attempt and the task fails, the records for the range in which the last attempt failed are not written. However, the user can increase the number of attempts to get to this bad range.
2. When this special (skipping) mode kicks in (after the 2nd attempt by default), disable task speculation, as in this special mode the task is already expected to run slower than normal.
3. Define a user-configurable number MAX_SKIPPED_RECORDS: the acceptable number of skipped records in the neighborhood of a bad record. By default this number would be Long.MAX_VALUE. If the skipped range is greater than MAX_SKIPPED_RECORDS, the task will try to narrow down the skipped range by doing a binary search during task re-executions until the MAX_SKIPPED_RECORDS threshold is met or all task attempts are exhausted.
Thoughts/suggestions?
I am concerned that this further complicates the already complicated JobTracker and TaskTracker. Ideally we could layer this as a user library. Even if that's not possible today, we should structure it so that we might transition to such an implementation. So, rather than adding more JobConf methods, perhaps its configuration should be through static accessor methods on a SkipBadRecords class?
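A sketch of the static-accessor style suggested above. A `Map<String, String>` stands in for JobConf, and the `mapred.skip.*` property names, method names, and defaults are illustrative assumptions rather than any released Hadoop API; only the class name follows the suggestion. Declaring each property name once as a private constant also means a misspelled name is a compile error instead of a silently ignored setting.

```java
import java.util.*;

/**
 * Hypothetical static-accessor configuration for skipping. A Map<String, String>
 * stands in for JobConf; property names and defaults are illustrative only.
 */
final class SkipBadRecords {
    private static final String ATTEMPTS_TO_START_SKIPPING = "mapred.skip.attempts.to.start.skipping";
    private static final String MAX_SKIP_RECORDS = "mapred.skip.max.records";

    private SkipBadRecords() {}  // static accessors only, no instances

    static void setAttemptsToStartSkipping(Map<String, String> conf, int attempts) {
        conf.put(ATTEMPTS_TO_START_SKIPPING, Integer.toString(attempts));
    }

    /** Default 2 mirrors "after the 2nd attempt by default" in the plan above. */
    static int getAttemptsToStartSkipping(Map<String, String> conf) {
        return Integer.parseInt(conf.getOrDefault(ATTEMPTS_TO_START_SKIPPING, "2"));
    }

    static void setMaxSkipRecords(Map<String, String> conf, long max) {
        conf.put(MAX_SKIP_RECORDS, Long.toString(max));
    }

    /** Default Long.MAX_VALUE mirrors the MAX_SKIPPED_RECORDS default above. */
    static long getMaxSkipRecords(Map<String, String> conf) {
        String v = conf.get(MAX_SKIP_RECORDS);
        return v == null ? Long.MAX_VALUE : Long.parseLong(v);
    }
}
```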
Might it be possible to implement this through the filesystem? A MapRunnable implementation could, when it catches exceptions, record the bad record location to a task/attempt directory. Then, on startup, the MapRunnable could list task/* files to find all bad record locations generated by prior invocations. Could that work? It shouldn't actually generate that many files, since most tasks should not have errors. It would generate one extra listFiles call to the namenode per task attempt, which doesn't seem too bad.
It seems I haven't mentioned this clearly: we scoped this Jira for handling Task crashes/hangs, as that seems to be the more pressing requirement from users. For handling Java exceptions, there is another JIRA -
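The filesystem-based alternative could be sketched as follows. A local directory stands in for the DFS, and the layout (one file per attempt under a task directory, one record number per line) and all names are assumptions. It keeps the cost Doug mentions: one directory listing when an attempt starts, plus a file write only when an attempt hits a bad record.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;

/**
 * Hypothetical sketch: each attempt that hits a bad record appends its number
 * to <taskDir>/<attemptId>; a new attempt lists taskDir once and unions all
 * files it finds. A local directory stands in for the DFS.
 */
final class BadRecordFiles {
    static void recordBad(Path taskDir, String attemptId, long recordNumber) throws IOException {
        Files.createDirectories(taskDir);
        Files.write(taskDir.resolve(attemptId),
                    (recordNumber + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    /** One directory listing per attempt start. */
    static SortedSet<Long> loadBad(Path taskDir) throws IOException {
        SortedSet<Long> bad = new TreeSet<>();
        if (!Files.isDirectory(taskDir)) return bad;   // no prior failures were recorded
        try (DirectoryStream<Path> attempts = Files.newDirectoryStream(taskDir)) {
            for (Path f : attempts) {
                for (String line : Files.readAllLines(f, StandardCharsets.UTF_8)) {
                    if (!line.isEmpty()) bad.add(Long.parseLong(line.trim()));
                }
            }
        }
        return bad;
    }
}
```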
>I am concerned that this further complicates the already complicated JobTracker and TaskTracker. >rather than adding more JobConf methods, perhaps its configuration should be through static accessor methods on a SkipBadRecords class? > Might it be possible to implement this through the filesystem? > Since we are handling task crashes, the tasktracker and jobtracker have to get involved at some level, no?
You're right, at least the TaskTracker needs to be involved, if it's to capture crashed processes, and the JobTracker if it's to handle machine crashes. Sigh. A few issues with the patch:
I'm nervous about adding extra NN operations per task. That is going to make scaling to large clusters more expensive. I'd really like to find another solution if possible. We just took out a bunch of NN work per task due to performance problems! (task promotion stuff)
> I'm nervous about adding extra NN operations per task.
The current solution doesn't add any NN operations. It works by storing failed ranges in the TIP.
Incorporated Doug's review comments.
We can have this Jira address the base skipping functionality. I have created two dependent Jiras
disabling the speculation if skip mode is active
-1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12386874/153_4.patch against trunk revision 679845. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 9 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. -1 findbugs. The patch appears to introduce 3 new Findbugs warnings. +1 release audit. The applied patch does not increase the total number of release audit warnings. +1 core tests. The patch passed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2953/testReport/ This message is automatically generated. +1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12387079/153_5.patch against trunk revision 680577. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 9 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. +1 findbugs. The patch does not introduce any new Findbugs warnings. +1 release audit. The applied patch does not increase the total number of release audit warnings. +1 core tests. The patch passed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2968/testReport/ This message is automatically generated. Sigh. Sorry it's taken so long for me to get back to this. The patch has now gone stale! My bad.
It looks much better. The configuration parameters controlling skipping should all start with "mapred.skip". Private constants should probably be defined for these, since they're referred to more than once. That way the compiler will check that they're all spelled correctly. The term "isSkipModeKickedOff" might instead be just "inSkipMode" or simply "isSkipping", and "skip.mode.kicked.off" might be "mapred.skip.on" or somesuch. It would also be good to add javadoc to the new public Task methods and to SortedRange's public methods. We don't (yet) publish the javadoc for Task, but it is a pretty central class and deserves to be well documented. In general, public methods in public classes should have javadoc. Sometimes classes (like Task) which didn't used to be public are made public, and lots of their methods are then missing javadoc, but that's not an excuse to continue the practice. And it never hurts to have javadoc, even for non-public methods in non-public classes... updated to trunk
incorporated Doug's comments. changed SortedRanges.Range to store (startIndex,length) instead of (startIndex,endIndex) cleaned up Test classes. Removed redundant test cases.
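The (startIndex, length) representation of skipped ranges could look like the sketch below: a sorted map from start index to length, with overlapping or adjacent ranges merged on insert so that a membership check is a single floor lookup. The class name `RecordRanges` and its methods are hypothetical and are not the API of the patch's SortedRanges class.

```java
import java.util.*;

/**
 * Hypothetical sorted, non-overlapping set of record ranges, each stored as
 * (startIndex, length). Overlapping or adjacent ranges are merged on insert.
 */
final class RecordRanges {
    private final TreeMap<Long, Long> startToLength = new TreeMap<>();

    void add(long start, long length) {
        if (length <= 0) return;
        long end = start + length;                          // exclusive
        Map.Entry<Long, Long> prev = startToLength.floorEntry(start);
        if (prev != null && prev.getKey() + prev.getValue() >= start) {
            start = prev.getKey();                          // extend an overlapping/adjacent predecessor
            end = Math.max(end, prev.getKey() + prev.getValue());
        }
        // Absorb every existing range that begins inside [start, end].
        Map.Entry<Long, Long> next = startToLength.ceilingEntry(start);
        while (next != null && next.getKey() <= end) {
            end = Math.max(end, next.getKey() + next.getValue());
            startToLength.remove(next.getKey());
            next = startToLength.ceilingEntry(start);
        }
        startToLength.put(start, end - start);
    }

    boolean contains(long index) {
        Map.Entry<Long, Long> e = startToLength.floorEntry(index);
        return e != null && index < e.getKey() + e.getValue();
    }

    int rangeCount() { return startToLength.size(); }
}
```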
other minor clean up/better variable names. +1 overall. Here are the results of testing the latest attachment
http://issues.apache.org/jira/secure/attachment/12387626/153_7.patch against trunk revision 682978. +1 @author. The patch does not contain any @author tags. +1 tests included. The patch appears to include 9 new or modified tests. +1 javadoc. The javadoc tool did not generate any warning messages. +1 javac. The applied patch does not increase the total number of javac compiler warnings. +1 findbugs. The patch does not introduce any new Findbugs warnings. +1 release audit. The applied patch does not increase the total number of release audit warnings. +1 core tests. The patch passed core unit tests. +1 contrib tests. The patch passed contrib unit tests. Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3016/testReport/ This message is automatically generated. Doug, could you please review this one last time? Thanks!
no changes, other than wrapping the lines exceeding 80 chars.
Sharad, sorry for the late comment - one small change I would like to see - the updates to the counters MAP_PROCESSED_RECORDS and REDUCE_PROCESSED_RECORDS should happen in the test/skip mode only.
Incorporated Devaraj's comment.
I just committed this. Thanks, Sharad!
Integrated in Hadoop-trunk #581 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/581/)
This would be a cool feature to have. Perhaps the exceptions should also be made visible at the jobtracker. An extension to manage exceptions in RecordWriter.write() could record the offending key(s) and skip them the next time the task is run.