  1. Apache Avro
  2. AVRO-2637

Avro Tool's Repair Tool Can Go Into an Infinite Loop


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.9.1
    • Fix Version/s: None
    • Component/s: tools
    • Labels: None

    Description

      Certain Avro files are corrupted in such a way that the repair tool enters an infinite loop. Specifically, corruption is detected, but the position marker is not advanced, so the same block is retried indefinitely.
      Let's fix this to avoid the infinite loop.

      Evidence / Reproduction Effort:

      Two unit tests were added:
       

        @Test
        public void testReportCorruptLoopBlock() throws Exception {
          String corruptLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping.avro";
          String repairedLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping-FIXED.avro";
          String output = run(new DataFileRepairTool(), "-o", "all", corruptLoopFile, repairedLoopFile);
          assertTrue(output, output.contains("Number of blocks: 5 Number of corrupt blocks: 4"));
        }
        @Test
        public void testRepairedReportCorruptLoopBlock() throws Exception {
          String repairedLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping-FIXED.avro";
          String output = run(new DataFileRepairTool(), "-o", "report", repairedLoopFile, repairedLoopFile);
          assertTrue(output, output.contains("Number of blocks: 4 Number of corrupt blocks: 0"));
        }
      
      

       

      Without modification, the output should look similar to this:

       

      Failed to read block 0. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync!
      Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync!
      Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync!
      Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync!
      ...
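
Because the unmodified tool never returns on such a file, a reproduction test like the ones above can hang the whole build. The sketch below is one way to bound the run using only the JDK. `BoundedRun` and `completesWithin` are hypothetical names, not part of Avro or its test suite, and the helper reports only whether the task finished in time.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of Avro): runs a task with a time limit.
final class BoundedRun {

    /** Returns true if the task terminated within timeoutMillis, false otherwise. */
    static boolean completesWithin(Runnable task, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-run");
            t.setDaemon(true); // a stuck task must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(task);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Interrupts the worker; a loop that ignores interrupts keeps
            // spinning on the daemon thread until the JVM exits.
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            // The task threw, so it did terminate; that is all this helper checks.
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A test could wrap the `run(new DataFileRepairTool(), ...)` call in such a helper and fail fast when it returns false. With JUnit 4, `@Test(timeout = ...)` is a simpler alternative where it is available.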
      

       

       

      With the innerRecover method modified for debugging as follows:

         private int innerRecover(DataFileReader<Object> fileReader, DataFileWriter<Object> fileWriter, PrintStream out,
            PrintStream err, boolean recoverPrior, boolean recoverAfter, Schema schema, File outfile) {
          int numBlocks = 0;
          int numCorruptBlocks = 0;
          int numRecords = 0;
          int lastNumRecords = -1;
          int numCorruptRecords = 0;
          int recordsWritten = 0;
          long lastPostion = -1;
          long position = fileReader.previousSync();
          long blockSize = 0;
          long blockCount = 0;
          boolean fileWritten = false;
          try {
            while (true) {
              try {
                if (!fileReader.hasNext()) {
                  out.println("File Summary: ");
                  out.println("  Number of blocks: " + numBlocks + " Number of corrupt blocks: " + numCorruptBlocks);
                  out.println("  Number of records: " + numRecords + " Number of corrupt records: " + numCorruptRecords);
                  if (recoverAfter || recoverPrior) {
                    out.println("  Number of records written " + recordsWritten);
                  }
                  out.println();
                  return 0;
                }
                position = fileReader.previousSync();
                blockCount = fileReader.getBlockCount();
                blockSize = fileReader.getBlockSize();
                numRecords += blockCount;
                long blockRemaining = blockCount;
                numBlocks++;
                boolean lastRecordWasBad = false;
                long badRecordsInBlock = 0;
                err.println("Details Prior: numblocks: "+numBlocks +
                  ", blockRemaining: "+ blockRemaining +
                  ", lastRecordWasBad: " + lastRecordWasBad +
                  ", numCorruptRecords: " + numCorruptRecords +
                  ", badRecordsInBloc: " + badRecordsInBlock);
                while (blockRemaining > 0) {
                  try {
                    Object datum = fileReader.next();
                    if ((recoverPrior && numCorruptBlocks == 0) || (recoverAfter && numCorruptBlocks > 0)) {
                      if (!fileWritten) {
                        try {
                          fileWriter.create(schema, outfile);
                          fileWritten = true;
                        } catch (Exception e) {
                          e.printStackTrace(err);
                          return 1;
                        }
                      }
                      try {
                        fileWriter.append(datum);
                        recordsWritten++;
                      } catch (Exception e) {
                        e.printStackTrace(err);
                        throw e;
                      }
                    }
                    blockRemaining--;
                    lastRecordWasBad = false;
      //              err.println("Details #1: blockRemaining: "+ blockRemaining +
      //                ", lastRecordWasBad: " + lastRecordWasBad +
      //                ", numCorruptRecords: " + numCorruptRecords +
      //                ", badRecordsInBloc: " + badRecordsInBlock);
                  } catch (Exception e) {
                    long pos = blockCount - blockRemaining;
                    if (badRecordsInBlock == 0) {
                      // first corrupt record
                      numCorruptBlocks++;
                      err.println("Corrupt block: " + numBlocks + " Records in block: " + blockCount
                          + " uncompressed block size: " + blockSize);
                      err.println("Corrupt record at position: " + (pos));
                    } else {
                      // second bad record in block, if consecutive skip block.
                      err.println("Corrupt record at position: " + (pos));
                      if (lastRecordWasBad) {
                        // consecutive bad record
                        err.println(
                            "Second consecutive bad record in block: " + numBlocks + ". Skipping remainder of block. ");
                        numCorruptRecords += blockRemaining;
                        badRecordsInBlock += blockRemaining;
                        try {
                          fileReader.sync(position);
                        } catch (Exception e2) {
                          err.println("failed to sync to sync marker, aborting");
                          e2.printStackTrace(err);
                          return 1;
                        }
                        break;
                      }
                    }
                    blockRemaining--;
                    lastRecordWasBad = true;
                    numCorruptRecords++;
                    badRecordsInBlock++;
                    err.println("Details #2: blockRemaining: "+ blockRemaining +
                      ", lastRecordWasBad: " + lastRecordWasBad +
                      ", numCorruptRecords: " + numCorruptRecords +
                      ", badRecordsInBloc: " + badRecordsInBlock);
                  }
                }
                err.println("Details After: blockRemaining: "+ blockRemaining +
                  ", lastRecordWasBad: " + lastRecordWasBad +
                  ", numCorruptRecords: " + numCorruptRecords +
                  ", badRecordsInBloc: " + badRecordsInBlock);
      
                if (badRecordsInBlock != 0) {
                  err.println("** Number of unrecoverable records in block: " + (badRecordsInBlock));
                }
                position = fileReader.previousSync();
              } catch (Exception e) {
      
      //          if(lastNumRecords == numRecords) {
      //          if(lastPostion == position) {
                if(false) {
                    position++;
                }
                else {
                  lastNumRecords = numRecords;
                  lastPostion = position;
                  err.println("Failed to read block " + numBlocks + ". Unknown record " + "count in block.  Skipping. Reason: "
                    + e.getMessage());
      
                  numCorruptBlocks++;
      
                  err.printf(
                    "    int numBlocks = %d;\n" +
                      "    int numCorruptBlocks = %d;\n" +
                      "    int numRecords = %d;\n" +
                      "    int numCorruptRecords = %d;\n" +
                      "    int recordsWritten = %d;\n" +
                      "    long position = %d / 0x%04x;\n" +
                      "    long blockSize = %d / 0x%04x;\n" +
                      "    long blockCount = %d / 0x%04x\n",
                    numBlocks,
                    numCorruptBlocks,
                    numRecords,
                    numCorruptRecords,
                    recordsWritten,
                    position, position,
                    blockSize, blockSize,
                    blockCount, blockCount);
                  try {
                    fileReader.sync(position);
                  } catch (Exception e2) {
                    err.println("failed to sync to sync marker, aborting");
                    e2.printStackTrace(err);
                    return 1;
                  }
                }
              }
            }
          } finally {
            if (fileWritten) {
              try {
                fileWriter.close();
              } catch (Exception e) {
                e.printStackTrace(err);
                return 1;
              }
            }
          }
        }
      

      With the above code, an infinite loop pattern emerges (as seen in the extra output):

      Failed to read block 0. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
          int numBlocks = 0;
          int numCorruptBlocks = 1;
          int numRecords = 0;
          int numCorruptRecords = 0;
          int recordsWritten = 0;
          long position = 6504 / 0x1968;
          long blockSize = 0 / 0x0000;
          long blockCount = 0 / 0x0000
      Details Prior: numblocks: 1, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
      Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
      Details Prior: numblocks: 2, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
      Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
      Details Prior: numblocks: 3, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
      Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0
      Failed to read block 3. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
          int numBlocks = 3;
          int numCorruptBlocks = 2;
          int numRecords = 1392;
          int numCorruptRecords = 0;
          int recordsWritten = 1392;
          long position = 248627 / 0x3cb33;
          long blockSize = 17611 / 0x44cb;
          long blockCount = 464 / 0x01d0
      Failed to read block 3. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
          int numBlocks = 3;
          int numCorruptBlocks = 3;
          int numRecords = 1392;
          int numCorruptRecords = 0;
          int recordsWritten = 1392;
          long position = 248627 / 0x3cb33;
          long blockSize = 17611 / 0x44cb;
          long blockCount = 464 / 0x01d0
      Failed to read block 3. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
          int numBlocks = 3;
          int numCorruptBlocks = 4;
          int numRecords = 1392;
          int numCorruptRecords = 0;
          int recordsWritten = 1392;
          long position = 248627 / 0x3cb33;
          long blockSize = 17611 / 0x44cb;
          long blockCount = 464 / 0x01d0
      Failed to read block 3. Unknown record count in block.  Skipping. Reason: java.io.IOException: Invalid sync!
          int numBlocks = 3;
          int numCorruptBlocks = 5;
          int numRecords = 1392;
          int numCorruptRecords = 0;
          int recordsWritten = 1392;
          long position = 248627 / 0x3cb33;
          long blockSize = 17611 / 0x44cb;
          long blockCount = 464 / 0x01d0
      
      

      While it may not be the correct or an acceptable fix, flipping the following condition (with some minor supporting logic) seems to let the repair complete in the code above:

                if(lastPostion == position) {
      //          if(false) {
      

      All the unit tests associated with the TestDataFileRepairTool class seem to pass, including the two hacked tests above.
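
The idea behind the flipped condition can be sketched in isolation, without Avro: remember the position of the last failed read and, if the next failure reports the same position, start the re-sync one byte later. `ResyncGuard`, `nextSyncStart` and `retriesUntilProgress` are hypothetical names. The toy reader model (re-syncing from the stuck offset always lands on the same bad block) is an assumption for illustration, not a description of how `DataFileReader.sync` behaves internally. It is also a simplification of the change above, which increments `position` and retries rather than computing a new start offset.

```java
// Hypothetical stall guard (not part of Avro), shown with a toy reader model.
final class ResyncGuard {
    private long lastFailedPosition = -1;

    /**
     * Returns the offset to re-sync from after a block read failed at position.
     * If the previous failure was at the same position, no progress was made,
     * so step forward one byte to avoid landing on the same bad block again.
     */
    long nextSyncStart(long position) {
        boolean stalled = position == lastFailedPosition;
        lastFailedPosition = position;
        return stalled ? position + 1 : position;
    }

    /**
     * Toy model (assumption): re-syncing from exactly stuckAt always fails
     * again at stuckAt; re-syncing from any other offset gets past it.
     * Returns the attempt number that made progress, or -1 if none did.
     */
    static int retriesUntilProgress(long stuckAt, boolean useGuard, int maxRetries) {
        ResyncGuard guard = new ResyncGuard();
        long position = stuckAt;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            long start = useGuard ? guard.nextSyncStart(position) : position;
            if (start != stuckAt) {
                return attempt; // moved past the stuck offset
            }
            // Otherwise the read fails again and reports the same position.
        }
        return -1; // gave up: this is the infinite loop, cut off at maxRetries
    }
}
```

In this model, a call such as `ResyncGuard.retriesUntilProgress(248627L, true, 10)` gets past the stuck offset on the second attempt, while the unguarded variant exhausts its retries.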

      I cannot provide the proprietary file, but I'd be happy to address questions.

          People

            Assignee: Unassigned
            Reporter: Stephan Warren (therealcodeaperature)