Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java	(revision 911664)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java	(working copy)
@@ -970,16 +970,25 @@
         long start, long length) throws IOException {
       conf.setInt("io.file.buffer.size", bufferSize);
       this.file = file;
-      in = fs.open(file, bufferSize);
+      in = openFile(fs, file, bufferSize, length);
       this.conf = conf;
       end = start + length;
-      if (start > 0) {
-        seek(0);
-        init();
-        seek(start);
-      } else {
-        init();
+      boolean succeed = false;
+      try {
+        if (start > 0) {
+          seek(0);
+          init();
+          seek(start);
+        } else {
+          init();
+        }
+        succeed = true;
+      } finally {
+        if (!succeed) {
+          IOUtils.cleanup(LOG, in);
+        }
       }
+
       columnNumber = Integer.parseInt(metadata.get(
           new Text(COLUMN_NUMBER_METADATA_STR)).toString());
 
@@ -1037,6 +1046,15 @@
       currentKey = createKeyBuffer();
       currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
     }
+
+    /**
+     * Override this method to specialize the type of
+     * {@link FSDataInputStream} returned.
+     */
+    protected FSDataInputStream openFile(FileSystem fs, Path file,
+        int bufferSize, long length) throws IOException {
+      return fs.open(file, bufferSize);
+    }
 
     private void init() throws IOException {
       byte[] versionBlock = new byte[VERSION.length];
Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java	(revision 909965)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java	(working copy)
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Properties;
@@ -28,7 +29,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -460,5 +463,53 @@
     }
     assertEquals("readCount should be equal to writeCount", readCount,
         writeCount);
   }
+
+
+  // adapted HADOOP-5476 (calling new SequenceFile.Reader(...) leaves an
+  // InputStream open, if the given sequence file is broken) to RCFile
+  private static class TestFSDataInputStream extends FSDataInputStream {
+    private boolean closed = false;
+
+    private TestFSDataInputStream(InputStream in) throws IOException {
+      super(in);
+    }
+
+    public void close() throws IOException {
+      closed = true;
+      super.close();
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+  }
+
+  public void testCloseForErroneousRCFile() throws IOException {
+    Configuration conf = new Configuration();
+    LocalFileSystem fs = FileSystem.getLocal(conf);
+    // create an empty file (which is not a valid rcfile)
+    Path path = new Path(System.getProperty("test.build.data", ".")
+        + "/broken.rcfile");
+    fs.create(path).close();
+    // try to create RCFile.Reader
+    final TestFSDataInputStream[] openedFile = new TestFSDataInputStream[1];
+    try {
+      new RCFile.Reader(fs, path, conf) {
+        // this method is called by the RCFile.Reader constructor; it is
+        // overridden here so the test can access the opened file
+        protected FSDataInputStream openFile(FileSystem fs, Path file,
+            int bufferSize, long length) throws IOException {
+          final InputStream in = super.openFile(fs, file, bufferSize, length);
+          openedFile[0] = new TestFSDataInputStream(in);
+          return openedFile[0];
+        }
+      };
+      fail("IOException expected.");
+    } catch (IOException expected) {
+    }
+    assertNotNull(path + " should have been opened.", openedFile[0]);
+    assertTrue("InputStream for " + path + " should have been closed.",
+        openedFile[0].isClosed());
+  }
 }
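
Usage note (illustrative, not part of the patch): the new protected openFile(...) hook lets a subclass substitute or wrap the FSDataInputStream the reader uses, which is what the regression test above relies on. A minimal sketch of the same idiom outside the test, assuming a hypothetical RCFile at /tmp/data.rcfile on the local filesystem; the class name and path are invented for illustration:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;

public class LoggingRCFileReader {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path path = new Path("/tmp/data.rcfile"); // hypothetical input file
    // Anonymous subclass: log every open. With this patch applied, the
    // constructor closes the returned stream itself if initialization
    // fails partway through, so the caller cannot leak it.
    RCFile.Reader reader = new RCFile.Reader(fs, path, conf) {
      @Override
      protected FSDataInputStream openFile(FileSystem fs, Path file,
          int bufferSize, long length) throws IOException {
        System.err.println("RCFile open: " + file + ", buffer " + bufferSize);
        return super.openFile(fs, file, bufferSize, length);
      }
    };
    reader.close();
  }
}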