Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (revision 944004)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (working copy)
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
@@ -30,6 +31,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,6 +48,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -71,6 +74,8 @@
 
   private static Path file;
 
+  private static Path dir;
+
   private static FileSystem fs;
 
   private static Properties tbl;
@@ -78,7 +83,7 @@
   static {
     try {
       fs = FileSystem.getLocal(conf);
-      Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
+      dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
       file = new Path(dir, "test_rcfile");
       fs.delete(dir, true);
       // the SerDe part is from TestLazySimpleSerDe
@@ -216,7 +221,47 @@
     writeTest(fs, 10000, file, bytesArray);
     partialReadTest(fs, 10000, file);
   }
+
+  /** Merges RCFiles by raw byte concatenation and verifies the merged contents. */
+  public void testMergeFile() throws IOException, SerDeException {
+    Path file_1 = new Path(dir, "test_rcfile_1");
+    Path file_2 = new Path(dir, "test_rcfile_2");
+    Path file_merge = new Path(dir, "test_rcfile_merge");
+    writeTest(fs, 10000, file_1, bytesArray);
+    writeTest(fs, 10000, file_2, bytesArray);
+
+    List<Path> files = new ArrayList<Path>();
+    files.add(file_2);
+    mergeRCFile(file_merge, file_1, files);
+    fullyReadTest(fs, 10000 * 2, file_merge);
+
+    // add another file_2 to the tail list: merge file_1 + file_2 + file_2
+    files.add(file_2);
+    mergeRCFile(file_merge, file_1, files);
+    fullyReadTest(fs, 10000 * 3, file_merge);
+
+    fs.delete(file_1, true);
+    fs.delete(file_2, true);
+    fs.delete(file_merge, true);
+  }
 
+  /** Concatenates headFile (header kept) and tailFiles (headers skipped) into file_merge. */
+  private void mergeRCFile(Path file_merge, Path headFile, List<Path> tailFiles) throws IOException {
+    FSDataInputStream in = RCFile.Reader.getDataInputStream(fs, headFile,
+        1024 * 4, conf, false);
+
+    FSDataOutputStream out = fs.create(file_merge, true);
+    IOUtils.copyBytes(in, out, conf, false);
+    in.close();
+
+    for (Path f : tailFiles) {
+      in = RCFile.Reader.getDataInputStream(fs, f, 1024 * 4, conf, true);
+      IOUtils.copyBytes(in, out, conf, false);
+      in.close();
+    }
+    out.close();
+  }
+
   /** For debugging and testing. */
   public static void main(String[] args) throws Exception {
     int count = 10000;
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (revision 944004)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (working copy)
@@ -1054,6 +1054,32 @@
     currentKey = createKeyBuffer();
     currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
   }
+
+  /**
+   * Get a FSDataInputStream for a given file. Use 'skipHeaderSection' to
+   * specify whether or not to skip the RCFile header section.
+   *
+   * @param fs FileSystem handle
+   * @param file file path
+   * @param bufferSize read buffer size in bytes
+   * @param conf the configuration
+   * @param skipHeaderSection whether or not to skip the header section
+   * @return an open stream, positioned past the header when skipHeaderSection is true
+   * @throws IOException if the file cannot be opened or its header read
+   */
+  public static FSDataInputStream getDataInputStream(FileSystem fs,
+      Path file, int bufferSize, Configuration conf, boolean skipHeaderSection)
+      throws IOException {
+    if (skipHeaderSection) {
+      // Reader's constructor consumes the header, leaving 'in' right after it.
+      Reader reader = new RCFile.Reader(fs, file, bufferSize, conf, 0,
+          fs.getFileStatus(file).getLen());
+      return reader.in;
+    } else {
+      // Open at the requested buffer size without mutating the shared conf.
+      return fs.open(file, bufferSize);
+    }
+  }
 
   /**
    * Override this method to specialize the type of