RCFile: optionally tolerate corrupted data while reading.

Adds the boolean setting "hive.io.rcfile.tolerate.corruptions" (default
false). When it is enabled, RCFile.Reader creates its ValueBuffer with lazy
decompression turned off and reads the value buffer together with each key
buffer. An EOFException or a non-IOException Throwable raised while doing so
is logged and the row-advance method returns false; any other IOException is
re-thrown.

Review notes:
- EOFExceptions in the added lines are reported through LOG.warn(...)
  instead of eof.printStackTrace().
- Line breaks and indentation of this patch were reconstructed from a
  whitespace-collapsed copy; check it with "git apply --check" (or apply
  with whitespace tolerance) before relying on it.

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
index d16b082..5a0fd76 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
@@ -155,6 +155,9 @@ public class RCFile {
 
   public static final String COLUMN_NUMBER_CONF_STR = "hive.io.rcfile.column.number.conf";
 
+  public static final String TOLERATE_CORRUPTIONS_CONF_STR =
+    "hive.io.rcfile.tolerate.corruptions";
+
   /*
    * these header and Sync are kept from SequenceFile, for compatible of
    * SequenceFile's format.
@@ -351,6 +354,7 @@ public class RCFile {
     private boolean[] decompressedFlag = null;
     private int numCompressed;
     private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
+    private boolean lazyDecompress = true;
 
     boolean inited = false;
 
@@ -381,7 +385,13 @@ public class RCFile {
 
     public ValueBuffer(KeyBuffer currentKey, int columnNumber,
         boolean[] skippedCols, CompressionCodec codec) throws IOException {
+      this(currentKey, columnNumber, skippedCols, codec, true);
+    }
 
+    public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+        boolean[] skippedCols, CompressionCodec codec, boolean lazyDecompress)
+        throws IOException {
+      this.lazyDecompress = lazyDecompress;
       keyBuffer = currentKey;
       this.columnNumber = columnNumber;
 
@@ -469,7 +479,12 @@ public class RCFile {
         valBuf.reset();
         valBuf.write(in, vaRowsLen);
         if (codec != null) {
-          decompressedFlag[addIndex] = false;
+          if (lazyDecompress) {
+            decompressedFlag[addIndex] = false;
+          } else {
+            lazyDecompressCallbackObjs[addIndex].decompress();
+            decompressedFlag[addIndex] = true;
+          }
         }
         addIndex++;
       }
@@ -1024,6 +1039,8 @@ public class RCFile {
 
     private int passedRowsNum = 0;
 
+    // Should we try to tolerate corruption? Default is No.
+    private boolean tolerateCorruptions = false;
 
     private boolean decompress = false;
 
@@ -1049,6 +1066,8 @@ public class RCFile {
     /** Create a new RCFile reader. */
     public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
         long start, long length) throws IOException {
+      tolerateCorruptions = conf.getBoolean(
+        TOLERATE_CORRUPTIONS_CONF_STR, tolerateCorruptions);
       conf.setInt("io.file.buffer.size", bufferSize);
       this.file = file;
       in = openFile(fs, file, bufferSize, length);
@@ -1133,7 +1152,9 @@ public class RCFile {
       }
 
       currentKey = createKeyBuffer();
-      currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
+      boolean lazyDecompress = !tolerateCorruptions;
+      currentValue = new ValueBuffer(
+        null, columnNumber, skippedColIDs, codec, lazyDecompress);
     }
 
     /**
@@ -1280,11 +1301,6 @@ public class RCFile {
       return new KeyBuffer(columnNumber);
     }
 
-    @SuppressWarnings("unused")
-    private ValueBuffer createValueBuffer(KeyBuffer key) throws IOException {
-      return new ValueBuffer(key, skippedColIDs);
-    }
-
     /**
      * Read and return the next record length, potentially skipping over a sync
      * block.
@@ -1495,10 +1511,32 @@ public class RCFile {
       }
 
       int ret = -1;
-      try {
-        ret = nextKeyBuffer();
-      } catch (EOFException eof) {
-        eof.printStackTrace();
+      if (tolerateCorruptions) {
+        long currentOffset = in.getPos();
+        try {
+          ret = nextKeyBuffer();
+          this.currentValueBuffer();
+        } catch (EOFException eof) {
+          LOG.warn("Hit EOF in " + file + " after offset " + currentOffset, eof);
+          return false;
+        } catch (IOException ioe) {
+          // We have an IOException other than EOF.
+          // This is likely a read-error, not corruption, re-throw.
+          throw ioe;
+        } catch (Throwable t) {
+          // We got an exception that is not IOException
+          // (typically OOM, IndexOutOfBounds, InternalError).
+          // This is most likely a corruption.
+          LOG.error("Encountered unknown error in " + file + " after offset " +
+            currentOffset, t);
+          return false;
+        }
+      } else {
+        try {
+          ret = nextKeyBuffer();
+        } catch (EOFException eof) {
+          LOG.warn("Hit EOF while reading the next key buffer in " + file, eof);
+        }
       }
       if (ret > 0) {
         return next(readRows);
@@ -1522,11 +1560,18 @@ public class RCFile {
         return;
       }
 
-      if (!currentValue.inited) {
-        currentValueBuffer();
-        // do this only when not initialized, but we may need to find a way to
-        // tell the caller how to initialize the valid size
+      if (tolerateCorruptions) {
+        if (!currentValue.inited) {
+          currentValueBuffer();
+        }
         ret.resetValid(columnNumber);
+      } else {
+        if (!currentValue.inited) {
+          currentValueBuffer();
+          // do this only when not initialized, but we may need to find a way to
+          // tell the caller how to initialize the valid size
+          ret.resetValid(columnNumber);
+        }
       }
 
       // we do not use BytesWritable here to avoid the byte-copy from