Pig / PIG-3059

Global configurable minimum 'bad record' thresholds

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.11
    • Fix Version/s: None
    • Component/s: impl
    • Labels: None

      Description

      See PIG-2614.

      Pig dies when one record in a LOAD of a billion records fails to parse. This is almost certainly not the desired behavior. elephant-bird and some other storage UDFs have minimum thresholds in terms of percent and count that must be exceeded before a job will fail outright.

      We need these limits to be configurable for Pig, globally. I've come to realize what a major problem Pig's crashing on bad records is for new Pig users. I believe this feature can greatly improve Pig.

      An example of a config would look like:

      pig.storage.bad.record.threshold=0.01
      pig.storage.bad.record.min=100

      A thorough discussion of this issue is available here: http://www.quora.com/Big-Data/In-Big-Data-ETL-how-many-records-are-an-acceptable-loss
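As a rough, hypothetical sketch (not Pig's actual implementation) of how these two settings could interact: a job survives bad records until both the absolute minimum count and the rate threshold are exceeded. Class and method names here are illustrative only.

```java
// Hypothetical sketch of enforcing the two proposed settings.
// BadRecordTracker is an illustrative name, not a real Pig class.
public class BadRecordTracker {
    private final double threshold; // pig.storage.bad.record.threshold (fraction)
    private final long minErrors;   // pig.storage.bad.record.min (absolute count)
    private long records = 0;
    private long errors = 0;

    public BadRecordTracker(double threshold, long minErrors) {
        this.threshold = threshold;
        this.minErrors = minErrors;
    }

    /** Call once per record read, before attempting to parse it. */
    public void incRecords() {
        records++;
    }

    /** Call when a record fails to parse; fails the job only past both limits. */
    public void incErrors(Throwable cause) {
        errors++;
        double rate = (double) errors / records;
        if (errors >= minErrors && rate > threshold) {
            throw new RuntimeException(
                "bad record rate " + rate + " exceeded threshold " + threshold, cause);
        }
    }

    public static void main(String[] args) {
        BadRecordTracker t = new BadRecordTracker(0.01, 100);
        for (int i = 0; i < 10_000; i++) {
            t.incRecords();
            if (i % 200 == 0) { // 0.5% bad records: under the 1% threshold
                t.incErrors(new Exception("parse failure"));
            }
        }
        System.out.println("job survived with " + t.errors + " bad records");
    }
}
```

With the example values above, a single corrupt record in a billion no longer kills the job, while a systematically broken input still fails fast.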

      1. avro_test_files-2.tar.gz
        20 kB
        Cheolsoo Park
      2. PIG-3059.patch
        24 kB
        Cheolsoo Park
      3. PIG-3059-2.patch
        33 kB
        Cheolsoo Park


          Activity

          Russell Jurney added a comment -

          High five!

          Cheolsoo Park added a comment -

          Hi Joe,

          I'm sorry if you're already working on this JIRA, but I was working with a customer who has issues with bad input files, so I thought I might just solve it once and for all.

          As per your suggestion, I moved the error-handling code to PigRecordReader, so now the threshold should work for any record reader, including PigAvroRecordReader. I uploaded the patch to the RB for your review:
          https://reviews.apache.org/r/8765/

          Please let me know what you think.

          Thanks!

          Cheolsoo Park added a comment -

          In addition to applying the patch, the following commands should be run to commit it:
          wget https://issues.apache.org/jira/secure/attachment/12555546/test_avro_files.tar.gz
          tar -xf test_avro_files.tar.gz
          svn rm contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro
          svn add contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file
          svn rm contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro
          svn add contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile2.avro
          svn add contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile3.avro

          Russell Jurney added a comment -

          Awesome!

          Show
          Russell Jurney added a comment - Awesome!
          Hide
          Russell Jurney added a comment -

          I have a couple comments here:

          1) We need good reporting on how many records are bad, in terms of percent, count, and which split/file they came from.
          2) We need to log/store the bad records somewhere, so we can view/debug them and build a more robust loadfunc, ETL process, etc.

          Russell Jurney added a comment -

          I actually just reviewed this; I think that's my first time doing a code review.

          1) It would be cool if InputErrorTracker could print the error rate at the end of the job. Not sure I saw this? This would be my only suggestion that couldn't be pushed to another JIRA.

          2) Adding the file the error came from would be helpful. I imagine knowing error distribution would be really helpful.

          3) If there is any way for InputErrorTracker to write the exact input record out to an error file, that would be awesome.

          Cheolsoo Park added a comment -

          Hi Russell, thank you very much for the review.

          I think that your suggestions are all good, and I will try to incorporate them in a new patch.

          Regarding the error rate, I actually found a problem in the current implementation. Currently, when we encounter a bad input split, I skip all the remaining records in that split. This means we don't really count the total number of records. So if we calculate the error rate as (errors thrown) / (records processed) * 100, the result won't be accurate (unless each split includes only one record).

          So I am thinking that I will count the number of splits instead of records, which can be counted correctly. Please let me know if you have a better suggestion.

          Thanks!
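As a rough numerical illustration of the inaccuracy described above (all numbers are assumed for the example, not taken from a real job):

```java
// Illustration of why (errors / records processed) misleads when the rest
// of a bad split is skipped. All quantities below are assumed example values.
public class ErrorRateExample {
    public static void main(String[] args) {
        long recordsPerSplit = 100_000;  // assumed records per split
        long splits = 10;
        long badSplits = 1;              // one split hits a bad record...
        long recordsReadInBadSplit = 1;  // ...and its remaining records are skipped

        long recordsProcessed =
            (splits - badSplits) * recordsPerSplit + recordsReadInBadSplit;
        double recordRate = 1.0 / recordsProcessed * 100; // 1 error over records read
        double splitRate = (double) badSplits / splits * 100;

        // The record-based rate hides the loss: ~0.0001% vs 10% of splits lost.
        System.out.printf("record-based: %.5f%%, split-based: %.1f%%%n",
            recordRate, splitRate);
    }
}
```

Because skipped records are never counted in the denominator, the record-based rate stays tiny even when a tenth of the input is discarded, which is the motivation for counting splits instead.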

          Russell Jurney added a comment -

          Is it possible to change the strategy we use when we encounter a bad record? Does the one record mess up the rest of the read, or can we reasonably resume?

          Cheolsoo Park added a comment -

          I think it depends on the file format. But for Avro, one case that we should handle is when a sync() call throws an exception. In this case, we can't really find the next position where we can resume the read. Given that we're implementing this logic in PigRecordReader (a wrapper class for underlying record readers), I don't think that skipping records, not splits, is always possible. Please correct me if I am wrong.

          Thanks!

          Cheolsoo Park added a comment -

          I am uploading a new patch that includes the following changes:

          • The error rate is printed as part of job stats. (See the last two lines.)
            Counters:
            Total records written : 1
            Total bytes written : 10
            Spillable Memory Manager spill count : 0
            Total bags proactively spilled: 0
            Total records proactively spilled: 0
            Total input splits processed : 2
            Total bad input splits : 1 ( 50.0% )
            

            To do this, I made changes to the PigStats interface. Please let me know if this is not recommended.

          • The error message is improved. Now the location of the bad split that causes the run-time exception is printed.
            Backend error : error while reading input split: hdfs://cheolsoo-mr1-0.ent.cloudera.com:8020/user/cheolsoo/bad.avro:0+81
            
          • As discussed, InputErrorTracker counts the number of splits instead of records.
          • Since the ignore_bad_files option in AvroStorage is already in branch-0.11, I decided to not delete it for backward compatibility. When the ignore_bad_files option is enabled, it is equivalent to setting pig.load.bad.split.threshold to 1.0.
          • Lastly, I am uploading a new tarball for new test cases. To run them, the following commands should be executed:
            tar -xf test_avro_files-2.tar.gz
            svn rm contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro
            svn add contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file
            svn rm contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro
            svn add contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile2.avro
            svn add contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile3.avro
            svn add contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile4.avro
            

          Thanks!

          Russell Jurney added a comment -

          Is it possible at all to provide an optional interface that counts bad records, as well?

          At the moment I don't understand the issues around resuming after bad records. I believe some readers can handle one bad record and continue; I think elephant-bird works this way with some formats? That being the case, if possible, we should accommodate bad record counts, as well as bad input splits, in the reporting. Not sure if that makes sense, but check this out: https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java

          void incErrors(Throwable cause) {
            numErrors++;
            if (numErrors > numRecords) {
              // incorrect use of this class
              throw new RuntimeException("Forgot to invoke incRecords()?");
            }

            if (cause == null) {
              cause = new Exception("Unknown error");
            }

            if (errorThreshold <= 0) { // no errors are tolerated
              throw new RuntimeException("error while reading input records", cause);
            }

            LOG.warn("Error while reading an input record ("
                + numErrors + " out of " + numRecords + " so far): ", cause);

            double errRate = numErrors / (double) numRecords;

            // will always excuse the first error. We can decide if single
            // error crosses threshold inside close() if we want to.
            if (numErrors >= minErrors && errRate > errorThreshold) {
              LOG.error(numErrors + " out of " + numRecords
                  + " crosses configured threshold (" + errorThreshold + ")");
              throw new RuntimeException("error rate while reading input records crossed threshold", cause);
            }
          }
          Also, when we report a bad split, is it possible to say how large that split is? If we could report the total size and the size of the inputsplits lost, that gives a user better context as to the overall %. If it is hard to implement the total size of the job, a user might compare the size of the lost inputsplits with 'hadoop fs -ls /my/input' and see the size difference?

          Dmitriy V. Ryaboy added a comment -

          I agree with the principle that inspired this patch, but the solution seems to fall short of ideal.

          Dealing in splits is misleading and hard to reason about:

          • Good records read from a split that contains a bad record still get processed, so it's not the case that a "bad split" is ignored, even though you are controlling how many bad splits to ignore.
          • A single bad record stops the whole rest of the split from being processed, whether or not your loader could recover. This is unnecessary data loss.
          • Most users of Pig have no idea (or should have no idea) what a split is.
          • Pig combines splits – but this deals with pre-combination splits. Especially when combining small but unequal files, splits are very different from each other, and some may contain 100 records while others contain 100,000 records.

          This all means that no matter what the user sets these values to, they actually have no idea what error threshold they are telling Pig to ignore.

          I think the Elephant-Bird way of dealing with errors – a minimal threshold of record errors plus a percentage of total records read – is quite robust and easy to explain. If Avro can't recover from a bad record in a single split, it can do whatever is appropriate for Avro – estimate how many records it's dropping and throw that many exceptions, or just pretend that this one error is all that was left in the split, or maybe fix the format so that it can recover properly (OK, that was a troll comment).

          Cheolsoo Park added a comment -

          Hi Russell and Dmitriy, thank you for your comments/suggestions.

          Sure, agreed. I will give it more thought and try to upload a new patch.

          Joseph Adler added a comment -

          Sorry to take so long to get back to this. It was a long break from work...

          Thanks so much for taking this over. I like the way you've implemented this.

          Russell Jurney added a comment -

          Regarding Avro, in reading https://github.com/apache/avro/blob/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java - it looks like you can still sync to the next record after most bad reads. We should do so.

          You're right about a bad sync halting things, but in the case of a bad sync - you might try advancing by some amount using seek() and then sync'ing again? I think this would work. I could be wrong, but in looking at how seeks work - I think that would be OK. Kinda neat, maybe? Worst case, we would only throw out input splits on a bad sync(), not a bad read(). length() should help, as might pastSync(), skip(), and available().
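To make the seek-then-resync idea concrete, here is a minimal, untested sketch. SeekableRecordReader is a hypothetical stand-in for (not a copy of) Avro's DataFileReader-style API, and SKIP_BYTES is an assumed skip granularity:

```java
// Sketch of "seek ahead and re-sync" recovery after a bad read.
// SeekableRecordReader is a hypothetical interface, loosely modeled on the
// kind of API Avro's DataFileReader exposes; it is not a real Avro type.
interface SeekableRecordReader {
    long tell();                    // current byte position
    long length();                  // total input length in bytes
    void sync(long position);       // advance to the next sync marker at/after position
    Object next() throws Exception; // read one record; may throw on corruption
    boolean hasNext();
}

public class ResyncingReader {
    private static final long SKIP_BYTES = 4096; // assumed skip granularity

    /** Try to read the next record, skipping forward past corruption if needed. */
    public static Object nextOrResync(SeekableRecordReader r) {
        while (r.hasNext()) {
            try {
                return r.next();
            } catch (Exception bad) {
                long pos = r.tell();
                if (pos + SKIP_BYTES >= r.length()) {
                    return null; // too close to EOF to recover; give up on the split
                }
                r.sync(pos + SKIP_BYTES); // jump past the damage to the next marker
            }
        }
        return null;
    }
}
```

The point of the sketch is just that a bad read() need not discard the rest of the split: only a failure to find any further sync marker would.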

          I agree with Dmitriy's feedback, thanks for taking the time.


  People

  • Assignee: Unassigned
  • Reporter: Russell Jurney
  • Votes: 0
  • Watchers: 6