Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1337408)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -421,6 +421,8 @@
     HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS(
         "hive.merge.current.job.has.dynamic.partitions", false),
 
+    HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true),
+
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", false),
     HIVESKEWJOINKEY("hive.skewjoin.key", 100000),
Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java	(revision 1337408)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java	(working copy)
@@ -18,9 +18,10 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.RandomAccessFile;
-import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Properties;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -45,9 +47,9 @@
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -347,6 +349,11 @@
 
   private void writeTest(FileSystem fs, int count, Path file,
       byte[][] fieldsData) throws IOException, SerDeException {
+    writeTest(fs, count, file, fieldsData, conf);
+  }
+
+  private void writeTest(FileSystem fs, int count, Path file,
+      byte[][] fieldsData, Configuration conf) throws IOException, SerDeException {
     fs.delete(file, true);
 
     RCFileOutputFormat.setColumnNumber(conf, fieldsData.length);
@@ -556,6 +563,7 @@
       super(in);
     }
 
+    @Override
     public void close() throws IOException {
       closed = true;
       super.close();
@@ -579,6 +587,7 @@
         new RCFile.Reader(fs, path, conf) {
       // this method is called by the RCFile.Reader constructor, overwritten,
       // so we can access the opened file
+      @Override
       protected FSDataInputStream openFile(FileSystem fs, Path file,
           int bufferSize, long length) throws IOException {
         final InputStream in = super.openFile(fs, file, bufferSize, length);
@@ -594,4 +603,31 @@
         openedFile[0].isClosed());
   }
 
+  public void testRCFileHeader(char[] expected, Configuration conf)
+      throws IOException, SerDeException {
+
+    writeTest(fs, 10000, file, bytesArray, conf);
+    DataInputStream di = fs.open(file, 10000);
+    byte[] bytes = new byte[3];
+    di.read(bytes);
+    for (int i = 0; i < expected.length; i++) {
+      assertTrue("Headers did not match", bytes[i] == expected[i]);
+    }
+    di.close();
+  }
+
+  public void testNonExplicitRCFileHeader() throws IOException, SerDeException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(HiveConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, false);
+    char[] expected = new char[] {'S', 'E', 'Q'};
+    testRCFileHeader(expected, conf);
+  }
+
+  public void testExplicitRCFileHeader() throws IOException, SerDeException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(HiveConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
+    char[] expected = new char[] {'R', 'C', 'F'};
+    testRCFileHeader(expected, conf);
+  }
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java	(revision 1337408)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java	(working copy)
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
@@ -170,8 +171,12 @@
       (byte) 'S', (byte) 'E', (byte) 'Q'};
   // the version that was included with the original magic, which is mapped
   // into ORIGINAL_VERSION
-  private static final byte ORIGINAL_MAGIC_VERSION = 6;
+  private static final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
 
+  private static final byte[] ORIGINAL_MAGIC_VERSION = new byte[] {
+    (byte) 'S', (byte) 'E', (byte) 'Q', ORIGINAL_MAGIC_VERSION_WITH_METADATA
+  };
+
+  // The 'magic' bytes at the beginning of the RCFile
   private static final byte[] MAGIC = new byte[] {
     (byte) 'R', (byte) 'C', (byte) 'F'};
 
@@ -618,6 +623,8 @@
     private final int[] plainTotalColumnLength;
     private final int[] comprTotalColumnLength;
 
+    boolean useNewMagic = true;
+
     /*
      * used for buffering appends before flush them out
      */
@@ -782,8 +789,12 @@
 
     /** Write the initial part of file header. */
     void initializeFileHeader() throws IOException {
-      out.write(MAGIC);
-      out.write(CURRENT_VERSION);
+      if (useNewMagic) {
+        out.write(MAGIC);
+        out.write(CURRENT_VERSION);
+      } else {
+        out.write(ORIGINAL_MAGIC_VERSION);
+      }
     }
 
     /** Write the final part of file header. */
@@ -798,7 +809,14 @@
 
     /** Write and flush the file header. */
     void writeFileHeader() throws IOException {
-      out.writeBoolean(isCompressed());
+      if (useNewMagic) {
+        out.writeBoolean(isCompressed());
+      } else {
+        Text.writeString(out, KeyBuffer.class.getName());
+        Text.writeString(out, ValueBuffer.class.getName());
+        out.writeBoolean(isCompressed());
+        out.writeBoolean(false);
+      }
 
       if (isCompressed()) {
         Text.writeString(out, (codec.getClass()).getName());
@@ -836,6 +854,8 @@
         keyDeflateOut = new DataOutputStream(new BufferedOutputStream(
             keyDeflateFilter));
       }
+      this.useNewMagic =
+          conf.getBoolean(HiveConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
     }
 
     /** Returns the compression codec of data in this file. */
@@ -1235,7 +1255,7 @@
 
       if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
         byte vers = in.readByte();
-        if (vers != ORIGINAL_MAGIC_VERSION) {
+        if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
          throw new IOException(file + " is a version " + vers
              + " SequenceFile instead of an RCFile.");
        }
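
Usage note (not part of the patch): the sketch below shows, under stated assumptions, how writer-side code could opt back into the legacy SequenceFile-style header ('S', 'E', 'Q', version byte 6) via the new hive.exec.rcfile.use.explicit.header flag, mirroring what testNonExplicitRCFileHeader exercises. The class name, temporary path, and two-column row are hypothetical; the config key and the RCFile.Writer / BytesRefArrayWritable APIs are the ones touched or used by the patch and its tests.

// Illustrative sketch only; assumes the patched HiveConf and RCFile above.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;

public class LegacyHeaderWriterSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // false => write the old SequenceFile-style header ('S','E','Q', 6)
    // instead of the explicit 'R','C','F' magic, for pre-RCF readers.
    conf.setBoolean(HiveConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, false);
    RCFileOutputFormat.setColumnNumber(conf, 2);

    FileSystem fs = FileSystem.getLocal(conf);
    // Hypothetical output location for the example.
    Path file = new Path(System.getProperty("java.io.tmpdir"), "legacy_header.rc");
    fs.delete(file, true);

    // Write a single two-column row through the patched writer.
    RCFile.Writer writer = new RCFile.Writer(fs, conf, file);
    BytesRefArrayWritable row = new BytesRefArrayWritable(2);
    row.set(0, new BytesRefWritable("key1".getBytes("UTF-8")));
    row.set(1, new BytesRefWritable("value1".getBytes("UTF-8")));
    writer.append(row);
    writer.close();

    // Sanity check: with the flag off, the first three bytes are 'S','E','Q'.
    FSDataInputStream in = fs.open(file);
    byte[] header = new byte[3];
    in.readFully(header);
    in.close();
    System.out.println(new String(header, "UTF-8")); // prints SEQ
  }
}

Leaving the flag at its default (true) and rerunning the same sketch would produce a file whose first three bytes are 'R', 'C', 'F', which is what testExplicitRCFileHeader asserts.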