diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index 6654166..91123a8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -22,6 +22,8 @@
 import org.apache.hadoop.io.Writable;
 
+import com.google.common.base.Preconditions;
+
 /**
  * ColumnVector contains the shared structure for the sub-types,
  * including NULL information, and whether this vector
@@ -78,6 +80,28 @@ public ColumnVector(int len) {
     isRepeating = false;
   }
 
+  public boolean validateIsNull(int colNum) {
+
+    // Only call this method when noNulls is true.
+    Preconditions.checkState(noNulls);
+
+    for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
+      Preconditions.checkState(!isNull[i], "column " + colNum + " noNulls is true and isNull[" + i + "] array entry is true");
+    }
+    return true;
+  }
+
+  public boolean validateIsNull() {
+
+    // Only call this method when noNulls is true.
+    Preconditions.checkState(noNulls);
+
+    for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
+      Preconditions.checkState(!isNull[i], "noNulls is true and isNull[" + i + "] array entry is true");
+    }
+    return true;
+  }
+
   /**
    * Resets the column to default state
    *  - fills the isNull array with false
@@ -85,8 +109,10 @@ public ColumnVector(int len) {
    *  - sets isRepeating to false
    */
   public void reset() {
-    if (false == noNulls) {
+    if (!noNulls) {
       Arrays.fill(isNull, false);
+    } else {
+      Preconditions.checkState(validateIsNull());
     }
     noNulls = true;
     isRepeating = false;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
index d1b8939..af3d85e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Filter operator implementation.
  **/
@@ -89,6 +91,8 @@ public void process(Object row, int tag) throws HiveException {
 
     VectorizedRowBatch vrg = (VectorizedRowBatch) row;
 
+    Preconditions.checkState(vrg.validateIsNull());
+
     //The selected vector represents selected rows.
     //Clone the selected vector
     System.arraycopy(vrg.selected, 0, temporarySelected, 0, vrg.size);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 39a83e3..ed9d57d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -51,6 +51,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.DataOutputBuffer;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Vectorized GROUP BY operator implementation. Consumes the vectorized input and
  * stores the aggregate operators' intermediate states. Emits row mode output.
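[Reviewer note] Both validateIsNull overloads above return true so that call sites can wrap them in Preconditions.checkState(...), which always runs, or in a Java assert, which the JVM strips unless -ea is given. A minimal call-site sketch (hypothetical class, assuming the patched ColumnVector is on the classpath):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    import com.google.common.base.Preconditions;

    // Hypothetical illustration only; not part of this patch.
    class ValidatorCallSiteSketch {
      static void check(LongColumnVector col) {
        // Style used throughout this patch: the O(DEFAULT_SIZE) scan of
        // isNull runs unconditionally, in production as well.
        Preconditions.checkState(col.validateIsNull());

        // Alternative the boolean return type enables: the whole scan
        // disappears when the JVM runs without -ea.
        assert col.validateIsNull();
      }
    }

Either way a violation fails fast with the offending column and row index in the message, instead of surfacing later as wrong query results.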
@@ -876,9 +878,14 @@ public void endGroup() throws HiveException {
   public void process(Object row, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
+    Preconditions.checkState(batch.validateIsNull());
+
     if (batch.size > 0) {
       processingMode.processBatch(batch);
     }
+
+    Preconditions.checkState(batch.validateIsNull());
+
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index 212aa99..5cf4a17 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -36,6 +36,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Select operator implementation.
  */
@@ -119,6 +121,9 @@ public void process(Object row, int tag) throws HiveException {
     }
 
     VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+
+    Preconditions.checkState(vrg.validateIsNull());
+
     for (int i = 0; i < vExpressions.length; i++) {
       try {
         vExpressions[i].evaluate(vrg);
@@ -128,6 +133,8 @@
       }
     }
 
+    Preconditions.checkState(vrg.validateIsNull());
+
     // Prepare output, set the projections
     VectorExpressionWriter [] originalValueWriters = vrg.valueWriters;
     vrg.setValueWriters(valueWriters);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 3d7e4f0..b66743b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -659,11 +659,12 @@ public static String displayBytes(byte[] bytes, int start, int length) {
     return sb.toString();
   }
 
-  public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
+  public static String debugOneRowString(VectorizedRowBatch batch, int index,
+      int[] projectedColumns, int projectionSize) {
     StringBuilder sb = new StringBuilder();
-    sb.append(prefix + " row " + index + " ");
-    for (int p = 0; p < batch.projectionSize; p++) {
-      int column = batch.projectedColumns[p];
+    sb.append("row " + index + " ");
+    for (int p = 0; p < projectionSize; p++) {
+      int column = projectedColumns[p];
       if (p == column) {
         sb.append("(col " + p + ") ");
       } else {
@@ -704,7 +705,28 @@ public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, Strin
       }
       sb.append(" ");
     }
-    LOG.info(sb.toString());
+    return sb.toString();
+  }
+
+  public static void debugDisplayOneRow(VectorizedRowBatch batch, int index,
+      int[] projectedColumns, int projectionSize, String prefix) {
+    LOG.info(prefix + " " + debugOneRowString(batch, index, projectedColumns, projectionSize));
+  }
+
+  public static void debugDisplayBatch(VectorizedRowBatch batch,
+      int[] projectedColumns, int projectionSize, String prefix) {
+    for (int i = 0; i < batch.size; i++) {
+      int index = (batch.selectedInUse ? batch.selected[i] : i);
+      debugDisplayOneRow(batch, index, projectedColumns, projectionSize, prefix);
+    }
+  }
+
+  public static String debugOneRowString(VectorizedRowBatch batch, int index) {
+    return debugOneRowString(batch, index, batch.projectedColumns, batch.projectionSize);
+  }
+
+  public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
+    LOG.info(prefix + " " + debugOneRowString(batch, index));
   }
 
   public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {
@@ -713,4 +735,186 @@ public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {
       debugDisplayOneRow(batch, index, prefix);
     }
   }
+
+  public static String bareOneRowString(VectorizedRowBatch batch, int index,
+      int[] projectedColumns, int projectionSize) {
+    StringBuilder sb = new StringBuilder();
+    for (int p = 0; p < projectionSize; p++) {
+      int column = projectedColumns[p];
+      ColumnVector colVector = batch.cols[column];
+      if (colVector == null) {
+        // sb.append("(null ColumnVector)");
+      } else {
+        boolean isRepeating = colVector.isRepeating;
+        if (isRepeating) {
+          // sb.append("(repeating)");
+        }
+        index = (isRepeating ? 0 : index);
+        if (colVector.noNulls || !colVector.isNull[index]) {
+          if (colVector instanceof LongColumnVector) {
+            sb.append(((LongColumnVector) colVector).vector[index]);
+          } else if (colVector instanceof DoubleColumnVector) {
+            sb.append(((DoubleColumnVector) colVector).vector[index]);
+          } else if (colVector instanceof BytesColumnVector) {
+            BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector;
+            byte[] bytes = bytesColumnVector.vector[index];
+            int start = bytesColumnVector.start[index];
+            int length = bytesColumnVector.length[index];
+            if (bytes == null) {
+              sb.append("(Unexpected null bytes with start " + start + " length " + length + ")");
+            } else {
+              sb.append("bytes: '" + displayBytes(bytes, start, length) + "'");
+            }
+          } else if (colVector instanceof DecimalColumnVector) {
+            sb.append(((DecimalColumnVector) colVector).vector[index].toString());
+          } else {
+            sb.append("Unknown");
+          }
+        } else {
+          sb.append("NULL");
+        }
+      }
+      sb.append(" ");
+    }
+    return sb.toString();
+  }
+
+  public static String bareOneRowString(VectorizedRowBatch batch, int index) {
+    return bareOneRowString(batch, index, batch.projectedColumns, batch.projectionSize);
+  }
+
+  public static void bareDisplayOneRow(VectorizedRowBatch batch, int index,
+      int[] projectedColumns, int projectionSize, String prefix) {
+    LOG.info(prefix + " " + bareOneRowString(batch, index, projectedColumns, projectionSize));
+  }
+
+  public static void bareDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) {
+    LOG.info(prefix + " " + bareOneRowString(batch, index));
+  }
+
+  public static void bareDisplayBatch(VectorizedRowBatch batch,
+      int[] projectedColumns, int projectionSize, String prefix) {
+    for (int i = 0; i < batch.size; i++) {
+      int index = (batch.selectedInUse ? batch.selected[i] : i);
+      bareDisplayOneRow(batch, index, projectedColumns, projectionSize, prefix);
+    }
+  }
+
+  public static void bareDisplayBatch(VectorizedRowBatch batch, String prefix) {
+    for (int i = 0; i < batch.size; i++) {
+      int index = (batch.selectedInUse ? batch.selected[i] : i);
+      bareDisplayOneRow(batch, index, prefix);
+    }
+  }
+
+  public static class DebugBatchNullSummarizer {
+
+    private ColumnVector.Type[] columnVectorTypes;
+
+    private long[] nullCounters;
+    private long[] nonNullCounters;
+    private long[] repeatedColumnCounters;
+
+    private int[] projectedColumns;
+    private int projectionSize;
+
+    private long batchCount;
+    private long rowCount;
+
+    public DebugBatchNullSummarizer() {
+      columnVectorTypes = null;
+      nullCounters = null;
+      nonNullCounters = null;
+      repeatedColumnCounters = null;
+
+      projectedColumns = null;
+      projectionSize = 0;
+      batchCount = 0;
+      rowCount = 0;
+    }
+
+    public void analyze(VectorizedRowBatch batch, String prefix) {
+      if (nullCounters == null) {
+        columnVectorTypes = new ColumnVector.Type[batch.projectionSize];
+        for (int i = 0; i < batch.projectionSize; i++) {
+          int p = batch.projectedColumns[i];
+          ColumnVector colVector = batch.cols[p];
+          ColumnVector.Type type;
+          if (colVector == null) {
+            type = null;
+          } else if (colVector instanceof LongColumnVector) {
+            type = ColumnVector.Type.LONG;
+          } else if (colVector instanceof DoubleColumnVector) {
+            type = ColumnVector.Type.DOUBLE;
+          } else if (colVector instanceof BytesColumnVector) {
+            type = ColumnVector.Type.BYTES;
+          } else if (colVector instanceof DecimalColumnVector) {
+            type = ColumnVector.Type.DECIMAL;
+          } else {
+            throw new RuntimeException("Unknown column vector type " + colVector.getClass().getName());
+          }
+          columnVectorTypes[i] = type;
+        }
+        nullCounters = new long[batch.projectionSize];
+        nonNullCounters = new long[batch.projectionSize];
+        repeatedColumnCounters = new long[batch.projectionSize];
+        projectedColumns = Arrays.copyOf(batch.projectedColumns, batch.projectionSize);
+        projectionSize = batch.projectionSize;
+      }
+
+      boolean selectedInUse = batch.selectedInUse;
+      int[] selected = batch.selected;
+      for (int i = 0; i < batch.projectionSize; i++) {
+        int p = batch.projectedColumns[i];
+        ColumnVector colVector = batch.cols[p];
+        if (colVector == null) {
+          continue;
+        }
+        if (colVector.isRepeating) {
+          repeatedColumnCounters[i]++;
+          if (colVector.noNulls || !colVector.isNull[0]) {
+            nonNullCounters[i] += batch.size;
+          } else {
+            nullCounters[i] += batch.size;
+          }
+        } else if (colVector.noNulls) {
+          nonNullCounters[i] += batch.size;
+        } else {
+          for (int r = 0; r < batch.size; r++) {
+            int adjustedIndex = (selectedInUse ? selected[r] : r);
+            if (!colVector.isNull[adjustedIndex]) {
+              nonNullCounters[i]++;
+            } else {
+              nullCounters[i]++;
+            }
+          }
+        }
+      }
+      batchCount++;
+      rowCount += batch.size;
+    }
+
+    public void displaySummary(String prefix) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(prefix + " ");
+      sb.append("batch count ");
+      sb.append(batchCount);
+      sb.append(", row count ");
+      sb.append(rowCount);
+      for (int i = 0; i < projectionSize; i++) {
+        sb.append(", ");
+        sb.append(columnVectorTypes[i] == null ? "NULL" : columnVectorTypes[i].name());
+        sb.append(" ");
+        sb.append(nonNullCounters[i]);
+        sb.append(":");
+        sb.append(nullCounters[i]);
+        if (repeatedColumnCounters[i] != 0) {
+          sb.append("(repeated ");
+          sb.append(repeatedColumnCounters[i]);
+          sb.append(")");
+        }
+      }
+      LOG.info(sb.toString());
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index 7e41384..07ae873 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -96,6 +96,16 @@ public VectorizedRowBatch(int numCols, int size) {
     }
   }
 
+  public boolean validateIsNull() {
+    for (int i = 0; i < numCols; i++) {
+      ColumnVector colVector = cols[i];
+      if (colVector != null && colVector.noNulls) {
+        colVector.validateIsNull(i);
+      }
+    }
+    return true;
+  }
+
   /**
    * Returns the maximum size of the batch (number of rows it can hold)
    */
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java
index 8d75cf3..9487090 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java
@@ -142,6 +142,7 @@ private void evaluateDecimal(VectorizedRowBatch vrg) {
 
   @Override
   public void evaluate(VectorizedRowBatch vrg) {
+    vrg.cols[outputColumn].reset();
     switch (type) {
     case LONG:
       evaluateLong(vrg);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 0adbea1..fa42038 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -49,6 +49,8 @@
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class has methods for generating vectorized join results and forwarding batches.
  *
@@ -100,6 +102,40 @@ protected void commonSetup(VectorizedRowBatch batch) throws HiveException {
 
   //------------------------------------------------------------------------------------------------
 
+  protected boolean verifyOutputColumnReset(ColumnVector colVector, String title, int column) {
+    if (colVector.isRepeating) {
+      Preconditions.checkState(false, title + " " + column + " isRepeating is true");
+    }
+    if (!colVector.noNulls) {
+      Preconditions.checkState(false, title + " " + column + " noNulls is false");
+    }
+    boolean[] isNull = colVector.isNull;
+    for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
+      if (isNull[i]) {
+        Preconditions.checkState(false, title + " " + column + " isNull[" + i + "] is true");
+      }
+    }
+    return true;
+  }
+
+  protected boolean verifyOutputColumnsReset(VectorizedRowBatch batch) {
+
+    // For join operators that can generate small table results, verify their
+    // (target) scratch columns.
+
+    for (int column : smallTableOutputVectorColumns) {
+      Preconditions.checkState(verifyOutputColumnReset(batch.cols[column], "small table column", column));
+    }
+
+    for (int column : bigTableOuterKeyOutputVectorColumns) {
+      Preconditions.checkState(verifyOutputColumnReset(batch.cols[column], "big table outer join key", column));
+    }
+
+    return true;
+  }
+
+  //------------------------------------------------------------------------------------------------
+
   protected void performValueExpressions(VectorizedRowBatch batch,
       int[] allMatchs, int allMatchCount) {
 
     /*
@@ -576,6 +612,9 @@ protected void reProcessBigTable(int partitionId)
    */
   public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException {
 
+    // Make sure MapJoin didn't corrupt the batch.
+    Preconditions.checkState(batch.validateIsNull());
+
     // Save original projection.
     int[] originalProjections = batch.projectedColumns;
     int originalProjectionSize = batch.projectionSize;
@@ -589,6 +628,9 @@ public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException
     // Revert the projected columns back, because batch can be re-used by our parent operators.
     batch.projectionSize = originalProjectionSize;
     batch.projectedColumns = originalProjections;
+
+    // Make sure some downstream operator didn't corrupt the batch.
+    Preconditions.checkState(batch.validateIsNull());
   }
@@ -596,7 +638,15 @@ public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException
    * Forward the overflow batch and reset the batch.
    */
   protected void forwardOverflow() throws HiveException {
+
+    // Make sure MapJoin didn't corrupt the batch.
+    Preconditions.checkState(overflowBatch.validateIsNull());
+
     forward(overflowBatch, null);
+
+    // Make sure some downstream operator didn't corrupt the batch.
+    Preconditions.checkState(overflowBatch.validateIsNull());
+
     overflowBatch.reset();
   }
@@ -604,7 +654,14 @@ protected void forwardOverflow() throws HiveException {
    * Forward the overflow batch, but do not reset the batch.
    */
   private void forwardOverflowNoReset() throws HiveException {
+
+    // Make sure MapJoin didn't corrupt the batch.
+    Preconditions.checkState(overflowBatch.validateIsNull());
+
     forward(overflowBatch, null);
+
+    // Make sure some downstream operator didn't corrupt the batch.
+    Preconditions.checkState(overflowBatch.validateIsNull());
   }
 
   /*
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
index f18b982..3d2fc3b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
@@ -32,6 +32,8 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class has methods for generating vectorized join results for the big table only
  * variation of inner joins.
@@ -147,6 +149,8 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch,
       VectorMapJoinHashTableResult[] hashTableResults, int hashMapResultCount)
           throws HiveException, IOException {
 
+    Preconditions.checkState(batch.validateIsNull());
+
     // Get rid of spills before we start modifying the batch.
     if (spillCount > 0) {
       spillHashMapBatch(batch, hashTableResults,
@@ -158,6 +162,9 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch,
      */
     if (allMatchCount > 0 && bigTableValueExpressions != null) {
       performValueExpressions(batch, allMatchs, allMatchCount);
+
+      Preconditions.checkState(batch.validateIsNull());
+
     }
 
     int numSel = 0;
@@ -177,6 +184,9 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch,
     }
     batch.size = numSel;
     batch.selectedInUse = true;
+
+    Preconditions.checkState(batch.validateIsNull());
+
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java
index c42cfd9..d6f1da5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java
@@ -37,6 +37,8 @@
 // Single-Column Long specific imports.
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is an inner join on a Single-Column Long
  * and only big table columns appear in the join result so a hash multi-set is used.
@@ -127,6 +129,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for an inner big-only join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         // (Currently none)
         // innerBigOnlyPerBatchSetup(batch);
@@ -200,6 +204,9 @@ public void process(Object row, int tag) throws HiveException {
           LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
         }
         finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]);
+
+        Preconditions.checkState(batch.validateIsNull());
+
       } else {
 
         /*
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
index 047af78..d1243b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
@@ -39,6 +39,8 @@
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is an inner join on Multi-Key
  * and only big table columns appear in the join result so a hash multi-set is used.
@@ -132,6 +134,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for an inner big-only join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         // (Currently none)
         // innerBigOnlyPerBatchSetup(batch);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java
index c8852b0..fa5b340 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java
@@ -38,6 +38,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is an inner join on a Single-Column String
  * and only big table columns appear in the join result so a hash multi-set is used.
@@ -118,6 +120,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for an inner big-only join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         // (Currently none)
         // innerBigOnlyPerBatchSetup(batch);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java
index 2f08cd2..165e473 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java
@@ -36,6 +36,8 @@
 // Single-Column Long specific imports.
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is an inner join on a Single-Column Long
  * using a hash map.
@@ -126,6 +128,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for an inner join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         innerPerBatchSetup(batch);
 
         // For inner joins, we may apply the filter(s) now.
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
index 2ce7a6f..b398dfa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
@@ -38,6 +38,8 @@
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is an inner join on a Multi-Key
  * using a hash map.
@@ -130,6 +132,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for an inner join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         innerPerBatchSetup(batch);
 
         // For inner joins, we may apply the filter(s) now.
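[Reviewer note] The entry guard just added is copy-pasted into every specialized VectorMapJoin*Operator.process() in this patch. A hedged refactor sketch (hypothetical names, not in this patch) of the shared shape a base-class template method could give it:

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    import com.google.common.base.Preconditions;

    // Hypothetical illustration only; not part of this patch.
    abstract class GuardedProcessSketch {

      final void process(VectorizedRowBatch batch, int tag) {
        // Fail fast if an upstream operator left isNull entries set while
        // still advertising noNulls == true.
        Preconditions.checkState(batch.validateIsNull());
        processValidated(batch, tag);
      }

      // Each specialized operator would keep only its join-specific logic here.
      abstract void processValidated(VectorizedRowBatch batch, int tag);
    }

Centralizing the guard would also make it easy to disable later, which matters because each call scans DEFAULT_SIZE flags per column on every batch.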
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java
index b2963d3..77d9f30 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java
@@ -37,6 +37,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is an inner join on a Single-Column String
  * using a hash map.
@@ -117,6 +119,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for an inner join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         innerPerBatchSetup(batch);
 
         // For inner joins, we may apply the filter(s) now.
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java
index 529b3db..f14a1d9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java
@@ -37,6 +37,8 @@
 // Single-Column Long specific imports.
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is a left semi join on a Single-Column Long
  * using a hash set.
@@ -127,6 +129,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for a left semi join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         // (Currently none)
         // leftSemiPerBatchSetup(batch);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
index 8ce91aa..ad1389a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
@@ -39,6 +39,8 @@
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is a left semi join on Multi-Key
  * using a hash set.
@@ -131,6 +133,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for a left semi join.
+        Preconditions.checkState(batch.validateIsNull());
+
         // (Currently none)
         // leftSemiPerBatchSetup(batch);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java
index 48f20b7..e618eb9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java
@@ -38,6 +38,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is a left semi join on a Single-Column String
  * using a hash set.
@@ -118,6 +120,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for a left semi join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         // (Currently none)
         // leftSemiPerBatchSetup(batch);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java
index 57814fd..8d24341 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
 
 import java.io.IOException;
+import java.util.Arrays;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -33,6 +35,8 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.serde2.WriteBuffers.ByteSegmentRef;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class has methods for generating vectorized join results for outer joins.
 *
@@ -263,9 +267,11 @@ private void doValueExpr(VectorizedRowBatch batch,
   private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogicalSize,
       int[] remove, int removeSize, int[] difference) throws HiveException {
 
-    // if (!verifyMonotonicallyIncreasing(remove, removeSize)) {
-    //   throw new HiveException("remove is not in sort order and unique");
-    // }
+    Preconditions.checkState((inputSelected != remove) && (remove != difference) && (difference != inputSelected));
+
+    Preconditions.checkState(verifyMonotonicallyIncreasing(inputSelected, inputLogicalSize));
+
+    Preconditions.checkState(verifyMonotonicallyIncreasing(remove, removeSize));
 
     int differenceCount = 0;
 
@@ -294,9 +300,7 @@ private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogic
       throw new HiveException("Not all batch indices removed");
     }
 
-    // if (!verifyMonotonicallyIncreasing(difference, differenceCount)) {
-    //   throw new HiveException("difference is not in sort order and unique");
-    // }
+    Preconditions.checkState(verifyMonotonicallyIncreasing(difference, differenceCount));
 
     return differenceCount;
   }
@@ -323,9 +327,12 @@ private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogic
   private int subtract(int[] all, int allSize,
       int[] remove, int removeSize, int[] difference) throws HiveException {
 
-    // if (!verifyMonotonicallyIncreasing(remove, removeSize)) {
-    //   throw new HiveException("remove is not in sort order and unique");
-    // }
+
+    Preconditions.checkState((all != remove) && (remove != difference) && (difference != all));
+
+    Preconditions.checkState(verifyMonotonicallyIncreasing(all, allSize));
+
+    Preconditions.checkState(verifyMonotonicallyIncreasing(remove, removeSize));
 
     int differenceCount = 0;
 
@@ -344,6 +351,8 @@ private int subtract(int[] all, int allSize,
       throw new HiveException("Not all batch indices removed");
     }
 
+    Preconditions.checkState(verifyMonotonicallyIncreasing(difference, differenceCount));
+
     return differenceCount;
   }
@@ -363,14 +372,12 @@ private int subtract(int[] all, int allSize,
   private int sortMerge(int[] selected1, int selected1Count,
       int[] selected2, int selected2Count, int[] sortMerged) throws HiveException {
 
-    // if (!verifyMonotonicallyIncreasing(selected1, selected1Count)) {
-    //   throw new HiveException("selected1 is not in sort order and unique");
-    // }
-    // if (!verifyMonotonicallyIncreasing(selected2, selected2Count)) {
-    //   throw new HiveException("selected1 is not in sort order and unique");
-    // }
+    Preconditions.checkState((selected1 != selected2) && (selected2 != sortMerged) && (sortMerged != selected1));
 
+    Preconditions.checkState(verifyMonotonicallyIncreasing(selected1, selected1Count));
+
+    Preconditions.checkState(verifyMonotonicallyIncreasing(selected2, selected2Count));
 
     int sortMergeCount = 0;
 
@@ -390,13 +397,160 @@ private int sortMerge(int[] selected1, int selected1Count,
       }
     }
 
-    // if (!verifyMonotonicallyIncreasing(sortMerged, sortMergeCount)) {
-    //   throw new HiveException("sortMerged is not in sort order and unique");
-    // }
+    Preconditions.checkState(verifyMonotonicallyIncreasing(sortMerged, sortMergeCount));
 
     return sortMergeCount;
   }
 
+  private boolean validateCoverage(boolean inputSelectedInUse, int inputLogicalSize, int spillCount,
+      int allMatchCount, int noMatchCount) {
+
+    Preconditions.checkState(inputLogicalSize >= 0);
+    Preconditions.checkState(inputLogicalSize <= VectorizedRowBatch.DEFAULT_SIZE);
+    if (inputSelectedInUse) {
+      Preconditions.checkState(verifyMonotonicallyIncreasing(inputSelected, inputLogicalSize));
+    }
+    Preconditions.checkState(spillCount >= 0);
+    Preconditions.checkState(verifyMonotonicallyIncreasing(spills, spillCount));
+
+    Preconditions.checkState(allMatchCount >= 0);
+    Preconditions.checkState(verifyMonotonicallyIncreasing(allMatchs, allMatchCount));
+
+    Preconditions.checkState(noMatchCount >= 0);
+    Preconditions.checkState(verifyMonotonicallyIncreasing(noMatchs, noMatchCount));
+
+    int allCount = spillCount + allMatchCount + noMatchCount;
+
+    Preconditions.checkState(inputLogicalSize == allCount);
+
+    boolean[] allValidate = new boolean[VectorizedRowBatch.DEFAULT_SIZE];
+
+    int batchIndex;
+
+    for (int i = 0; i < spillCount; i++) {
+      batchIndex = spills[i];
+      Preconditions.checkState(!allValidate[batchIndex]);
+      allValidate[batchIndex] = true;
+    }
+
+    for (int i = 0; i < allMatchCount; i++) {
+      batchIndex = allMatchs[i];
+      Preconditions.checkState(!allValidate[batchIndex]);
+      allValidate[batchIndex] = true;
+    }
+
+    for (int i = 0; i < noMatchCount; i++) {
+      batchIndex = noMatchs[i];
+      Preconditions.checkState(!allValidate[batchIndex]);
+      allValidate[batchIndex] = true;
+    }
+
+    if (inputSelectedInUse) {
+      for (int i = 0; i < inputLogicalSize; i++) {
+        batchIndex = inputSelected[i];
+        Preconditions.checkState(allValidate[batchIndex]);
+      }
+    } else {
+      for (int i = 0; i < inputLogicalSize; i++) {
+        Preconditions.checkState(allValidate[i]);
+      }
+    }
+
+    return true;
+  }
+
+  private boolean validateOuterResult(VectorizedRowBatch batch,
+      boolean inputSelectedInUse, int inputLogicalSize, int spillCount,
+      int allMatchCount, int equalKeySeriesCount, int noMatchCount) {
+
+    // Assume validateCoverage called.
+
+    boolean[] expectedRows = new boolean[VectorizedRowBatch.DEFAULT_SIZE];
+
+    int numSel = 0;
+    for (int i = 0; i < equalKeySeriesCount; i++) {
+      int allMatchesIndex = equalKeySeriesAllMatchIndices[i];
+      boolean isSingleValue = equalKeySeriesIsSingleValue[i];
+      int duplicateCount = equalKeySeriesDuplicateCounts[i];
+
+      if (isSingleValue) {
+        for (int s = 0; s < duplicateCount; s++) {
+          int batchIndex = allMatchs[allMatchesIndex + s];
+          Preconditions.checkState(!expectedRows[batchIndex]);
+          expectedRows[batchIndex] = true;
+          numSel++;
+        }
+      }
+    }
+
+    for (int i = 0; i < noMatchCount; i++) {
+      int batchIndex = noMatchs[i];
+      Preconditions.checkState(!expectedRows[batchIndex]);
+      expectedRows[batchIndex] = true;
+      numSel++;
+    }
+
+    Preconditions.checkState(batch.size == numSel);
+    if (numSel > 0) {
+      Preconditions.checkState(batch.selectedInUse);
+      Preconditions.checkState(verifyMonotonicallyIncreasing(batch.selected, numSel));
+      for (int i = 0; i < numSel; i++) {
+        int batchIndex = batch.selected[i];
+        Preconditions.checkState(expectedRows[batchIndex]);
+      }
+    }
+
+    for (int i = 0; i < noMatchCount; i++) {
+      int batchIndex = noMatchs[i];
+
+      for (int column : bigTableOuterKeyOutputVectorColumns) {
+        ColumnVector colVector = batch.cols[column];
+        int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
+        Preconditions.checkState(!colVector.noNulls);
+        Preconditions.checkState(colVector.isNull[adjustedIndex]);
+      }
+
+      // Small table values are set to null.
+      for (int column : smallTableOutputVectorColumns) {
+        ColumnVector colVector = batch.cols[column];
+        int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
+        Preconditions.checkState(!colVector.noNulls);
+        Preconditions.checkState(colVector.isNull[adjustedIndex]);
+      }
+    }
+
+    for (int i = 0; i < equalKeySeriesCount; i++) {
+      int allMatchesIndex = equalKeySeriesAllMatchIndices[i];
+      boolean isSingleValue = equalKeySeriesIsSingleValue[i];
+      int duplicateCount = equalKeySeriesDuplicateCounts[i];
+
+      if (isSingleValue) {
+        for (int s = 0; s < duplicateCount; s++) {
+          int batchIndex = allMatchs[allMatchesIndex + s];
+
+          for (int column : bigTableOuterKeyOutputVectorColumns) {
+            ColumnVector colVector = batch.cols[column];
+            int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
+            Preconditions.checkState(
+                (colVector.noNulls && !colVector.isNull[adjustedIndex]) ||
+                !colVector.noNulls);
+          }
+
+          // Small table values are set to null.
+          for (int column : smallTableOutputVectorColumns) {
+            ColumnVector colVector = batch.cols[column];
+            int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
+            Preconditions.checkState(
+                (colVector.noNulls && !colVector.isNull[adjustedIndex]) ||
+                !colVector.noNulls);
+          }
+        }
+      }
+    }
+
+    return true;
+  }
+
   /**
    * Generate the outer join output results for one vectorized row batch.
    *
@@ -474,6 +628,10 @@ public void finishOuter(VectorizedRowBatch batch,
       }
     }
 
+    Preconditions.checkState(
+        validateCoverage(inputSelectedInUse, inputLogicalSize, spillCount,
+            allMatchCount, noMatchCount));
+
     // When we generate results into the overflow batch, we may still end up with fewer rows
     // in the big table batch. So, numSel and the batch's selected array will be rebuilt with
    // just the big table rows that need to be forwarded, minus any rows processed with the
@@ -543,6 +701,11 @@ public void finishOuter(VectorizedRowBatch batch,
         }
       }
     }
+
+    Preconditions.checkState(
+        validateOuterResult(batch,
+            inputSelectedInUse, inputLogicalSize, spillCount,
+            allMatchCount, equalKeySeriesCount, noMatchCount));
   }
 
   /**
@@ -570,6 +733,8 @@ protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs,
       // key as null, too.
       for (int column : bigTableOuterKeyOutputVectorColumns) {
         ColumnVector colVector = batch.cols[column];
+        Preconditions.checkState(!colVector.isRepeating);
+        Preconditions.checkState(!colVector.isNull[batchIndex]);
         colVector.noNulls = false;
         colVector.isNull[batchIndex] = true;
       }
@@ -577,6 +742,8 @@ protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs,
 
       // Small table values are set to null.
      for (int column : smallTableOutputVectorColumns) {
         ColumnVector colVector = batch.cols[column];
+        Preconditions.checkState(!colVector.isRepeating);
+        Preconditions.checkState(!colVector.isNull[batchIndex]);
         colVector.noNulls = false;
         colVector.isNull[batchIndex] = true;
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java
index f971727..c2087df 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java
@@ -38,6 +38,8 @@
 // Single-Column Long specific imports.
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is an outer join on a Single-Column Long
 * using a hash map.
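[Reviewer note] validateOuterResult and the new checkState calls in generateOuterNulls pin the same contract down from both sides: a no-match big-table row must have its outer key and small-table output columns marked NULL by flipping noNulls off and setting that row's isNull flag. A single-column sketch of the contract (hypothetical class, assuming the patched vector classes):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    // Hypothetical illustration only; not part of this patch.
    class OuterNullContractSketch {
      public static void main(String[] args) {
        LongColumnVector keyCol = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
        int batchIndex = 7;  // a big-table row with no small-table match

        // The two writes generateOuterNulls performs per output column:
        keyCol.noNulls = false;           // column now carries at least one NULL
        keyCol.isNull[batchIndex] = true; // this row's value slot is ignored

        // validateOuterResult asserts exactly this state for every no-match row.
        if (keyCol.noNulls || !keyCol.isNull[batchIndex]) {
          throw new IllegalStateException("no-match row not marked NULL");
        }
      }
    }

The value slot itself is left untouched; readers must consult isNull first, which is why verifyOutputColumnsReset insists the scratch columns enter each batch with noNulls true and a clean isNull array.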
@@ -136,6 +138,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for an outer join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         outerPerBatchSetup(batch);
 
         // For outer join, remember our input rows before ON expression filtering or before
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
index 6b2a9fc..430e593 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
@@ -39,6 +39,8 @@
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is an outer join on Multi-Key
  * using a hash map.
@@ -139,6 +141,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for an outer join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         outerPerBatchSetup(batch);
 
         // For outer join, remember our input rows before ON expression filtering or before
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java
index dfdd6d7..d04ae82 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java
@@ -37,6 +37,8 @@
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 
+import com.google.common.base.Preconditions;
+
 /*
  * Specialized class for doing a vectorized map join that is an outer join on a Single-Column String
  * using a hash map.
@@ -126,6 +128,8 @@ public void process(Object row, int tag) throws HiveException {
 
         // Do the per-batch setup for an outer join.
 
+        Preconditions.checkState(batch.validateIsNull());
+
         outerPerBatchSetup(batch);
 
         // For outer join, remember our input rows before ON expression filtering or before
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index ecd9b14..d171328 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -168,23 +168,16 @@ Object next(Object previous) throws IOException {
      */
     public Object nextVector(Object previousVector, long batchSize) throws IOException {
       ColumnVector result = (ColumnVector) previousVector;
+      result.reset();
       if (present != null) {
         // Set noNulls and isNull vector of the ColumnVector based on
         // present stream
-        result.noNulls = true;
         for (int i = 0; i < batchSize; i++) {
           result.isNull[i] = (present.next() != 1);
           if (result.noNulls && result.isNull[i]) {
             result.noNulls = false;
           }
         }
-      } else {
-        // There is not present stream, this means that all the values are
-        // present.
-        result.noNulls = true;
-        for (int i = 0; i < batchSize; i++) {
-          result.isNull[i] = false;
-        }
       }
       return previousVector;
     }
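[Reviewer note] The TreeReaderFactory change leans on the new reset() contract: reset() clears isNull only when noNulls is false (and otherwise just validates), so nextVector() no longer needs the removed else branch that re-filled isNull with false. A hedged end-to-end sketch of that flow (hypothetical class, assuming the patched classes):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    // Hypothetical illustration only; not part of this patch.
    class NextVectorResetSketch {
      public static void main(String[] args) {
        LongColumnVector col = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);

        // Leftover state from a previous batch: row 3 was NULL.
        col.noNulls = false;
        col.isNull[3] = true;

        // The patched nextVector() starts with reset(): noNulls is false,
        // so the isNull array is cleared and noNulls flips back to true.
        col.reset();

        // With no present stream there is nothing more to do; the vector is
        // already in the all-present state the validator expects.
        col.validateIsNull();
      }
    }

If reset() ever left a stale flag behind, validateIsNull() would throw IllegalStateException here, which is exactly the corruption the checkState calls added throughout this patch are hunting.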