commit a75015d4c092b0a8f0daf7c4787b2ba4dfd88b04 Author: Owen O'Malley Date: Wed Jun 20 09:56:11 2012 -0700 first pass of rcfile writer improvements diff --git ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java index 1c024a8..6c8c8d8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.io; -import java.io.BufferedOutputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; @@ -209,7 +208,7 @@ public class RCFile { * */ public static class KeyBuffer implements WritableComparable { - // each column's value length in a split + // each column's length in the value private int[] eachColumnValueLen = null; private int[] eachColumnUncompressedValueLen = null; // stores each cell's length of a column in one DataOutputBuffer element @@ -224,18 +223,22 @@ public class RCFile { return columnNumber; } + @SuppressWarnings("unused") + @Deprecated public KeyBuffer(){ } - KeyBuffer(int columnNumber) { - this(0, columnNumber); - } - - KeyBuffer(int numberRows, int columnNum) { + KeyBuffer(int columnNum) { columnNumber = columnNum; eachColumnValueLen = new int[columnNumber]; eachColumnUncompressedValueLen = new int[columnNumber]; allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber]; + } + + @SuppressWarnings("unused") + @Deprecated + KeyBuffer(int numberRows, int columnNum) { + this(columnNum); this.numberRows = numberRows; } @@ -388,18 +391,26 @@ public class RCFile { NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer(); CompressionInputStream deflatFilter = null; + @SuppressWarnings("unused") + @Deprecated public ValueBuffer() throws IOException { } + @SuppressWarnings("unused") + @Deprecated public ValueBuffer(KeyBuffer keyBuffer) throws IOException { - this(keyBuffer, null); + this(keyBuffer, keyBuffer.columnNumber, null, null, true); } + @SuppressWarnings("unused") + @Deprecated public ValueBuffer(KeyBuffer keyBuffer, boolean[] skippedColIDs) throws IOException { - this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null); + this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null, true); } + @SuppressWarnings("unused") + @Deprecated public ValueBuffer(KeyBuffer currentKey, int columnNumber, boolean[] skippedCols, CompressionCodec codec) throws IOException { this(currentKey, columnNumber, skippedCols, codec, true); @@ -422,11 +433,9 @@ public class RCFile { } int skipped = 0; - if (skippedColIDs != null) { - for (boolean currentSkip : skippedColIDs) { - if (currentSkip) { - skipped++; - } + for (boolean currentSkip : skippedColIDs) { + if (currentSkip) { + skipped++; } } loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber @@ -464,6 +473,8 @@ public class RCFile { } } + @SuppressWarnings("unused") + @Deprecated public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer, int addIndex) { loadedColumnsValueBuffer[addIndex] = valBuffer; @@ -576,7 +587,6 @@ public class RCFile { CompressionCodec codec = null; Metadata metadata = null; - Compressor compressor = null; // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record @@ -604,22 +614,13 @@ public class RCFile { // how many records already buffered private int bufferedRecords = 0; - NonSyncDataOutputBuffer[] compressionBuffer; - CompressionOutputStream[] deflateFilter = null; - DataOutputStream[] deflateOut = null; private final ColumnBuffer[] columnBuffers; - NonSyncDataOutputBuffer keyCompressionBuffer; - CompressionOutputStream keyDeflateFilter; - DataOutputStream keyDeflateOut; - Compressor keyCompressor; - private int columnNumber = 0; private final int[] columnValuePlainLength; KeyBuffer key = null; - ValueBuffer value = null; private final int[] plainTotalColumnLength; private final int[] comprTotalColumnLength; @@ -669,7 +670,6 @@ public class RCFile { private void startNewGroup(int currentLen) { prevValueLength = currentLen; runLength = 0; - return; } public void clear() throws IOException { @@ -713,7 +713,7 @@ public class RCFile { */ public Writer(FileSystem fs, Configuration conf, Path name, Progressable progress, CompressionCodec codec) throws IOException { - this(fs, conf, name, null, new Metadata(), codec); + this(fs, conf, name, progress, new Metadata(), codec); } /** @@ -725,8 +725,8 @@ public class RCFile { * the configuration file * @param name * the file name - * @param progress - * @param metadata + * @param progress a progress meter to update as the file is written + * @param metadata a string to string map in the file header * @throws IOException */ public Writer(FileSystem fs, Configuration conf, Path name, @@ -746,11 +746,11 @@ public class RCFile { * the configuration file * @param name * the file name - * @param bufferSize - * @param replication - * @param blockSize - * @param progress - * @param metadata + * @param bufferSize the size of the file buffer + * @param replication the number of replicas for the file + * @param blockSize the block size of the file + * @param progress the progress meter for writing the file + * @param metadata a string to string map in the file header * @throws IOException */ public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize, @@ -775,13 +775,12 @@ public class RCFile { columnBuffers[i] = new ColumnBuffer(); } - init(name, conf, fs.create(name, true, bufferSize, replication, - blockSize, progress), codec, metadata); + init(conf, fs.create(name, true, bufferSize, replication, + blockSize, progress), codec, metadata); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); key = new KeyBuffer(columnNumber); - value = new ValueBuffer(key); plainTotalColumnLength = new int[columnNumber]; comprTotalColumnLength = new int[columnNumber]; @@ -824,41 +823,19 @@ public class RCFile { metadata.write(out); } - void init(Path name, Configuration conf, FSDataOutputStream out, + void init(Configuration conf, FSDataOutputStream out, CompressionCodec codec, Metadata metadata) throws IOException { this.conf = conf; this.out = out; this.codec = codec; this.metadata = metadata; - if (this.codec != null) { - ReflectionUtils.setConf(codec, this.conf); - compressor = CodecPool.getCompressor(codec); - - compressionBuffer = new NonSyncDataOutputBuffer[columnNumber]; - deflateFilter = new CompressionOutputStream[columnNumber]; - deflateOut = new DataOutputStream[columnNumber]; - for (int i = 0; i < columnNumber; i++) { - compressionBuffer[i] = new NonSyncDataOutputBuffer(); - deflateFilter[i] = codec.createOutputStream(compressionBuffer[i], - compressor); - if (deflateFilter[i] instanceof SchemaAwareCompressionOutputStream) { - ((SchemaAwareCompressionOutputStream)deflateFilter[i]).setColumnIndex(i); - } - deflateOut[i] = new DataOutputStream(new BufferedOutputStream( - deflateFilter[i])); - } - keyCompressor = CodecPool.getCompressor(codec); - keyCompressionBuffer = new NonSyncDataOutputBuffer(); - keyDeflateFilter = codec.createOutputStream(keyCompressionBuffer, - keyCompressor); - keyDeflateOut = new DataOutputStream(new BufferedOutputStream( - keyDeflateFilter)); - } this.useNewMagic = conf.getBoolean(HiveConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true); } /** Returns the compression codec of data in this file. */ + @SuppressWarnings("unused") + @Deprecated public CompressionCodec getCompressionCodec() { return codec; } @@ -873,6 +850,8 @@ public class RCFile { } /** Returns the configuration of this file. */ + @SuppressWarnings("unused") + @Deprecated Configuration getConf() { return conf; } @@ -892,7 +871,7 @@ public class RCFile { * If its size() is greater then the column number in the file, the exceeded * columns' bytes are ignored. * - * @param val + * @param val a BytesRefArrayWritable with the list of serialized columns * @throws IOException */ public void append(Writable val) throws IOException { @@ -928,39 +907,48 @@ public class RCFile { private void flushRecords() throws IOException { key.numberRows = bufferedRecords; - value.keyBuffer = key; + Compressor compressor = null; + NonSyncDataOutputBuffer valueBuffer = null; + CompressionOutputStream deflateFilter = null; + DataOutputStream deflateOut = null; + boolean isCompressed = isCompressed(); int valueLength = 0; + if (isCompressed) { + ReflectionUtils.setConf(codec, this.conf); + compressor = CodecPool.getCompressor(codec); + valueBuffer = new NonSyncDataOutputBuffer(); + deflateFilter = codec.createOutputStream(valueBuffer, compressor); + deflateOut = new DataOutputStream(deflateFilter); + } + for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) { ColumnBuffer currentBuf = columnBuffers[columnIndex]; currentBuf.flushGroup(); NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer; + int colLen; + int plainLen = columnValuePlainLength[columnIndex]; - if (isCompressed()) { - compressionBuffer[columnIndex].reset(); - deflateFilter[columnIndex].resetState(); - deflateOut[columnIndex].write(columnValue.getData(), 0, columnValue - .getLength()); - deflateOut[columnIndex].flush(); - deflateFilter[columnIndex].finish(); - int colLen = compressionBuffer[columnIndex].getLength(); - key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, - columnValuePlainLength[columnIndex], columnIndex); - value.setColumnValueBuffer(compressionBuffer[columnIndex], - columnIndex); - valueLength += colLen; - plainTotalColumnLength[columnIndex] += columnValuePlainLength[columnIndex]; - comprTotalColumnLength[columnIndex] += colLen; + if (isCompressed) { + if (deflateFilter instanceof SchemaAwareCompressionOutputStream) { + ((SchemaAwareCompressionOutputStream)deflateFilter). + setColumnIndex(columnIndex); + } + deflateFilter.resetState(); + deflateOut.write(columnValue.getData(), 0, columnValue.getLength()); + deflateOut.flush(); + deflateFilter.finish(); + // find how much compressed data was added for this column + colLen = valueBuffer.getLength() - valueLength; } else { - int colLen = columnValuePlainLength[columnIndex]; - key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, colLen, - columnIndex); - value.setColumnValueBuffer(columnValue, columnIndex); - valueLength += colLen; - plainTotalColumnLength[columnIndex] += colLen; - comprTotalColumnLength[columnIndex] += colLen; + colLen = columnValuePlainLength[columnIndex]; } + valueLength += colLen; + key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen, + columnIndex); + plainTotalColumnLength[columnIndex] += plainLen; + comprTotalColumnLength[columnIndex] += colLen; columnValuePlainLength[columnIndex] = 0; } @@ -968,25 +956,22 @@ public class RCFile { if (keyLength < 0) { throw new IOException("negative length keys not allowed: " + key); } + if (compressor != null) { + CodecPool.returnCompressor(compressor); + } - // Write the record out - checkAndWriteSync(); // sync - out.writeInt(keyLength + valueLength); // total record length - out.writeInt(keyLength); // key portion length - if (!isCompressed()) { - out.writeInt(keyLength); - key.write(out); // key + // Write the key out + writeKey(key, keyLength + valueLength, keyLength); + // write the value out + if (isCompressed) { + out.write(valueBuffer.getData(), 0, valueBuffer.getLength()); } else { - keyCompressionBuffer.reset(); - keyDeflateFilter.resetState(); - key.write(keyDeflateOut); - keyDeflateOut.flush(); - keyDeflateFilter.finish(); - int compressedKeyLen = keyCompressionBuffer.getLength(); - out.writeInt(compressedKeyLen); - out.write(keyCompressionBuffer.getData(), 0, compressedKeyLen); + for(int columnIndex=0; columnIndex < columnNumber; ++columnIndex) { + NonSyncDataOutputBuffer buf = + columnBuffers[columnIndex].columnValBuffer; + out.write(buf.getData(), 0, buf.getLength()); + } } - value.write(out); // value // clear the columnBuffers clearColumnBuffers(); @@ -999,27 +984,38 @@ public class RCFile { * flush a block out without doing anything except compressing the key part. */ public void flushBlock(KeyBuffer keyBuffer, ValueBuffer valueBuffer, - int recordLen, int keyLength, int compressedKeyLen) throws IOException { + int recordLen, int keyLength, + @SuppressWarnings("unused") int compressedKeyLen) throws IOException { + writeKey(keyBuffer, recordLen, keyLength); + valueBuffer.write(out); + } + + private void writeKey(KeyBuffer keyBuffer, int recordLen, + int keyLength) throws IOException { checkAndWriteSync(); // sync out.writeInt(recordLen); // total record length out.writeInt(keyLength); // key portion length if(this.isCompressed()) { + Compressor compressor = CodecPool.getCompressor(codec); + NonSyncDataOutputBuffer compressionBuffer = + new NonSyncDataOutputBuffer(); + CompressionOutputStream deflateFilter = + codec.createOutputStream(compressionBuffer, compressor); + DataOutputStream deflateOut = new DataOutputStream(deflateFilter); //compress key and write key out - keyCompressionBuffer.reset(); - keyDeflateFilter.resetState(); - keyBuffer.write(keyDeflateOut); - keyDeflateOut.flush(); - keyDeflateFilter.finish(); - compressedKeyLen = keyCompressionBuffer.getLength(); + compressionBuffer.reset(); + deflateFilter.resetState(); + keyBuffer.write(deflateOut); + deflateOut.flush(); + deflateFilter.finish(); + int compressedKeyLen = compressionBuffer.getLength(); out.writeInt(compressedKeyLen); - out.write(keyCompressionBuffer.getData(), 0, compressedKeyLen); + out.write(compressionBuffer.getData(), 0, compressedKeyLen); } else { - out.writeInt(compressedKeyLen); + out.writeInt(keyLength); keyBuffer.write(out); } - - valueBuffer.write(out); // value } private void clearColumnBuffers() throws IOException { @@ -1034,19 +1030,6 @@ public class RCFile { } clearColumnBuffers(); - if (isCompressed()) { - for (int i = 0; i < columnNumber; i++) { - deflateFilter[i].close(); - IOUtils.closeStream(deflateOut[i]); - } - keyDeflateFilter.close(); - IOUtils.closeStream(keyDeflateOut); - CodecPool.returnCompressor(keyCompressor); - keyCompressor = null; - CodecPool.returnCompressor(compressor); - compressor = null; - } - if (out != null) { // Close the underlying stream if we own it... @@ -1055,8 +1038,9 @@ public class RCFile { out = null; } for (int i = 0; i < columnNumber; i++) { - LOG.info("Column#" + i + " : Plain Total Column Value Length: " + plainTotalColumnLength[i] - + ", Compr Total Column Value Length: " + comprTotalColumnLength[i]); + LOG.info("Column#" + i + " : Plain Total Column Value Length: " + + plainTotalColumnLength[i] + + ", Compr Total Column Value Length: " + comprTotalColumnLength[i]); } } } @@ -1094,8 +1078,6 @@ public class RCFile { private final ValueBuffer currentValue; - private final boolean[] skippedColIDs = null; - private int readRowsIndexInBuffer = 0; private int recordsNumInValBuffer = 0; @@ -1189,7 +1171,7 @@ public class RCFile { } loadColumnNum = columnNumber; - if (skippedColIDs != null && skippedColIDs.length > 0) { + if (skippedColIDs.length > 0) { for (boolean skippedColID : skippedColIDs) { if (skippedColID) { loadColumnNum -= 1; @@ -1302,8 +1284,7 @@ public class RCFile { try { Class codecClass = conf.getClassByName( codecClassname).asSubclass(CompressionCodec.class); - codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, - conf); + codec = ReflectionUtils.newInstance(codecClass, conf); } catch (ClassNotFoundException cnfe) { throw new IllegalArgumentException( "Unknown codec: " + codecClassname, cnfe); @@ -1526,7 +1507,7 @@ public class RCFile { * {@link #next(LongWritable)} and * {@link #getCurrentRow(BytesRefArrayWritable)}. * - * @param columnID + * @param columnID the number of the column to get 0 to N-1 * @throws IOException */ public BytesRefArrayWritable getColumn(int columnID, @@ -1584,6 +1565,8 @@ public class RCFile { * @return whether there still has records or not * @throws IOException */ + @SuppressWarnings("unused") + @Deprecated public synchronized boolean nextColumnsBatch() throws IOException { passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer); return nextKeyBuffer() > 0; @@ -1620,15 +1603,12 @@ public class RCFile { eof.printStackTrace(); } } - if (ret > 0) { - return next(readRows); - } - return false; + return (ret > 0) && next(readRows); } private int nextKeyValueTolerateCorruptions() throws IOException { long currentOffset = in.getPos(); - int ret = -1; + int ret; try { ret = nextKeyBuffer(); this.currentValueBuffer(); @@ -1747,6 +1727,7 @@ public class RCFile { } /** Returns true iff the previous call to next passed a sync mark. */ + @SuppressWarnings("unused") public boolean syncSeen() { return syncSeen; } @@ -1762,6 +1743,7 @@ public class RCFile { return file.toString(); } + @SuppressWarnings("unused") public boolean isCompressedRCFile() { return this.decompress; }