Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.9.1
-
None
-
None
Description
Certain Avro files are corrupted in such a way that the repair tool embarks on an infinite loop. Specifically, corruption is detected, but the position marker is not advanced, so the same corrupt location is read again and again.
Let's fix this to avoid the infinite loop.
Evidence / Reproduction Effort:
Two unit tests are added:
@Test public void testReportCorruptLoopBlock() throws Exception { String corruptLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping.avro"; String repairedLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping-FIXED.avro"; String output = run(new DataFileRepairTool(), "-o", "all", corruptLoopFile, repairedLoopFile); assertTrue(output, output.contains("Number of blocks: 5 Number of corrupt blocks: 4")); } @Test public void testRepairedReportCorruptLoopBlock() throws Exception { String repairedLoopFile = "/Users/stephan/IdeaProjects/avro/lang/java/tools/src/test/resources/Report_looping-FIXED.avro"; String output = run(new DataFileRepairTool(), "-o", "report", repairedLoopFile, repairedLoopFile); assertTrue(output, output.contains("Number of blocks: 4 Number of corrupt blocks: 0")); }
Without modification, the output should look similar to this:
Failed to read block 0. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! ...
With the innerRecover method modified to add some debugging output, as follows:
private int innerRecover(DataFileReader<Object> fileReader, DataFileWriter<Object> fileWriter, PrintStream out, PrintStream err, boolean recoverPrior, boolean recoverAfter, Schema schema, File outfile) { int numBlocks = 0; int numCorruptBlocks = 0; int numRecords = 0; int lastNumRecords = -1; int numCorruptRecords = 0; int recordsWritten = 0; long lastPostion = -1; long position = fileReader.previousSync(); long blockSize = 0; long blockCount = 0; boolean fileWritten = false; try { while (true) { try { if (!fileReader.hasNext()) { out.println("File Summary: "); out.println(" Number of blocks: " + numBlocks + " Number of corrupt blocks: " + numCorruptBlocks); out.println(" Number of records: " + numRecords + " Number of corrupt records: " + numCorruptRecords); if (recoverAfter || recoverPrior) { out.println(" Number of records written " + recordsWritten); } out.println(); return 0; } position = fileReader.previousSync(); blockCount = fileReader.getBlockCount(); blockSize = fileReader.getBlockSize(); numRecords += blockCount; long blockRemaining = blockCount; numBlocks++; boolean lastRecordWasBad = false; long badRecordsInBlock = 0; err.println("Details Prior: numblocks: "+numBlocks + ", blockRemaining: "+ blockRemaining + ", lastRecordWasBad: " + lastRecordWasBad + ", numCorruptRecords: " + numCorruptRecords + ", badRecordsInBloc: " + badRecordsInBlock); while (blockRemaining > 0) { try { Object datum = fileReader.next(); if ((recoverPrior && numCorruptBlocks == 0) || (recoverAfter && numCorruptBlocks > 0)) { if (!fileWritten) { try { fileWriter.create(schema, outfile); fileWritten = true; } catch (Exception e) { e.printStackTrace(err); return 1; } } try { fileWriter.append(datum); recordsWritten++; } catch (Exception e) { e.printStackTrace(err); throw e; } } blockRemaining--; lastRecordWasBad = false; // err.println("Details #1: blockRemaining: "+ blockRemaining + // ", lastRecordWasBad: " + lastRecordWasBad + // ", numCorruptRecords: " + numCorruptRecords + // ", 
badRecordsInBloc: " + badRecordsInBlock); } catch (Exception e) { long pos = blockCount - blockRemaining; if (badRecordsInBlock == 0) { // first corrupt record numCorruptBlocks++; err.println("Corrupt block: " + numBlocks + " Records in block: " + blockCount + " uncompressed block size: " + blockSize); err.println("Corrupt record at position: " + (pos)); } else { // second bad record in block, if consecutive skip block. err.println("Corrupt record at position: " + (pos)); if (lastRecordWasBad) { // consecutive bad record err.println( "Second consecutive bad record in block: " + numBlocks + ". Skipping remainder of block. "); numCorruptRecords += blockRemaining; badRecordsInBlock += blockRemaining; try { fileReader.sync(position); } catch (Exception e2) { err.println("failed to sync to sync marker, aborting"); e2.printStackTrace(err); return 1; } break; } } blockRemaining--; lastRecordWasBad = true; numCorruptRecords++; badRecordsInBlock++; err.println("Details #2: blockRemaining: "+ blockRemaining + ", lastRecordWasBad: " + lastRecordWasBad + ", numCorruptRecords: " + numCorruptRecords + ", badRecordsInBloc: " + badRecordsInBlock); } } err.println("Details After: blockRemaining: "+ blockRemaining + ", lastRecordWasBad: " + lastRecordWasBad + ", numCorruptRecords: " + numCorruptRecords + ", badRecordsInBloc: " + badRecordsInBlock); if (badRecordsInBlock != 0) { err.println("** Number of unrecoverable records in block: " + (badRecordsInBlock)); } position = fileReader.previousSync(); } catch (Exception e) { // if(lastNumRecords == numRecords) { // if(lastPostion == position) { if(false) { position++; } else { lastNumRecords = numRecords; lastPostion = position; err.println("Failed to read block " + numBlocks + ". Unknown record " + "count in block. Skipping. 
Reason: " + e.getMessage()); numCorruptBlocks++; err.printf( " int numBlocks = %d;\n" + " int numCorruptBlocks = %d;\n" + " int numRecords = %d;\n" + " int numCorruptRecords = %d;\n" + " int recordsWritten = %d;\n" + " long position = %d / 0x%04x;\n" + " long blockSize = %d / 0x%04x;\n" + " long blockCount = %d / 0x%04x\n", numBlocks, numCorruptBlocks, numRecords, numCorruptRecords, recordsWritten, position, position, blockSize, blockSize, blockCount, blockCount); try { fileReader.sync(position); } catch (Exception e2) { err.println("failed to sync to sync marker, aborting"); e2.printStackTrace(err); return 1; } } } } } finally { if (fileWritten) { try { fileWriter.close(); } catch (Exception e) { e.printStackTrace(err); return 1; } } } }
With the above code, we can see an infinite loop pattern emerge (as shown by the extra printing):
Failed to read block 0. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! int numBlocks = 0; int numCorruptBlocks = 1; int numRecords = 0; int numCorruptRecords = 0; int recordsWritten = 0; long position = 6504 / 0x1968; long blockSize = 0 / 0x0000; long blockCount = 0 / 0x0000 Details Prior: numblocks: 1, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details Prior: numblocks: 2, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details Prior: numblocks: 3, blockRemaining: 464, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Details After: blockRemaining: 0, lastRecordWasBad: false, numCorruptRecords: 0, badRecordsInBloc: 0 Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! int numBlocks = 3; int numCorruptBlocks = 2; int numRecords = 1392; int numCorruptRecords = 0; int recordsWritten = 1392; long position = 248627 / 0x3cb33; long blockSize = 17611 / 0x44cb; long blockCount = 464 / 0x01d0 Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! int numBlocks = 3; int numCorruptBlocks = 3; int numRecords = 1392; int numCorruptRecords = 0; int recordsWritten = 1392; long position = 248627 / 0x3cb33; long blockSize = 17611 / 0x44cb; long blockCount = 464 / 0x01d0 Failed to read block 3. Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! int numBlocks = 3; int numCorruptBlocks = 4; int numRecords = 1392; int numCorruptRecords = 0; int recordsWritten = 1392; long position = 248627 / 0x3cb33; long blockSize = 17611 / 0x44cb; long blockCount = 464 / 0x01d0 Failed to read block 3. 
Unknown record count in block. Skipping. Reason: java.io.IOException: Invalid sync! int numBlocks = 3; int numCorruptBlocks = 5; int numRecords = 1392; int numCorruptRecords = 0; int recordsWritten = 1392; long position = 248627 / 0x3cb33; long blockSize = 17611 / 0x44cb; long blockCount = 464 / 0x01d0
While it may not be the correct and acceptable fix, the following logic flip seems to make the repair complete in the code provided above (with some minor logic):
if(lastPostion == position) { // if(false) {
All the unit tests associated with the TestDataFileRepairTool class seem to pass, including the two hacked tests above.
I cannot provide the proprietary file, but I'd be happy to address questions.