diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
index 2a27676..90eb6c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
@@ -967,6 +967,25 @@ public Writer(FileSystem fs, Configuration conf, Path name,
           metadata, codec);
     }
 
+    /**
+     * Constructs an RCFile Writer that appends new data to an existing RCFile.
+     *
+     * @param fs
+     *          the file system
+     * @param reader
+     *          existing reader; the file name, metadata, and compression codec are copied from it
+     * @throws IOException
+     */
+    public Writer(FileSystem fs, Reader reader) throws IOException {
+      this(fs, fs.getConf(), reader.file, fs.getConf().getInt("io.file.buffer.size", 4096),
+          ShimLoader.getHadoopShims().getDefaultReplication(fs, reader.file),
+          ShimLoader.getHadoopShims().getDefaultBlockSize(fs, reader.file),
+          null, reader.getMetadata(), reader.getCompressionCodec(), true);
+      // preserve the reader's sync marker in the appending writer
+      byte[] rsync = reader.getSync();
+      System.arraycopy(rsync, 0, this.sync, 0, rsync.length);
+    }
+
     /**
      *
      * Constructs a RCFile Writer.
@@ -987,6 +1006,15 @@ public Writer(FileSystem fs, Configuration conf, Path name,
     public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
         short replication, long blockSize, Progressable progress,
         Metadata metadata, CompressionCodec codec) throws IOException {
+      this(fs, conf, name, bufferSize, replication, blockSize, progress, metadata, codec, false);
+    }
+
+    /**
+     * This constructor is protected to prevent misuse of the append option, which requires extra handling.
+     */
+    protected Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
+        short replication, long blockSize, Progressable progress,
+        Metadata metadata, CompressionCodec codec, boolean append) throws IOException {
       RECORD_INTERVAL = HiveConf.getIntVar(conf, HIVE_RCFILE_RECORD_INTERVAL);
       columnNumber = HiveConf.getIntVar(conf, HIVE_RCFILE_COLUMN_NUMBER_CONF);
 
@@ -1006,11 +1034,15 @@ public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
         columnBuffers[i] = new ColumnBuffer();
       }
 
-      init(conf, fs.create(name, true, bufferSize, replication,
-          blockSize, progress), codec, metadata);
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
+      if (append) {
+        init(conf, fs.append(name, bufferSize, progress), codec, metadata);
+      }
+      else {
+        init(conf, fs.create(name, true, bufferSize, replication, blockSize, progress), codec, metadata);
+        initializeFileHeader();
+        writeFileHeader();
+        finalizeFileHeader();
+      }
       key = new KeyBuffer(columnNumber);
 
       plainTotalColumnLength = new int[columnNumber];
@@ -2045,5 +2077,9 @@ public CompressionCodec getCompressionCodec() {
       return this.codec;
     }
 
+    // return a copy of the sync marker, so an appending Writer can reuse it
+    protected byte[] getSync() {
+      return sync.clone();
+    }
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
index cff5ada..6026b2e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -411,6 +412,14 @@ public void testWriteAndFullyRead() throws IOException, SerDeException {
     fullyReadTest(fs, 10000, file);
   }
 
+// @Test
+  public void testAppendAndFullyRead() throws IOException, SerDeException {
+    // This is supposed to be a @Test, but unfortunately Hadoop's LocalFileSystem doesn't support append().
+    writeTest(fs, 10000, file, bytesArray);
+    appendTest(fs, 10000, file, bytesArray);
+    fullyReadTest(fs, 20000, file);
+  }
+
   @Test
   public void testWriteAndPartialRead() throws IOException, SerDeException {
     writeTest(fs, 10000, file, bytesArray);
@@ -505,6 +514,28 @@ private void writeTest(FileSystem fs, int count, Path file,
         + " number columns and " + count + " number rows is " + fileLen);
   }
 
+  private void appendTest(FileSystem fs, int count, Path file,
+      byte[][] fieldsData) throws IOException, SerDeException {
+    RCFileOutputFormat.setColumnNumber(conf, fieldsData.length);
+    RCFile.Reader reader = new RCFile.Reader(fs, file, conf);
+    RCFile.Writer writer = new RCFile.Writer(fs, reader);
+    reader.close();
+    BytesRefArrayWritable bytes = new BytesRefArrayWritable(fieldsData.length);
+    for (int i = 0; i < fieldsData.length; i++) {
+      BytesRefWritable cu = null;
+      cu = new BytesRefWritable(fieldsData[i], 0, fieldsData[i].length);
+      bytes.set(i, cu);
+    }
+
+    for (int i = 0; i < count; i++) {
+      writer.append(bytes);
+    }
+    writer.close();
+    long fileLen = fs.getFileStatus(file).getLen();
+    System.out.println("The appended file size of RCFile with " + bytes.size()
+        + " number columns and " + count + " number rows is " + fileLen);
+  }
+
   private static Properties createProperties() {
     Properties tbl = new Properties();
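
For illustration, below is a minimal sketch of how a caller might drive the new append path end to end. It is not part of the patch: the file path, the two-column layout, and the RCFileAppendSketch class name are hypothetical, and it assumes an HDFS-backed FileSystem, since (as the disabled test notes) LocalFileSystem does not support append().

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;

// Hypothetical usage sketch, not part of the patch. Requires a FileSystem
// that supports append(), i.e. HDFS rather than LocalFileSystem.
public class RCFileAppendSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The appending Writer reads the column count from the FileSystem's
    // Configuration, just as the base constructor does.
    RCFileOutputFormat.setColumnNumber(conf, 2);
    Path file = new Path("hdfs:///tmp/example.rc"); // hypothetical path
    FileSystem fs = file.getFileSystem(conf);

    // Open a Reader on the existing file; the new Writer(FileSystem, Reader)
    // constructor copies its metadata, compression codec, and sync marker,
    // then reopens the file with fs.append().
    RCFile.Reader reader = new RCFile.Reader(fs, file, conf);
    RCFile.Writer writer = new RCFile.Writer(fs, reader);
    reader.close();

    // Append one two-column row.
    byte[] c0 = "123".getBytes("UTF-8");
    byte[] c1 = "abc".getBytes("UTF-8");
    BytesRefArrayWritable row = new BytesRefArrayWritable(2);
    row.set(0, new BytesRefWritable(c0, 0, c0.length));
    row.set(1, new BytesRefWritable(c1, 0, c1.length));
    writer.append(row);
    writer.close();
  }
}

The design point worth noting is that the append branch never rewrites the file header; instead the new Writer clones the original sync marker via Reader.getSync(), so records written before and after the append share one sync point and the file remains splittable for existing readers.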