Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (revision 1156839) +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (working copy) @@ -339,6 +339,7 @@ keyBuffer.eachColumnUncompressedValueLen[colIndex]); loadedColumnsValueBuffer[index] = decompressedData; decompressedFlag[index] = true; + numCompressed--; return decompressedData.getData(); } } @@ -346,6 +347,7 @@ // used to load columns' value into memory private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null; private boolean[] decompressedFlag = null; + private int numCompressed; private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null; boolean inited = false; @@ -409,7 +411,11 @@ deflatFilter = codec.createInputStream(decompressBuffer, valDecompressor); } - + if (codec != null) { + numCompressed = decompressedFlag.length; + } else { + numCompressed = 0; + } for (int k = 0, readIndex = 0; k < columnNumber; k++) { if (skippedColIDs[k]) { continue; @@ -456,6 +462,9 @@ } addIndex++; } + if (codec != null) { + numCompressed = decompressedFlag.length; + } if (skipTotal != 0) { in.skipBytes(skipTotal); @@ -964,7 +973,12 @@ * */ public static class Reader { - + private static class SelectedColumn { + public int colIndex; + public int rowReadIndex; + public int runLength; + public int prvLength; + } private final Path file; private final FSDataInputStream in; @@ -999,17 +1013,22 @@ private int passedRowsNum = 0; - private int[] columnRowReadIndex = null; - private final NonSyncDataInputBuffer[] colValLenBufferReadIn; - private final int[] columnRunLength; - private final int[] columnPrvLength; + private boolean decompress = false; private Decompressor keyDecompressor; NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer(); - int[] prjColIDs = null; // selected column IDs + //Current state of each selected column - e.g. 
current run length, etc. + // The size of the array is equal to the number of selected columns + private final SelectedColumn[] selectedColumns; + // map of original column id -> index among selected columns + private final int[] revPrjColIDs; + + // column value lengths for each of the selected columns + private final NonSyncDataInputBuffer[] colValLenBufferReadIn; + /** Create a new RCFile reader. */ public Reader(FileSystem fs, Path file, Configuration conf) throws IOException { this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, 0, fs @@ -1053,7 +1072,7 @@ java.util.ArrayList notSkipIDs = ColumnProjectionUtils .getReadColumnIDs(conf); - skippedColIDs = new boolean[columnNumber]; + boolean[] skippedColIDs = new boolean[columnNumber]; if (notSkipIDs.size() > 0) { for (int i = 0; i < skippedColIDs.length; i++) { skippedColIDs[i] = true; @@ -1081,31 +1100,31 @@ } } + + revPrjColIDs = new int[columnNumber]; // get list of selected column IDs - prjColIDs = new int[loadColumnNum]; + selectedColumns = new SelectedColumn[loadColumnNum]; + colValLenBufferReadIn = new NonSyncDataInputBuffer[loadColumnNum]; for (int i = 0, j = 0; i < columnNumber; ++i) { if (!skippedColIDs[i]) { - prjColIDs[j++] = i; + SelectedColumn col = new SelectedColumn(); + col.colIndex = i; + col.runLength = 0; + col.prvLength = -1; + col.rowReadIndex = 0; + selectedColumns[j] = col; + colValLenBufferReadIn[j] = new NonSyncDataInputBuffer(); + revPrjColIDs[i] = j; + j++; + } else { + revPrjColIDs[i] = -1; } } - colValLenBufferReadIn = new NonSyncDataInputBuffer[columnNumber]; - columnRunLength = new int[columnNumber]; - columnPrvLength = new int[columnNumber]; - columnRowReadIndex = new int[columnNumber]; - for (int i = 0; i < columnNumber; i++) { - columnRowReadIndex[i] = 0; - if (!skippedColIDs[i]) { - colValLenBufferReadIn[i] = new NonSyncDataInputBuffer(); - } - columnRunLength[i] = 0; - columnPrvLength[i] = -1; - } - currentKey = createKeyBuffer(); currentValue = new 
ValueBuffer(null, columnNumber, skippedColIDs, codec); } - + /** * Override this method to specialize the type of * {@link FSDataInputStream} returned. @@ -1334,13 +1353,14 @@ readRowsIndexInBuffer = 0; recordsNumInValBuffer = currentKey.numberRows; - for (int prjColID : prjColIDs) { - int i = prjColID; - colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i] - .getData(), currentKey.allCellValLenBuffer[i].getLength()); - columnRowReadIndex[i] = 0; - columnRunLength[i] = 0; - columnPrvLength[i] = -1; + for (int selIx = 0; selIx < selectedColumns.length; selIx++) { + SelectedColumn col = selectedColumns[selIx]; + int colIx = col.colIndex; + NonSyncDataOutputBuffer buf = currentKey.allCellValLenBuffer[colIx]; + colValLenBufferReadIn[selIx].reset(buf.getData(), buf.getLength()); + col.rowReadIndex = 0; + col.runLength = 0; + col.prvLength = -1; } return currentKeyLength; @@ -1384,8 +1404,8 @@ */ public BytesRefArrayWritable getColumn(int columnID, BytesRefArrayWritable rest) throws IOException { - - if (skippedColIDs[columnID]) { + int selColIdx = revPrjColIDs[columnID]; + if (selColIdx == -1) { return null; } @@ -1402,16 +1422,26 @@ int columnNextRowStart = 0; fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID] .getData(), currentKey.allCellValLenBuffer[columnID].getLength()); + SelectedColumn selCol = selectedColumns[selColIdx]; + byte[] uncompData = null; + ValueBuffer.LazyDecompressionCallbackImpl decompCallBack = null; + boolean decompressed = currentValue.decompressedFlag[selColIdx]; + if (decompressed) { + uncompData = + currentValue.loadedColumnsValueBuffer[selColIdx].getData(); + } else { + decompCallBack = currentValue.lazyDecompressCallbackObjs[selColIdx]; + } for (int i = 0; i < recordsNumInValBuffer; i++) { - int length = getColumnNextValueLength(columnID); + colAdvanceRow(selColIdx, selCol); + int length = selCol.prvLength; BytesRefWritable currentCell = rest.get(i); - if (currentValue.decompressedFlag[columnID]) { - 
currentCell.set(currentValue.loadedColumnsValueBuffer[columnID] - .getData(), columnNextRowStart, length); + + if (decompressed) { + currentCell.set(uncompData, columnNextRowStart, length); } else { - currentCell.set(currentValue.lazyDecompressCallbackObjs[columnID], - columnNextRowStart, length); + currentCell.set(decompCallBack, columnNextRowStart, length); } columnNextRowStart = columnNextRowStart + length; } @@ -1490,44 +1520,62 @@ // we do not use BytesWritable here to avoid the byte-copy from // DataOutputStream to BytesWritable - - for (int j = 0; j < prjColIDs.length; ++j) { - int i = prjColIDs[j]; - - BytesRefWritable ref = ret.unCheckedGet(i); - - int columnCurrentRowStart = columnRowReadIndex[i]; - int length = getColumnNextValueLength(i); - columnRowReadIndex[i] = columnCurrentRowStart + length; - - if (currentValue.decompressedFlag[j]) { + if (currentValue.numCompressed > 0) { + for (int j = 0; j < selectedColumns.length; ++j) { + SelectedColumn col = selectedColumns[j]; + int i = col.colIndex; + + BytesRefWritable ref = ret.unCheckedGet(i); + + colAdvanceRow(j, col); + + if (currentValue.decompressedFlag[j]) { + ref.set(currentValue.loadedColumnsValueBuffer[j].getData(), + col.rowReadIndex, col.prvLength); + } else { + ref.set(currentValue.lazyDecompressCallbackObjs[j], + col.rowReadIndex, col.prvLength); + } + col.rowReadIndex += col.prvLength; + } + } else { + // This version of the loop eliminates a condition check and branch + // and is measurably faster (20% or so) + for (int j = 0; j < selectedColumns.length; ++j) { + SelectedColumn col = selectedColumns[j]; + int i = col.colIndex; + + BytesRefWritable ref = ret.unCheckedGet(i); + + colAdvanceRow(j, col); ref.set(currentValue.loadedColumnsValueBuffer[j].getData(), - columnCurrentRowStart, length); - } else { - ref.set(currentValue.lazyDecompressCallbackObjs[j], - columnCurrentRowStart, length); + col.rowReadIndex, col.prvLength); + col.rowReadIndex += col.prvLength; } } rowFetched = true; } - 
private int getColumnNextValueLength(int i) throws IOException { - if (columnRunLength[i] > 0) { - --columnRunLength[i]; - return columnPrvLength[i]; + /** + * Advance column state to the next row: update the run length and value length + * @param selCol - index among selectedColumns + * @param col - column object to update the state of. prvLength will be + * set to the length of the next value + * @throws IOException + */ + private void colAdvanceRow(int selCol, SelectedColumn col) throws IOException { + if (col.runLength > 0) { + --col.runLength; } else { - int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]); + int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[selCol]); if (length < 0) { // we reach a runlength here, use the previous length and reset // runlength - columnRunLength[i] = ~length; - columnRunLength[i]--; - length = columnPrvLength[i]; + col.runLength = (~length) - 1; } else { - columnPrvLength[i] = length; - columnRunLength[i] = 0; + col.prvLength = length; + col.runLength = 0; } - return length; } }