Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java	(revision 1161660)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java	(working copy)
@@ -19,10 +19,12 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;
 
 import junit.framework.TestCase;
@@ -207,6 +209,59 @@
     reader.close();
   }
 
+  public void testReadCorruptFile() throws IOException, SerDeException {
+    fs.delete(file, true);
+
+    byte[][] record = {null, null, null, null, null, null, null, null};
+
+    RCFileOutputFormat.setColumnNumber(conf, expectedFieldsData.length);
+    RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null,
+        new DefaultCodec());
+    BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);
+    final int recCount = 100;
+    Random rand = new Random();
+    for (int recIdx = 0; recIdx < recCount; recIdx++) {
+      for (int i = 0; i < record.length; i++) {
+        record[i] = new Integer(rand.nextInt()).toString().getBytes("UTF-8");
+      }
+      for (int i = 0; i < record.length; i++) {
+        BytesRefWritable cu = new BytesRefWritable(record[i], 0,
+            record[i].length);
+        bytes.set(i, cu);
+      }
+      writer.append(bytes);
+      bytes.clear();
+    }
+    writer.close();
+
+    // Insert junk in middle of file. Assumes file is on local disk.
+    RandomAccessFile raf = new RandomAccessFile(file.toUri().getPath(), "rw");
+    long corruptOffset = raf.length() / 2;
+    LOG.info("corrupting " + raf + " at offset " + corruptOffset);
+    raf.seek(corruptOffset);
+    raf.writeBytes("junkjunkjunkjunkjunkjunkjunkjunk");
+    raf.close();
+
+    // Set the option for tolerating corruptions. The read should succeed.
+    Configuration tmpConf = new Configuration(conf);
+    tmpConf.setBoolean("hive.io.rcfile.tolerate.corruptions", true);
+    RCFile.Reader reader = new RCFile.Reader(fs, file, tmpConf);
+
+    LongWritable rowID = new LongWritable();
+
+    while (true) {
+      boolean more = reader.next(rowID);
+      if (!more) {
+        break;
+      }
+      BytesRefArrayWritable cols = new BytesRefArrayWritable();
+      reader.getCurrentRow(cols);
+      cols.resetValid(8);
+    }
+
+    reader.close();
+  }
+
   public void testWriteAndFullyRead() throws IOException, SerDeException {
     writeTest(fs, 10000, file, bytesArray);
     fullyReadTest(fs, 10000, file);
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java	(revision 1161660)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java	(working copy)
@@ -155,6 +155,9 @@
   public static final String COLUMN_NUMBER_CONF_STR =
     "hive.io.rcfile.column.number.conf";
 
+  public static final String TOLERATE_CORRUPTIONS_CONF_STR =
+    "hive.io.rcfile.tolerate.corruptions";
+
   /*
    * these header and Sync are kept from SequenceFile, for compatible of
    * SequenceFile's format.
@@ -351,6 +354,7 @@
     private boolean[] decompressedFlag = null;
     private int numCompressed;
     private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
+    private boolean lazyDecompress = true;
 
     boolean inited = false;
 
@@ -381,7 +385,13 @@
 
     public ValueBuffer(KeyBuffer currentKey, int columnNumber,
         boolean[] skippedCols, CompressionCodec codec) throws IOException {
+      this(currentKey, columnNumber, skippedCols, codec, true);
+    }
+    public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+        boolean[] skippedCols, CompressionCodec codec, boolean lazyDecompress)
+        throws IOException {
+      this.lazyDecompress = lazyDecompress;
       keyBuffer = currentKey;
       this.columnNumber = columnNumber;
 
@@ -469,7 +479,12 @@
         valBuf.reset();
         valBuf.write(in, vaRowsLen);
         if (codec != null) {
-          decompressedFlag[addIndex] = false;
+          if (lazyDecompress) {
+            decompressedFlag[addIndex] = false;
+          } else {
+            lazyDecompressCallbackObjs[addIndex].decompress();
+            decompressedFlag[addIndex] = true;
+          }
         }
         addIndex++;
       }
@@ -1024,6 +1039,8 @@
 
     private int passedRowsNum = 0;
 
+    // Should we try to tolerate corruption? Default is No.
+    private boolean tolerateCorruptions = false;
     private boolean decompress = false;
 
     private int currentKeyLength;
@@ -1049,6 +1066,8 @@
     /** Create a new RCFile reader. */
     public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
         long start, long length) throws IOException {
+      tolerateCorruptions = conf.getBoolean(
+          TOLERATE_CORRUPTIONS_CONF_STR, false);
       conf.setInt("io.file.buffer.size", bufferSize);
       this.file = file;
       in = openFile(fs, file, bufferSize, length);
@@ -1133,7 +1152,9 @@
       }
 
       currentKey = createKeyBuffer();
-      currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
+      boolean lazyDecompress = !tolerateCorruptions;
+      currentValue = new ValueBuffer(
+          null, columnNumber, skippedColIDs, codec, lazyDecompress);
     }
 
     /**
@@ -1280,11 +1301,6 @@
       return new KeyBuffer(columnNumber);
     }
 
-    @SuppressWarnings("unused")
-    private ValueBuffer createValueBuffer(KeyBuffer key) throws IOException {
-      return new ValueBuffer(key, skippedColIDs);
-    }
-
     /**
      * Read and return the next record length, potentially skipping over a sync
      * block.
@@ -1495,10 +1511,14 @@
       }
 
       int ret = -1;
-      try {
-        ret = nextKeyBuffer();
-      } catch (EOFException eof) {
-        eof.printStackTrace();
+      if (tolerateCorruptions) {
+        ret = nextKeyValueTolerateCorruptions();
+      } else {
+        try {
+          ret = nextKeyBuffer();
+        } catch (EOFException eof) {
+          eof.printStackTrace();
+        }
       }
       if (ret > 0) {
         return next(readRows);
@@ -1506,6 +1526,35 @@
       return false;
     }
 
+    private int nextKeyValueTolerateCorruptions() throws IOException {
+      long currentOffset = in.getPos();
+      int ret = -1;
+      try {
+        ret = nextKeyBuffer();
+        this.currentValueBuffer();
+      } catch (EOFException eof) {
+        LOG.warn("Ignoring EOFException in file " + file +
+                 " after offset " + currentOffset, eof);
+        ret = -1;
+      } catch (ChecksumException ce) {
+        LOG.warn("Ignoring ChecksumException in file " + file +
+                 " after offset " + currentOffset, ce);
+        ret = -1;
+      } catch (IOException ioe) {
+        // We have an IOException other than EOF or ChecksumException
+        // This is likely a read-error, not corruption, re-throw.
+        throw ioe;
+      } catch (Throwable t) {
+        // We got an exception that is not IOException
+        // (typically OOM, IndexOutOfBounds, InternalError).
+        // This is most likely a corruption.
+        LOG.warn("Ignoring unknown error in " + file +
+                 " after offset " + currentOffset, t);
+        ret = -1;
+      }
+      return ret;
+    }
+
     public boolean hasRecordsInBuffer() {
       return readRowsIndexInBuffer < recordsNumInValBuffer;
     }
@@ -1522,11 +1571,18 @@
       return;
     }
 
-    if (!currentValue.inited) {
-      currentValueBuffer();
-      // do this only when not initialized, but we may need to find a way to
-      // tell the caller how to initialize the valid size
+    if (tolerateCorruptions) {
+      if (!currentValue.inited) {
+        currentValueBuffer();
+      }
       ret.resetValid(columnNumber);
+    } else {
+      if (!currentValue.inited) {
+        currentValueBuffer();
+        // do this only when not initialized, but we may need to find a way to
+        // tell the caller how to initialize the valid size
+        ret.resetValid(columnNumber);
+      }
     }
 
     // we do not use BytesWritable here to avoid the byte-copy from