Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.2.0
    • Fix Version/s: 0.19.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Reviewed
    • Release Note:
      Introduced record skipping where tasks fail on certain records. (org.apache.hadoop.mapred.SkipBadRecords)

      Description

      MapReduce should skip records that throw exceptions.

      If the exception is thrown under RecordReader.next() then RecordReader implementations should automatically skip to the start of a subsequent record.

      Exceptions in map and reduce implementations can simply be logged, unless they happen under RecordWriter.write(). Cancelling partial output could be hard. So such output errors will still result in task failure.

      This behaviour should be optional, but enabled by default. A count of errors per task and job should be maintained and displayed in the web ui. Perhaps if some percentage of records (>50%?) result in exceptions then the task should fail. This would stop jobs early that are misconfigured or have buggy code.

      Thoughts?
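      As a concrete illustration of how such per-job configuration might look, here is a minimal sketch using the SkipBadRecords class named in the release note above; the method names are assumptions drawn from the eventual 0.19 API and should be checked against the SkipBadRecords javadoc.

      import org.apache.hadoop.mapred.JobConf;
      import org.apache.hadoop.mapred.SkipBadRecords;

      public class SkippingJobSetup {
        public static void enableSkipping(JobConf conf) {
          // Only enter skipping mode after a couple of ordinary attempts have failed.
          SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
          // Allow at most this many input records to be skipped per map task.
          SkipBadRecords.setMapperMaxSkipRecords(conf, 1);
          // Allow at most this many input groups to be skipped per reduce task.
          SkipBadRecords.setReducerMaxSkipGroups(conf, 1);
        }
      }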

      1. 153_1.patch
        37 kB
        Sharad Agarwal
      2. 153_2.patch
        57 kB
        Sharad Agarwal
      3. 153_3.patch
        63 kB
        Sharad Agarwal
      4. 153_4.patch
        63 kB
        Sharad Agarwal
      5. 153_5.patch
        63 kB
        Sharad Agarwal
      6. 153_6.patch
        68 kB
        Sharad Agarwal
      7. 153_7.patch
        68 kB
        Sharad Agarwal
      8. 153_8.patch
        68 kB
        Sharad Agarwal
      9. 153_9.patch
        68 kB
        Sharad Agarwal
      10. skipRecords_wip1.patch
        30 kB
        Enis Soztutar


          Activity

          Sameer Paranjpye added a comment -

          +1

          This would be a cool feature to have. Perhaps the exceptions should also be made visible at the jobtracker. An extension to manage exceptions in RecordWriter.write() could record the offending key(s) and skip them the next time the task is run.

          stack added a comment -

          +1

           This would be a generalization of the checksum handler that tries to skip records when 'io.skip.checksum.errors' is set. In a recent note on the list, I speculate that there is a class of IOExceptions that should be treatable in the same manner – skipped if a flag is set (search for the subject line "'Corrupt GZIP trailer' and other irrecoverable IOEs.").

          eric baldeschwieler added a comment -

          sounds good. The acceptable % should probably be configurable. I'd be inclined to use something more like 1%. You could turn the feature off by configuring 0%, which should arguably be the default.

          Runping Qi added a comment -

          +1

          Exceptions in the map and reduce functions that are implemented by the user should be handled by the user within the functions.
           In the current implementation of the sequence file record reader, it is hard to skip to the next record if an exception happens during record reading.

          Devaraj Das added a comment -

          Am starting to look at this issue. I am tending to handle this by only keeping track of how RecordReader.next behaves. This will take care of all cases - Java, Streaming, and Pipes in a generic manner. I am not very much in favor of providing hooks in the applications. Is that a requirement? Any other requirement? Thoughts welcome.

          Doug Cutting added a comment -

          > I am tending to handle this by only keeping track of how RecordReader.next behaves.

          I'm not sure what you mean by that.

          There are two kinds of places where user code might throw per-record exceptions:

           1. under RecordReader#next() or RecordWriter#write(). Depending on the RecordReader/RecordWriter implementation and the exception, it may or may not be possible to call next() or write() again. Either is likely to leave streams mid-object. A reader of binary input might get badly out of sync, and a writer of binary output might generate badly corrupt data. To address this correctly, we either need to change the contracts of next() and write(), or we need to add new methods to re-sync these to object boundaries.
           2. under Mapper#map() or Reducer#reduce(). Exceptions here can be ignored without causing anything worse than data loss. We can safely proceed without worrying about corruption.
          Devaraj Das added a comment -

          Doug, I was just trying to summarize the high level requirement. Thanks for the detailed inputs.

          Enis Soztutar added a comment -

           A major use case is that the user implementation of map or reduce (or third-party libraries) can throw StackOverflow or OOM. In the latter case there is no guarantee that the program will be stable even if it catches the OOM. So it would be good if the framework kept track of the records, terminated the execution of the task, and skipped them in the rescheduled run.

          Devaraj Das added a comment -

          Here is a proposal that tries to address the scenarios discussed in this jira:
          0) Define the concept of a failed record number that is set by Tasks and propagated to the JobTracker on task failures. This becomes part of the TIP object at the JobTracker.
          1) Define an API in the RecordReader to do with getting the record boundary. On getting an exception in RecordReader.next, the task starts from the beginning of the last successfully read record till the boundary and reads the next record from that point (ignoring the boundary bytes). Applies to maps.
          2) Define an API in RecordWriter to do with writing record boundary along with every write(k,v). The record boundary can default to the sync bytes. Tasks fail when they get an exception while writing a bad record. With (0), in the subsequent retries, the records can be skipped. This applies to outputs of maps and reduces.
           3) Define an API in RecordReader to do with whether we want to have recovery while reading records or not (useful, for example, if the RecordReader has side effects in the next() method that would affect the reading of the subsequent record if there was an exception for the current record).

          In cases of applications throwing exceptions in the map/reduce methods, the exception is caught by the Task method, which invoked the map/reduce method. The task attempt is killed after noting the record number of the failed invocation. With the above point (0), these records are not fed to the m/r methods in the subsequent retries.

          The recovery/skip above is done on a best effort basis. That is, the worst case is that tasks fail!

          The above strategies should at least allow configuring the max % of records that will keep the recovery/skip cycle going before the job is declared as having failed. Also, the "skip records on re-execution" should probably be configurable on a per job basis by the user (since the exceptions could have been caused due to reasons other than incorrect input).

          Thoughts?
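           A sketch of what the RecordReader additions in points (1) and (3) might look like; these names are illustrative only and are not part of the existing RecordReader contract.

           import java.io.IOException;
           import org.apache.hadoop.mapred.RecordReader;

           // Hypothetical extension capturing points (1) and (3); not a real Hadoop interface.
           public interface RecoverableRecordReader<K, V> extends RecordReader<K, V> {

             // Skip forward to the next record boundary (e.g. a sync marker or newline)
             // after next() has thrown, returning the byte offset of that boundary.
             long skipToNextBoundary() throws IOException;

             // Whether next() is side-effect free, i.e. whether it is safe to keep
             // reading after next() has thrown for the current record.
             boolean isRecoverySupported();
           }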

          Devaraj Das added a comment -

          I should have thought about Pipes and Streaming apps. In order to reliably reexecute Pipes Map tasks, we probably need to change the corresponding protocol to tell the sequence number of the record for which it is emitting the (keys,values). It's not clear how to handle the Streaming tasks since the protocol for Streaming can't be tweaked in a generic manner.

          Owen O'Malley added a comment -

          This sounds like roughly the right direction.

           Maybe we should extend diagnostic info to contain a record number (or offset?) and use that to record the error that was thrown? For errors in RecordReader.next, you are right that it probably makes sense to skip ahead to the next sync marker/newline, since the metadata is as much at risk as the record data. So having a "gotoSync" in RecordReader would help.

          Mappers and Reducers should be able to tolerate killer records, just like the reader.

          We should think about what is required for the RecordWriter to recover from a failed write... Of course I suspect there aren't any good answers until HDFS has truncate and append.

          Devaraj Das added a comment -

           > Mappers and Reducers should be able to tolerate killer records, just like the reader.

           As I mentioned earlier, for the reader, I have an API in RecordReader for querying whether the RecordReader is side-effect free. If the RecordReader is not side-effect free (e.g., if the next() implementation cannot safely proceed to the next record after the current record read throws an exception), then the task is declared as having failed (and the offending record is skipped in the task retry).
           If we were to support the same strategy for Mapper/Reducer, we need to consider a similar problem. Also, since Mapper/Reducer can throw exceptions due to problems not necessarily to do with bad input, we probably need the user to tell us whether to continue with map/reduce method invocations in the event of exceptions. In addition, the user can also specify the fatal exceptions which should result in task failure (e.g. OOM).
          Makes sense?

          Enis Soztutar added a comment -

           Honestly, I think bringing in extra complexity (especially a RecordReader API change, which will break lots of legacy code) is not justified to solve this issue. To be more explicit, the framework does not need to "continue from where it was" once a task throws an exception. I think we can just do step 0 (as defined above) and re-execute the task from the beginning, but this time skipping the problematic record. It is expected that resuming the execution from where we left off versus completely re-executing the task will not make a big difference, since the number of tasks is assumed to be high enough.

          I propose
          1. Define the concept of a failed record number that is set by Tasks and propagated to the JobTracker on task failures. This becomes part of the TIP object at the JobTracker.
          2. Define an API in JobConf and a configuration item in hadoop-site to [dis]allow skipping records. (can be merged with 3 below)
          3. Define an API in JobConf and a configuration item in hadoop-site to set max number(or percentage) of records that can be skipped
          4. Do not change RecordReader/Writer interfaces.
          5. The recovery/skip above is done on a best effort basis. That is, the worst case is that tasks fail!

           So this functionality will be completely transparent to the application code, except for setting the configuration. If the user code is not side-effect free, then the user has to deal with that, because tasks can already be executed more than once for various reasons (for example, speculative execution).

          Any thoughts ?
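           A sketch of the value that point 1 would propagate; purely illustrative, since the real change would fold this into TaskStatus and the TaskInProgress (TIP) object rather than add a new class.

           import java.io.DataInput;
           import java.io.DataOutput;
           import java.io.IOException;
           import org.apache.hadoop.io.Writable;

           // Hypothetical: the failed record's sequence number, reported by a task
           // attempt and recorded at the JobTracker so that the retry can skip it.
           public class FailedRecord implements Writable {
             private long recordNumber;

             public FailedRecord() {
             }

             public FailedRecord(long recordNumber) {
               this.recordNumber = recordNumber;
             }

             public long getRecordNumber() {
               return recordNumber;
             }

             public void write(DataOutput out) throws IOException {
               out.writeLong(recordNumber);
             }

             public void readFields(DataInput in) throws IOException {
               recordNumber = in.readLong();
             }
           }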

           Sameer Paranjpye added a comment - edited

          The order of the number of exceptions that the framework intends to handle affects the design a great deal. What is the scope of the problem we intend to handle here? I see there being two cases:

          1. the number of exceptions is small compared to the number of tasks in a job. In this scenario Enis' strategy makes a lot of sense. In general we assume that tasks are fine grained enough that re-executing a handful of them is not a significant burden in terms of job runtime and throughput.
           2. the number of exceptions is O(num tasks). In this scenario, re-execution could cause job runtime to double (or worse) since every task could in principle be executed two or more times. If we set out to handle this case then we'll need to keep enough state to enable each task to skip the offending record(s) and start where they left off, i.e. not re-process any previously handled records.

          Perhaps we should attempt to resolve case 1) with task re-execution for now since it represents a useful incremental step towards a more sophisticated solution. It may prove to be sufficient. One could in principle argue that if the number of exceptions is O(num tasks) the problem is better handled at the application level.

          Doug Cutting added a comment -

          The tricky bit will be identifying the failed record number, no? The naive approach would be to have the child report after each record has been processed, so that the parent can then know, when it crashes, which record it was on. But that would probably be too expensive.

          So rather we might have the child report in its TaskStatus the range of record numbers it intends to process before it next reports. Then, on failure, the parent knows that skipping that range will skip the offending record. The child can adaptively choose a range that keeps the status update time within reason. It would start by reporting range 0-1, then use the time required to process that entry to determine how big of a range to send the next time and still provide timely updates. If a particular record takes a long time, then it can update status before the range is exhausted, providing a new range.

          Would this be acceptable, to skip a few more than just the offending record?
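           A toy sketch of the adaptive range sizing described above; none of these names exist in Hadoop, it only illustrates how the child could keep status updates timely while claiming ranges of records.

           // Picks how many records the child claims in its next status report so
           // that updates stay roughly one update interval apart.
           public class RangeReporter {
             private final long updateIntervalMs;
             private long rangeStart = 0;  // first record of the range being claimed
             private long rangeSize = 1;   // start by claiming a single record

             public RangeReporter(long updateIntervalMs) {
               this.updateIntervalMs = updateIntervalMs;
             }

             // The range [start, end) the child reports it will process next.
             public long[] currentRange() {
               return new long[] { rangeStart, rangeStart + rangeSize };
             }

             // Call once the current range has been processed, passing how long it
             // took; the next range is sized to roughly fill one update interval.
             public void advance(long elapsedMs) {
               long perRecordMs = Math.max(1, elapsedMs / rangeSize);
               rangeStart += rangeSize;
               rangeSize = Math.max(1, updateIntervalMs / perRecordMs);
             }
           }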

          Enis Soztutar added a comment -

           > The tricky bit will be identifying the failed record number, no? The naive approach would be to have the child report after each record has been processed, so that the parent can then know, when it crashes, which record it was on. But that would probably be too expensive.

           In the original MapReduce paper, the record number is kept in a global variable which is then passed to the master in a UDP packet. The master decides that the record is malformed if it sees more than one report for the same record number.

           I think we may be able to catch the exception in the Child process and make an IPC to the TT (which in turn reports this to the JT). There may be situations in which the IPC will fail; for those tasks we can adopt the above strategy of reporting the record number in the subsequent reexecution of the task to find out exactly which record the computation fails on.

          Devaraj Das added a comment -

          Enis, agree that for the Java tasks case we could get the offending record immediately in the Child process. The problem here is that with things like Pipes apps (where the Java task spawns another child process from within), the record number at which the exception happened is tricky to get since the exception was really encountered in the Pipes process (this doesn't include the exception that we might encounter while reading the input since that happens in the Java parent task and we can catch those immediately).

          Joydeep Sen Sarma added a comment -

          hey folks - we are having a discussion on a similar jira (covering a smaller subset of issues) - 3144. we are actually hitting this problem (corrupted records causing OOM) and have a simple workaround specific to our problem.

          but i am a little intrigued by the proposal here. for the recordreader issues - why not, simply, let the record reader skip the bad record(s). as the discussions here mentions - there have to be additional api's in the record reader to be able to skip problematic records. If the framework trusts record readers to be able to skip bad records - why bother re-executing? why not allow them to detect and skip bad records on the very first try. if TT/JT want to keep track and impose a limit on the bad records skipped - they could ask the record reader to report the same through an api.

          the exceptions from map/reduce functions are different - if they make the entire task unstable due to OOM issues then a re-execution makes sense. but if we separate the two issues - we may have a more lightweight way of tolerating pure data corruption/validity issues (as we are trying to in 3144).

          Devaraj Das added a comment -

          Yes, if you assume that record readers have the necessary logic to be able to skip bad records on the very first try, it would work. However, in this issue, we were trying to address the problem more from the framework perspective. That is, even though recordreaders might not have the logic to determine whether a particular record is corrupt, the framework can do it with some help from the reader.

          Doug Cutting added a comment -

          > why not, simply, let the record reader skip the bad record(s) [ ... ] ?

           If the record reader can identify bad records and skip them, then the framework need not get involved: the RecordReader itself can catch exceptions that it knows might be thrown by bad records and then try to read the next.

          Joydeep Sen Sarma added a comment -

           > That is, even though recordreaders might not have the logic to determine whether a particular record is corrupt, the framework can do it with some help from the reader

          > If the record reader can identify bad records and skip them, then the framework need not get involved

          in the beginning of this jira - it was mentioned that problems from recordreader.next() were also covered in this jira. I take it from these comments - that this is not the case any more. it seems to me that if a recordreader can skip a record reliably (which is the support required from record readers from this jira) - then it will also be able to avoid throwing exceptions (since quite obviously, it can catch any exceptions and invoke the logic to skip to the next record within the next() body).

          just wanted to make sure since this jira was mentioned as something that might preempt 3144 (putting basic corruption detection code in the linerecordreader) - but that doesn't seem like the case ..

          Devaraj Das added a comment -

           > in the beginning of this jira - it was mentioned that problems from recordreader.next() were also covered in this jira. I take it from these comments - that this is not the case any more. it seems to me that if a recordreader can skip a record reliably (which is the support required from record readers from this jira) - then it will also be able to avoid throwing exceptions (since quite obviously, it can catch any exceptions and invoke the logic to skip to the next record within the next() body).

           I didn't mean that (not sure about Doug). Today, the way map tasks work is that the recordreader's next method is invoked for each record. So if the implementation of next can handle corrupt records, then the framework doesn't need to get involved. However, if the implementation of next is not capable of handling bad records, then it is most likely going to throw an exception, and the proposal in this jira for handling bad input starts with that assumption. The framework catches that exception and the recordreader's new interface (look at points 1, 2, and 3 in http://issues.apache.org/jira/browse/HADOOP-153?focusedCommentId=12574404#action_12574404) comes in handy to recover from that. For cases like OOM where continuing the current task execution is not safe, the framework notifies the JobTracker about these bad records, and upon the next retry these records are skipped (point 0 in the previously mentioned url).

          At this point of time, since we don't have a patch for this jira, I don't see this jira preempting HADOOP-3144. Although it'd be nice if we have a patch for this jira to handle the general case.

          Joydeep Sen Sarma added a comment -

          i am missing something .. whatever the framework can do using the recordreader's new interfaces - the recordreader can do itself. If the recordreader can define record boundaries and skip to the next record - then it can catch any and all exceptions that it might throw and ignore the bad record and move to the next one without involving the framework.

          sorry if i am being totally obtuse here - just not getting it ..

          Joydeep Sen Sarma added a comment -

           Conversely, if the recordreader can avoid throwing exceptions - then you don't really need new APIs from the recordreader. You can just avoid calling map()/reduce() on specific records that had a problem.

          one small implementation request - one of the things i have noticed (based on some profiling) is that a lot of processing overhead (in the map side) in hadoop comes from function calls that are invoked for every processed record. it would be nice if this jira does not lead to more of the same. considering that this is not a normal case - it might make sense to have different code paths for the first and subsequent retries (so that in the normal case - we don't incur overhead looking for records to be skipped).

          Yiping Han added a comment -

           In our use case, when we re-execute, we do not want to skip those killer records. Instead, in our user-supplied mapper we want to be able to identify them such that we can take a different code path to handle them specially. So, we would like to see that during re-execution, the records are marked by a flag indicating they were killer records in the previous runs. Or any alternative that could give us the same information would help.

          Devaraj Das added a comment -

          Yiping, the problem is that if the recordreader cannot read (deserialize) we can't do much. We could write the offending records out to a well known location (some place in the dfs for example) that you can later inspect.

          Joydeep, I buy your argument that all these can be done in the recordreader implementation. So, maybe we should re-scope this jira to only consider failed invocations of map/reduce methods.

          Yiping Han added a comment -

          Devaraj,

           After some discussion, we think that outputting the killer records somewhere and letting the good records pass is an acceptable solution. It would be better if they are collected onto DFS and put into a flat directory so that we can run a job against them easily. Also, what is the roadmap for this ticket?

          Yiping Han added a comment -

           I suggest a transactional emit() interface as a supplement to this proposal.

           When an exception happens, there might already be some partial output emitted from the mapper. In some use cases, such partial output should be considered garbage and harmful. So I suggest a transactional emit mode, in which the Collector buffers the output records from the mapper, and only when the mapper calls a commit() operation are they actually submitted. At the beginning of the map() function, the mapper should call a start() operation which discards any records remaining in the Collector.
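           A minimal sketch of the transactional Collector suggested above; start() and commit() are hypothetical and not part of the actual OutputCollector contract.

           import java.io.IOException;
           import java.util.ArrayList;
           import java.util.List;
           import org.apache.hadoop.mapred.OutputCollector;

           // Buffers map output and only forwards it to the real collector on commit(),
           // so a map() call that throws leaves no partial output behind.
           public class TransactionalCollector<K, V> implements OutputCollector<K, V> {
             private final OutputCollector<K, V> delegate;
             private final List<K> keys = new ArrayList<K>();
             private final List<V> values = new ArrayList<V>();

             public TransactionalCollector(OutputCollector<K, V> delegate) {
               this.delegate = delegate;
             }

             // Discard anything buffered by a previous, failed map() call.
             public void start() {
               keys.clear();
               values.clear();
             }

             // Buffer instead of emitting.
             public void collect(K key, V value) throws IOException {
               keys.add(key);
               values.add(value);
             }

             // Flush the buffered pairs once map() has returned successfully.
             public void commit() throws IOException {
               for (int i = 0; i < keys.size(); i++) {
                 delegate.collect(keys.get(i), values.get(i));
               }
               start();
             }
           }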

          Dick King added a comment -

          There is a problem with respect to pipes, in that multiple input records can be in the subject process's stomach when the crash occurs so there's no principled way to say how many of the records preceding the one that was read just before the exception should be regarded as having been suspect. I get that the plan includes that being a configuration variable, but perhaps there could be a way for the pipes interface to say that a particular record is beyond reproach?

          -dk

          Devaraj Das added a comment -

          So here are some thoughts, after some discussion with others, on how to handle app level faults. Comments welcome.

          Java Maps
          In this case, we can immediately know which record couldn't be processed and depending on the type of exception that the method threw we can decide to continue or not (the user can tell us which exceptions are fatal; we could also have a couple of defaults like OOM). If we decide to not continue, the task can be reexecuted in the same tasktracker slot and this time that record is skipped. In order to know which record should be skipped in the reexecution, the task as part of the progress/ping RPC tells the TaskTracker the record number of the last successfully processed record and the set of bad record numbers is passed to the task upon reexecution and the task simply skips those for processing.

          Streaming
          In this case, the Java parent notifies the TaskTracker what the last successfully processed record is. The "last successfully processed" record in this case refers to the record that was sent to the streaming child process just before the crash was detected. The same TaskTracker then reexecutes the task and this time, the Java task skips that record assuming that that was the one on which the process crashed. If the process crashes even now, it gets reexecuted and this time the Java parent skips the last 2 records. This could go on with every reexecution skipping the last 2*exec-count number of records (where exec-count represents the number of reexecutions). This will give us a range within which the faulty record exists. Upon the first successful reexecution, the TaskTracker passes the range on to the JobTracker and the user can then debug his input and/or the program that processes the input. An alternative strategy is to do a binary-search for the offending record.

          Pipes
           The exact same thing as Streaming applies here too. The one point to note here is that if we enable the user to tell us whenever a record is successfully processed (similar to the status/progress calls to the Java parent), it would substantially help the reexecution w.r.t. skipping records.
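           The widening skip described under Streaming boils down to simple arithmetic; a standalone sketch (nothing here corresponds to a real Hadoop class):

           // On the k-th re-execution, skip the last 2*k records ending at the record
           // that was handed to the child just before the crash was detected.
           public class StreamingSkipRange {
             // Returns {firstSkipped, lastSkipped}, inclusive record indices.
             public static long[] rangeFor(long lastSentRecord, int execCount) {
               long width = 2L * execCount;                     // grows with each retry
               long first = Math.max(0, lastSentRecord - width + 1);
               return new long[] { first, lastSentRecord };
             }
           }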

          Runping Qi added a comment -

          Regarding skipping records wrt streaming and pipe:
           When a task is re-executed, it can skip N records starting at the detected fail position and process the rest.
           At the end, the task can process those N skipped records on a "best-effort" basis.

          Sharad Agarwal added a comment -

          Based on above comments, I am summarizing what IMO could be an overall approach:

          There could be following error scenarios:
          A) The framework can catch the exception thrown by map/reduce function (only applicable for java Tasks). The framework can decide to keep moving after skipping the record, OR if the exception seems to be FATAL like OutOfMemory making the task process unstable, the framework can decide to forgo that Task execution. Re-execution should skip that record.
          B) The task process crashes due to bad record.

          For supporting the above error scenarios, here is what could be done:
          1. Each task would have 2 lists - SKIPPED_RECORDS and SKIPPED_RANGES. Perhaps these could be maintained in DFS.

           2. Have the Task periodically send to the TaskTracker the record range which it is going to process next. Range -> R(i, j). If the Task crashes, then the last received range is written to SKIPPED_RANGES.

           3. Have the Task periodically send the skipped records. This is the list of records for which it has caught an exception and which it has skipped since the last send. The TaskTracker will write these to the SKIPPED_RECORDS list.

           4. Whenever a Task starts, it will look for SKIPPED_RECORDS and SKIPPED_RANGES from any previous run and will skip those while executing. At the end, the Task will try to run SKIPPED_RANGES on a best-effort basis.

           5. Have some job level thresholds like TOLERABLE_SKIPPED_PERCENT. When all the remaining tasks are trying to execute SKIPPED_RANGES, check this threshold, and if the cumulative SKIPPED_RANGES for all remaining tasks are less than this threshold, then finish the job gracefully.

           6. Executing SKIPPED_RANGES: execute by dividing a range into halves. If a particular range succeeds then try another range. In the first pass, all the ranges are trimmed to half. In the second pass, all ranges are trimmed to 1/4, and so on. This will continue until TOLERABLE_SKIPPED_PERCENT is met.

          7. Pipes: For identifying record range, the protocol has to be modified to figure out the processed records.

           8. Streaming: The streaming process can write the processed record number to stderr as a framework counter. See HADOOP-1328.
           For streaming processes which do not support this feature, we can fall back to the mechanism in which the record range sent always starts from the beginning (as we are not sure which ones have been processed yet). Range -> R(0, j). This range is then tried at the end on a best-effort basis, as described in 6.
           Some optimizations could be done to this approach: instead of starting from the beginning, have it start based on some job-configured number N. For example, Range -> R(i-N, j). N is the expected number of records in the streaming process' stomach before they are processed. Users can define N for their jobs based on the buffers used in their process. The framework then tries to tune the value of N based on the crashes it encounters in further executions. The algorithm for this can become a little complex, and there may not be much payoff. So I think initially let's have it always skip from the start, and optimize this behavior later.

          Thoughts?
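           A rough sketch of the range-halving in point 6; tryRange stands in for re-running the task over a sub-range, and all names are illustrative rather than actual Hadoop APIs.

           import java.util.ArrayDeque;
           import java.util.Deque;

           public class SkippedRangeNarrower {

             public interface RangeRunner {
               // true if the task can process records [start, end) without failing
               boolean tryRange(long start, long end);
             }

             // Repeatedly split failing ranges in half and retry the halves, so the
             // records that stay skipped shrink toward the truly bad ones. Returns
             // the number of records still skipped once the tolerance is reached.
             public static long narrow(long start, long end, RangeRunner runner,
                                       long tolerableSkipped) {
               Deque<long[]> failing = new ArrayDeque<long[]>();
               failing.push(new long[] { start, end });
               long skipped = end - start;
               while (skipped > tolerableSkipped && !failing.isEmpty()) {
                 long[] r = failing.pop();
                 if (r[1] - r[0] <= 1) {
                   continue;  // a single bad record: nothing left to split
                 }
                 long mid = r[0] + (r[1] - r[0]) / 2;
                 skipped -= r[1] - r[0];
                 for (long[] half : new long[][] { { r[0], mid }, { mid, r[1] } }) {
                   if (runner.tryRange(half[0], half[1])) {
                     continue;  // this half succeeded; its records are no longer skipped
                   }
                   failing.push(half);
                   skipped += half[1] - half[0];
                 }
               }
               return skipped;
             }
           }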

          Enis Soztutar added a comment -

           I am attaching the patch I developed several weeks ago for this issue. It is just an illustrative example, but I guess the final patch might rely heavily on it.
          It captures several of the above items :
          1. Tasks have an associated badrecords list, which are kept in the JT's TaskInProgress objects. I do not think we should skip "ranges" of possibly valid records in favor of skipping bad ones.
          2. bad records are pushed / pulled via Reporter / UmblicalProtocol / TaskStatus
          3. configurable max number of skippable records per map and reduce. Percentages do not apply here, since we do not know the absolute number of records to compute the percentage.
          4. No support for pipes / streaming.

          For the pipes case, I think we should first run the tasks normally, until an error occurs. Upon the subsequent run of the same task, then the pipes application should run in "debug" mode. In debug mode, the application should send back the key(or recordID) before it is passed to the mapper/reducer.

          For streaming, again the application should run in debug mode on second trial. In this mode, the framework should send the key/value pairs one-by-one, meaning that it will not send another pair over the pipe, before the previous are consumed by the application. But as far as I know, there is no mechanism to detect this. So we might opt for delaying this to another issue.

          Sharad Agarwal added a comment -

          Instead of keeping bad record ids in the JobTracker's TIP object, I would prefer keeping them in the FileSystem along with other job files. This would allow us to be more scalable, as the number of bad records could potentially be high.

          Enis Soztutar added a comment -

          > Instead of keeping bad record ids in the JobTracker's TIP object, I would prefer keeping them in the FileSystem along with other job files. This would allow us to be more scalable, as the number of bad records could potentially be high.

          I think the number of bad records is supposed to be low. If the job fails on lots of records, then there is probably a problem with the implementation itself.
          I guess the number of bad records per job should be on the order of hundreds at most. While working with Nutch, out of millions of urls there were just a handful of records which caused a StackOverflow in the third-party html parsers.
          Could you give us an example of the percentage of bad records that you see in the jobs at Y!?

          Sharad Agarwal added a comment -

          I assume there would be use cases in which a higher bad-record percentage could be acceptable, say jobs related to log mining, crawling, indexing, analysis for research purposes, etc.
          That said, even 1% of acceptable bad records could turn out to be a big number, given that hadoop jobs generally operate on huge data. Piggybacking on the jobtracker's memory for this may not be a good idea, and transferring the whole list over RPC would not be ideal.
          Also, the need to persist may arise due to HADOOP-3245.

          Sharad Agarwal added a comment - edited

          Had an offline discussion with Eric and Devaraj, and we came up with the following:

          • Let this issue handle the case of crashes and hangs. For the case of catching exceptions in Java tasks, I have filed another Jira -> HADOOP-3700.
          • The design is impacted by the assumption of how frequent the failures would be. At this point in time, design for INFREQUENT failures; this simplifies the design. Also, bad records can be maintained by the Jobtracker (as pointed out by Enis), as the number of bad records is expected to be quite low.
          • Failing bad jobs fast is crucial to avoid wasting Grid resources. Thresholds should be defined in such a way that we identify bad jobs early enough, say a maximum of 10% of the maps can fail. Also, we need to make sure that we re-execute failed tasks VERY FAST.
          • Apart from bad data, Task crashes could be due to bad user code (like out of memory) or bad nodes. To isolate these cases, on failure, re-execute on another node as we do now. If it fails AGAIN, then re-execute a third time, this time in a special mode where we report every record completion to the TaskTracker.
          • For the case of Streaming, the streaming process would have to write the processed record count to stderr as a framework counter to take advantage of this feature (see the sketch below).
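
          A minimal sketch of the kind of streaming program this implies (illustrative only; the counter group and name used here are placeholders, not the exact names the framework expects):

          import java.io.BufferedReader;
          import java.io.InputStreamReader;

          /** Identity streaming mapper that reports each processed record to the
           *  framework by writing a counter line to stderr (HADOOP-1328 style). */
          public class CountingStreamMapper {
            public static void main(String[] args) throws Exception {
              BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
              String line;
              while ((line = in.readLine()) != null) {
                System.out.println(line); // identity map: pass the record through unchanged
                // counter line format: reporter:counter:<group>,<counter>,<amount>
                System.err.println("reporter:counter:SkipTaskCounters,ProcessedRecords,1");
              }
            }
          }
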
          Yiping Han added a comment -

          Sharad,

          I would suggest that bad records be written onto DFS. The reason is not that we expect a high percentage of bad records, but for debugging/analysis purposes, as users would want to collect those bad records and analyze them.

          Sharad Agarwal added a comment -

          Here is the initial patch, intended for early feedback. It does the core work of maintaining the failed ranges and skipping them on further executions. A few test cases have also been included.
          Although the basic functionality is working, the patch is still incomplete. The remaining items are:

          • Capture offsets along with record sequence numbers. Once we have all the offsets for the skipped records, the actual record data can be written any time later on.
          • Write the skipped-record information (only sequence numbers and offsets) to job history.
          • Support for Pipes and Streaming.
          • Introduce thresholds to limit the number of failing task executions (failing fast for really bad user code).
          • Execute the skipped ranges on a best-effort basis. Perhaps this can be done as a separate Jira later on, if we find value in it after seeing real use cases.
          Sharad Agarwal added a comment -

          Attaching the patch. It adds support for streaming and pipes, and includes a test case for streaming.
          The work is in progress; the other items mentioned in the above comment remain.

          Sharad Agarwal added a comment -

          The basic skip functionality, including streaming/pipes, is working in the last submitted patch.
          For the remaining items, I had an offline discussion with Devaraj:
          1. Writing the skipped records: Saving the offsets seems to be non-trivial, and it will make sense only for FileSplits and map tasks. So we are considering a strategy of writing the skipped records during Task execution itself.
          The Task would write the skipped records locally to disk while it executes. If the task attempt is the last one, it will flush the skipped records to the DFS after it passes through all the seen bad ranges. One drawback is that if there is a new bad range in the last attempt and the task fails, the records for the range in which the last attempt failed are not written. However, the user can increase the number of attempts to get to this bad range.
          2. When this special (skipping) mode kicks in (after the 2nd attempt by default), disable task speculation, since in this mode the task is already expected to run slower than normal.
          3. Define a user-configurable number, MAX_SKIPPED_RECORDS -> the acceptable number of skipped records in the neighborhood of a bad record. By default this number would be Long.MAX_VALUE. If the skipped range is greater than MAX_SKIPPED_RECORDS, the task will try to narrow down the skipped range by doing a binary search during task re-executions until the MAX_SKIPPED_RECORDS threshold is met or all task attempts are exhausted.
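
          A minimal sketch of the narrowing idea in item 3 (illustrative only, not code from the patch):

          /** On each re-execution the failed range is halved until it is no larger
           *  than maxSkippedRecords or the task attempts are exhausted. */
          public class RangeNarrower {
            /** Returns the half of [start, start+length) to keep skipping next,
             *  depending on which half the re-execution failed in. */
            static long[] narrow(long start, long length, boolean firstHalfFailed) {
              long half = Math.max(1, length / 2);
              return firstHalfFailed
                  ? new long[] { start, half }
                  : new long[] { start + half, length - half };
            }

            public static void main(String[] args) {
              long[] range = { 100, 64 };    // a failed range of 64 records starting at record 100
              long maxSkippedRecords = 8;
              while (range[1] > maxSkippedRecords) {
                range = narrow(range[0], range[1], true); // assume the first half keeps failing
                System.out.println("next skip range: start=" + range[0] + " length=" + range[1]);
              }
            }
          }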

          thoughts/suggestions ?

          Doug Cutting added a comment -

          I am concerned that this further complicates the already complicated JobTracker and TaskTracker. Ideally we could layer this as a user library. Even if that's not possible today, we should structure it so that we might transition it to such an implementation. So, rather than adding more JobConf methods, perhaps its configuration should be through static accessor methods on a SkipBadRecords class?
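
          For illustration, such static accessors might look like the following (the method and property names are examples, not necessarily the final API):

          import org.apache.hadoop.conf.Configuration;

          /** Skip-record configuration kept out of JobConf: plain static accessors
           *  that read and write ordinary configuration properties. */
          public class SkipBadRecordsConfig {
            private static final String MAX_MAP_SKIP_RECORDS = "mapred.skip.map.max.skip.records";

            public static void setMapperMaxSkipRecords(Configuration conf, long maxSkipRecords) {
              conf.setLong(MAX_MAP_SKIP_RECORDS, maxSkipRecords);
            }

            public static long getMapperMaxSkipRecords(Configuration conf) {
              return conf.getLong(MAX_MAP_SKIP_RECORDS, 0);
            }
          }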

          Might it be possible to implement this through the filesystem? A MapRunnable implementation could, when it catches exceptions, record the bad record location to a task/attempt directory. Then, on startup, the MapRunnable could list task/* files, to find all bad record locations generated by prior invocations. Could that work? It shouldn't actually generate that many files, since most tasks should not have errors. It would generate one extra listFiles call to the namenode per task attempt, which doesn't seem too bad.
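
          A rough sketch of that filesystem-based idea (hypothetical paths and helper names, not a committed design):

          import java.util.HashSet;
          import java.util.Set;
          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.fs.FileStatus;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;

          /** Each attempt records bad-record offsets as marker files under a per-task
           *  directory; a new attempt lists that directory to learn what to skip. */
          public class BadRecordStore {
            private final FileSystem fs;
            private final Path taskDir; // e.g. a "_badrecords/<task id>" directory under the job's output

            public BadRecordStore(Configuration conf, Path taskDir) throws Exception {
              this.fs = FileSystem.get(conf);
              this.taskDir = taskDir;
            }

            /** Record one bad record as an empty marker file named "<attemptId>_<offset>". */
            public void recordBadRecord(String attemptId, long offset) throws Exception {
              fs.create(new Path(taskDir, attemptId + "_" + offset), true).close();
            }

            /** On startup, list prior attempts' marker files and collect the offsets to skip. */
            public Set<Long> loadBadRecords() throws Exception {
              Set<Long> offsets = new HashSet<Long>();
              if (!fs.exists(taskDir)) {
                return offsets;
              }
              for (FileStatus status : fs.listStatus(taskDir)) {
                String name = status.getPath().getName();
                offsets.add(Long.parseLong(name.substring(name.lastIndexOf('_') + 1)));
              }
              return offsets;
            }
          }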

          Sharad Agarwal added a comment -

          It seems I haven't clearly mentioned that we scoped this Jira to handle Task crashes/hangs, as that seems to be the more pressing requirement from users. For handling java exceptions, there is another JIRA - HADOOP-3700. I should rename the subject of this Jira appropriately.

          > I am concerned that this further complicates the already complicated JobTracker and TaskTracker.
          Since we are handling task crashes, the tasktracker and jobtracker have to get involved at some level, no? Btw, in the current patch there are minimal changes to the jobtracker/tasktracker. Most of the logic is in a RecordReader wrapper and SortedRanges (a new class). We get good working functionality (with test cases) for skipping the bad ranges.

          > rather than adding more JobConf methods, perhaps its configuration should be through static accessor methods on a SkipBadRecords class?
          +1. It would avoid polluting the JobConf.

          > Might it be possible to implement this through the filesystem?
          Yes, we can use the filesystem to write the bad record sequence numbers. But since those are assumed to be very few, we agreed to store them in the Jobtracker's TIP. When the TIP instantiates the MapTask, it passes them to it.
          As Yiping has suggested in one of his comments, writing the actual bytes (key, value) of the skipped records is something we are trying to address; see item 1 of my earlier comment.

          Doug Cutting added a comment -

          > Since we are handling task crashes, the tasktracker and jobtracker have to get involved at some level, no?

          You're right, at least the TaskTracker needs to be involved, if it's to capture crashed processes, and the JobTracker if it's to handle machine crashes. Sigh.

          A few issues with the patch:

          • InterTrackerProtocol's version needs to change since the format of a Task and TaskStatus have changed.
          • UmbilicalProtocol#reportNextRecordRange() needs javadoc. This javadoc is not published, but our protocols should be well-documented.
          • You add a method to the Reporter interface: are we sure no users implement this? Also, this is the wrong place for this method. It has nothing to do with reporting status. Unfortunately, getInputSplit() was added to Reporter at some point in the past, setting a bad example. These should be added to the context (once we have contexts).
          • in MapRunner, autoIncrPrcossedRecord is a typo.
          eric baldeschwieler added a comment -

          I'm nervous about adding extra NN operations per task. That is going to make scaling to large clusters more expensive. I'd really like to find another solution if possible. We just took out a bunch of NN work per task due to performance problems! (task promotion stuff)

          Sharad Agarwal added a comment -

          > I'm nervous about adding extra NN operations per task.
          The current solution doesn't add any NN operations; it works by storing failed ranges in the TIP.

          Sharad Agarwal added a comment -

          Incorporated Doug's review comments.

          • moved configuration methods out of JobConf
          • changed InterTrackerProtocol's version
          • added javadoc to UmbilicalProtocol#reportNextRecordRange()
          • keeping the Reporter interface unchanged
          • typo in MapRunner fixed
          Sharad Agarwal added a comment - edited

          We can have this Jira address the base skipping functionality. I have created two dependent Jiras, HADOOP-3828 and HADOOP-3829, which are incremental features over this one.

          Sharad Agarwal added a comment -

          Disabling speculation if skip mode is active.

          Hadoop QA added a comment -

          -1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12386874/153_4.patch
          against trunk revision 679845.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 9 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          -1 findbugs. The patch appears to introduce 3 new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2953/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2953/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2953/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2953/console

          This message is automatically generated.

          Sharad Agarwal added a comment -

          Fixed findbugs warnings.

          Hadoop QA added a comment -

          +1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12387079/153_5.patch
          against trunk revision 680577.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 9 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2968/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2968/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2968/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/2968/console

          This message is automatically generated.

          Doug Cutting added a comment -

          Sigh. Sorry it's taken so long for me to get back to this. The patch has now gone stale! My bad.

          It looks much better.

          The configuration parameters controlling skipping should all start with "mapred.skip". Private constants should probably be defined for these, since they're referred to more than once. That way the compiler will check that they're all spelled correctly. The term "isSkipModeKickedOff" might instead be just "inSkipMode" or simply "isSkipping", and "skip.mode.kicked.off" might be "mapred.skip.on" or somesuch.
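
          For example, the keys could be defined once as private constants (the property names below are illustrative):

          import org.apache.hadoop.conf.Configuration;

          /** Skip-related configuration keys defined once, so the compiler catches misspellings. */
          class SkipConfigKeys {
            private static final String SKIP_ON = "mapred.skip.on";

            static boolean isSkipping(Configuration conf) {
              return conf.getBoolean(SKIP_ON, false);
            }

            static void setSkipping(Configuration conf, boolean isSkipping) {
              conf.setBoolean(SKIP_ON, isSkipping);
            }
          }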

          It would also be good to add javadoc to the new public Task methods and to SortedRanges' public methods. We don't (yet) publish the javadoc for Task, but it is a pretty central class and deserves to be well documented. In general, public methods in public classes should have javadoc. Sometimes classes (like Task) which didn't use to be public are made public, and lots of their methods are then missing javadoc, but that's not an excuse to continue the practice. And it never hurts to have javadoc, even for non-public methods in non-public classes...

          Sharad Agarwal added a comment -

          Updated to trunk.
          Incorporated Doug's comments.
          Changed SortedRanges.Range to store (startIndex, length) instead of (startIndex, endIndex).
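
          For illustration, a (startIndex, length) range looks roughly like this (not the actual SortedRanges.Range code):

          /** A half-open range [startIndex, startIndex + length); the end index is derived. */
          class Range {
            private final long startIndex;
            private final long length;

            Range(long startIndex, long length) {
              this.startIndex = startIndex;
              this.length = length;
            }

            long getStartIndex() { return startIndex; }
            long getLength()     { return length; }
            long getEndIndex()   { return startIndex + length; } // exclusive end

            boolean contains(long index) {
              return index >= startIndex && index < getEndIndex();
            }
          }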

          Sharad Agarwal added a comment -

          Cleaned up test classes and removed redundant test cases.
          Other minor cleanup / better variable names.

          Hadoop QA added a comment -

          +1 overall. Here are the results of testing the latest attachment
          http://issues.apache.org/jira/secure/attachment/12387626/153_7.patch
          against trunk revision 682978.

          +1 @author. The patch does not contain any @author tags.

          +1 tests included. The patch appears to include 9 new or modified tests.

          +1 javadoc. The javadoc tool did not generate any warning messages.

          +1 javac. The applied patch does not increase the total number of javac compiler warnings.

          +1 findbugs. The patch does not introduce any new Findbugs warnings.

          +1 release audit. The applied patch does not increase the total number of release audit warnings.

          +1 core tests. The patch passed core unit tests.

          +1 contrib tests. The patch passed contrib unit tests.

          Test results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3016/testReport/
          Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3016/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
          Checkstyle results: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3016/artifact/trunk/build/test/checkstyle-errors.html
          Console output: http://hudson.zones.apache.org/hudson/job/Hadoop-Patch/3016/console

          This message is automatically generated.

          Devaraj Das added a comment -

          Doug, could you please review this one last time? Thanks!

          Doug Cutting added a comment -

          +1 This looks good to me.

          Sharad Agarwal added a comment -

          No changes, other than wrapping the lines exceeding 80 characters.

          Devaraj Das added a comment -

          Sharad, sorry for the late comment - one small change I would like to see - the updates to the counters MAP_PROCESSED_RECORDS and REDUCE_PROCESSED_RECORDS should happen in the test/skip mode only.

          Sharad Agarwal added a comment -

          Incorporated Devaraj's comment.

          Devaraj Das added a comment -

          I just committed this. Thanks, Sharad!

          Hudson added a comment -

          Integrated in Hadoop-trunk #581 (See http://hudson.zones.apache.org/hudson/job/Hadoop-trunk/581/ )

            People

            • Assignee: Sharad Agarwal
            • Reporter: Doug Cutting
            • Votes: 1
            • Watchers: 12
