diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java index 58f1190..54596ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hive.ql.plan.FilterDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import com.google.common.base.Preconditions; + /** * Filter operator implementation. **/ @@ -90,6 +92,8 @@ public void process(Object row, int tag) throws HiveException { VectorizedRowBatch vrg = (VectorizedRowBatch) row; + Preconditions.checkState(vrg.validateIsNull()); + //The selected vector represents selected rows. //Clone the selected vector System.arraycopy(vrg.selected, 0, temporarySelected, 0, vrg.size); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 0524c08..7e5cb2d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -51,6 +51,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.DataOutputBuffer; +import com.google.common.base.Preconditions; + /** * Vectorized GROUP BY operator implementation. Consumes the vectorized input and * stores the aggregate operators' intermediate states. Emits row mode output. @@ -877,9 +879,14 @@ public void endGroup() throws HiveException { public void process(Object row, int tag) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) row; + Preconditions.checkState(batch.validateIsNull()); + if (batch.size > 0) { processingMode.processBatch(batch); } + + Preconditions.checkState(batch.validateIsNull()); + } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java index 73b905f..97635a4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import com.google.common.base.Preconditions; + /** * Select operator implementation. 
*/ @@ -118,6 +120,9 @@ public void process(Object row, int tag) throws HiveException { } VectorizedRowBatch vrg = (VectorizedRowBatch) row; + + Preconditions.checkState(vrg.validateIsNull()); + for (int i = 0; i < vExpressions.length; i++) { try { vExpressions[i].evaluate(vrg); @@ -127,6 +132,8 @@ public void process(Object row, int tag) throws HiveException { } } + Preconditions.checkState(vrg.validateIsNull()); + // Prepare output, set the projections int[] originalProjections = vrg.projectedColumns; int originalProjectionSize = vrg.projectionSize; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java index 9b90f37..e3ac3f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -646,11 +646,12 @@ public static String displayBytes(byte[] bytes, int start, int length) { return sb.toString(); } - public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) { + public static String debugOneRowString(VectorizedRowBatch batch, int index, + int[] projectedColumns, int projectionSize) { StringBuilder sb = new StringBuilder(); - sb.append(prefix + " row " + index + " "); - for (int p = 0; p < batch.projectionSize; p++) { - int column = batch.projectedColumns[p]; + sb.append("row " + index + " "); + for (int p = 0; p < projectionSize; p++) { + int column = projectedColumns[p]; if (p == column) { sb.append("(col " + p + ") "); } else { @@ -691,7 +692,28 @@ public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, Strin } sb.append(" "); } - LOG.info(sb.toString()); + return sb.toString(); + } + + public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, + int[] projectedColumns, int projectionSize, String prefix) { + LOG.info(prefix + " " + debugOneRowString(batch, index, projectedColumns, projectionSize)); + } + + public static void debugDisplayBatch(VectorizedRowBatch batch, + int[] projectedColumns, int projectionSize, String prefix) { + for (int i = 0; i < batch.size; i++) { + int index = (batch.selectedInUse ? batch.selected[i] : i); + debugDisplayOneRow(batch, index, projectedColumns, projectionSize, prefix); + } + } + + public static String debugOneRowString(VectorizedRowBatch batch, int index) { + return debugOneRowString(batch, index, batch.projectedColumns, batch.projectionSize); + } + + public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) { + LOG.info(prefix + " " + debugOneRowString(batch, index)); } public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) { @@ -700,4 +722,186 @@ public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) { debugDisplayOneRow(batch, index, prefix); } } + + public static String bareOneRowString(VectorizedRowBatch batch, int index, + int[] projectedColumns, int projectionSize) { + StringBuilder sb = new StringBuilder(); + for (int p = 0; p < projectionSize; p++) { + int column = projectedColumns[p]; + ColumnVector colVector = batch.cols[column]; + if (colVector == null) { + // sb.append("(null ColumnVector)"); + } else { + boolean isRepeating = colVector.isRepeating; + if (isRepeating) { + // sb.append("(repeating)"); + } + index = (isRepeating ? 
0 : index); + if (colVector.noNulls || !colVector.isNull[index]) { + if (colVector instanceof LongColumnVector) { + sb.append(((LongColumnVector) colVector).vector[index]); + } else if (colVector instanceof DoubleColumnVector) { + sb.append(((DoubleColumnVector) colVector).vector[index]); + } else if (colVector instanceof BytesColumnVector) { + BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector; + byte[] bytes = bytesColumnVector.vector[index]; + int start = bytesColumnVector.start[index]; + int length = bytesColumnVector.length[index]; + if (bytes == null) { + sb.append("(Unexpected null bytes with start " + start + " length " + length + ")"); + } else { + sb.append("bytes: '" + displayBytes(bytes, start, length) + "'"); + } + } else if (colVector instanceof DecimalColumnVector) { + sb.append(((DecimalColumnVector) colVector).vector[index].toString()); + } else { + sb.append("Unknown"); + } + } else { + sb.append("NULL"); + } + } + sb.append(" "); + } + return sb.toString(); + } + + public static String bareOneRowString(VectorizedRowBatch batch, int index) { + return bareOneRowString(batch, index, batch.projectedColumns, batch.projectionSize); + } + + public static void bareDisplayOneRow(VectorizedRowBatch batch, int index, + int[] projectedColumns, int projectionSize, String prefix) { + LOG.info(prefix + " " + bareOneRowString(batch, index, projectedColumns, projectionSize)); + } + + public static void bareDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) { + LOG.info(prefix + " " + bareOneRowString(batch, index)); + } + + public static void bareDisplayBatch(VectorizedRowBatch batch, + int[] projectedColumns, int projectionSize, String prefix) { + for (int i = 0; i < batch.size; i++) { + int index = (batch.selectedInUse ? batch.selected[i] : i); + bareDisplayOneRow(batch, index, projectedColumns, projectionSize, prefix); + } + } + + public static void bareDisplayBatch(VectorizedRowBatch batch, String prefix) { + for (int i = 0; i < batch.size; i++) { + int index = (batch.selectedInUse ? 
batch.selected[i] : i); + bareDisplayOneRow(batch, index, prefix); + } + } + + public static class DebugBatchNullSummarizer { + + private ColumnVector.Type[] columnVectorTypes; + + private long[] nullCounters; + private long[] nonNullCounters; + private long[] repeatedColumnCounters; + + private int[] projectedColumns; + private int projectionSize; + + private long batchCount; + private long rowCount; + + public DebugBatchNullSummarizer() { + columnVectorTypes = null; + nullCounters = null; + nonNullCounters = null; + repeatedColumnCounters = null; + + projectedColumns = null; + projectionSize = 0; + batchCount = 0; + rowCount = 0; + } + + public void analyze(VectorizedRowBatch batch) { + if (nullCounters == null) { + columnVectorTypes = new ColumnVector.Type[batch.projectionSize]; + for (int i = 0; i < batch.projectionSize; i++) { + int p = batch.projectedColumns[i]; + ColumnVector colVector = batch.cols[p]; + ColumnVector.Type type; + if (colVector == null) { + type = null; + } else if (colVector instanceof LongColumnVector) { + type = ColumnVector.Type.LONG; + } else if (colVector instanceof DoubleColumnVector) { + type = ColumnVector.Type.DOUBLE; + } else if (colVector instanceof BytesColumnVector) { + type = ColumnVector.Type.BYTES; + } else if (colVector instanceof DecimalColumnVector) { + type = ColumnVector.Type.DECIMAL; + } else { + throw new RuntimeException("Unknown column vector type " + colVector.getClass().getName()); + } + columnVectorTypes[i] = type; + } + nullCounters = new long[batch.projectionSize]; + nonNullCounters = new long[batch.projectionSize]; + repeatedColumnCounters = new long[batch.projectionSize]; + projectedColumns = Arrays.copyOf(batch.projectedColumns, batch.projectionSize); + projectionSize = batch.projectionSize; + } + + boolean selectedInUse = batch.selectedInUse; + int[] selected = batch.selected; + for (int i = 0; i < batch.projectionSize; i++) { + int p = batch.projectedColumns[i]; + ColumnVector colVector = batch.cols[p]; + if (colVector == null) { + continue; + } + if (colVector.isRepeating) { + repeatedColumnCounters[i]++; + if (colVector.noNulls || !colVector.isNull[0]) { + nonNullCounters[i] += batch.size; + } else { + nullCounters[i] += batch.size; + } + } else if (colVector.noNulls) { + nonNullCounters[i] += batch.size; + } else { + // noNulls is known false on this path, so only isNull needs checking. + for (int r = 0; r < batch.size; r++) { + int adjustedIndex = (selectedInUse ? selected[r] : r); + if (!colVector.isNull[adjustedIndex]) { + nonNullCounters[i]++; + } else { + nullCounters[i]++; + } + } + } + } + batchCount++; + rowCount += batch.size; + } + + public void displaySummary(String prefix) { + StringBuilder sb = new StringBuilder(); + sb.append(prefix + " "); + sb.append("batch count "); + sb.append(batchCount); + sb.append(", row count "); + sb.append(rowCount); + for (int i = 0; i < projectionSize; i++) { + sb.append(", "); + sb.append(columnVectorTypes[i] == null ? 
"NULL" : columnVectorTypes[i].name()); + sb.append(" "); + sb.append(nonNullCounters[i]); + sb.append(":"); + sb.append(nullCounters[i]); + if (repeatedColumnCounters[i] != 0) { + sb.append("(repeated "); + sb.append(repeatedColumnCounters[i]); + sb.append(")"); + } + } + LOG.info(sb.toString()); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java index 8d75cf3..9487090 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java @@ -142,6 +142,7 @@ private void evaluateDecimal(VectorizedRowBatch vrg) { @Override public void evaluate(VectorizedRowBatch vrg) { + vrg.cols[outputColumn].reset(); switch (type) { case LONG: evaluateLong(vrg); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index c1c137b..7204409 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -51,6 +51,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.ByteStream.Output; +import com.google.common.base.Preconditions; + /** * This class has methods for generating vectorized join results and forwarding batches. * @@ -104,6 +106,40 @@ protected void commonSetup(VectorizedRowBatch batch) throws HiveException { //------------------------------------------------------------------------------------------------ + protected boolean verifyOutputColumnReset(ColumnVector colVector, String title, int column) { + Preconditions.checkState(!colVector.isRepeating, title + " " + column + " isRepeating is true"); + Preconditions.checkState(colVector.noNulls, title + " " + column + " noNulls is false"); + boolean[] isNull = colVector.isNull; + for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i++) { + Preconditions.checkState(!isNull[i], title + " " + column + " isNull[" + i + "] is true"); + } + return true; + } + + protected boolean verifyOutputColumnsReset(VectorizedRowBatch batch) { + + // For join operators that can generate small table results, verify their + // (target) scratch columns. + + for (int column : smallTableOutputVectorColumns) { + Preconditions.checkState(verifyOutputColumnReset(batch.cols[column], "small table column", column)); + } + + for (int column : bigTableOuterKeyOutputVectorColumns) { + Preconditions.checkState(verifyOutputColumnReset(batch.cols[column], "big table outer join key", column)); + } + + return true; + } + + //------------------------------------------------------------------------------------------------ + protected void performValueExpressions(VectorizedRowBatch batch, int[] allMatchs, int allMatchCount) { /* @@ -582,6 +618,9 @@ protected void reProcessBigTable(int partitionId) */ public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException { + // Make sure MapJoin didn't corrupt the batch. + Preconditions.checkState(batch.validateIsNull()); + // Save original projection. 
int[] originalProjections = batch.projectedColumns; int originalProjectionSize = batch.projectionSize; @@ -595,6 +634,9 @@ public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException // Revert the projected columns back, because batch can be re-used by our parent operators. batch.projectionSize = originalProjectionSize; batch.projectedColumns = originalProjections; + + // Make sure some downstream operator didn't corrupt the batch. + Preconditions.checkState(batch.validateIsNull()); } @@ -602,7 +644,15 @@ public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException * Forward the overflow batch and reset the batch. */ protected void forwardOverflow() throws HiveException { + + // Make sure MapJoin didn't corrupt the batch. + Preconditions.checkState(overflowBatch.validateIsNull()); + forward(overflowBatch, null); + + // Make sure some downstream operator didn't corrupt the batch. + Preconditions.checkState(overflowBatch.validateIsNull()); + overflowBatch.reset(); } @@ -610,7 +660,14 @@ protected void forwardOverflow() throws HiveException { * Forward the overflow batch, but do not reset the batch. */ private void forwardOverflowNoReset() throws HiveException { + + // Make sure MapJoin didn't corrupt the batch. + Preconditions.checkState(overflowBatch.validateIsNull()); + forward(overflowBatch, null); + + // Make sure some downstream operator didn't corrupt the batch. + Preconditions.checkState(overflowBatch.validateIsNull()); } /* diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java index 6b33a39..09dcd77 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import com.google.common.base.Preconditions; + /** * This class has methods for generating vectorized join results for the big table only * variation of inner joins. @@ -147,6 +149,8 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch, VectorMapJoinHashTableResult[] hashTableResults, int hashMapResultCount) throws HiveException, IOException { + Preconditions.checkState(batch.validateIsNull()); + // Get rid of spills before we start modifying the batch. 
if (spillCount > 0) { spillHashMapBatch(batch, hashTableResults, @@ -158,6 +162,9 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch, */ if (allMatchCount > 0 && bigTableValueExpressions != null) { performValueExpressions(batch, allMatchs, allMatchCount); + + Preconditions.checkState(batch.validateIsNull()); + } int numSel = 0; @@ -177,6 +184,9 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch, } batch.size = numSel; batch.selectedInUse = true; + + Preconditions.checkState(batch.validateIsNull()); + } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java index 9e77d22..6795abd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java @@ -37,6 +37,8 @@ // Single-Column Long specific imports. import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is an inner join on a Single-Column Long * and only big table columns appear in the join result so a hash multi-set is used. @@ -127,6 +129,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for an inner big-only join. + Preconditions.checkState(batch.validateIsNull()); + // (Currently none) // innerBigOnlyPerBatchSetup(batch); @@ -200,6 +204,9 @@ public void process(Object row, int tag) throws HiveException { LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); } finishInnerBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0]); + + Preconditions.checkState(batch.validateIsNull()); + } else { /* diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java index e4f6c5d..c0af16e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is an inner join on Multi-Key * and only big table columns appear in the join result so a hash multi-set is used. @@ -132,6 +134,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for an inner big-only join. 
+ Preconditions.checkState(batch.validateIsNull()); + // (Currently none) // innerBigOnlyPerBatchSetup(batch); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java index 2711b10..0778798 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is an inner join on a Single-Column String * and only big table columns appear in the join result so a hash multi-set is used. @@ -118,6 +120,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for an inner big-only join. + Preconditions.checkState(batch.validateIsNull()); + // (Currently none) // innerBigOnlyPerBatchSetup(batch); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java index 0197225..4f9c061 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerLongOperator.java @@ -36,6 +36,8 @@ // Single-Column Long specific imports. import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is an inner join on a Single-Column Long * using a hash map. @@ -126,6 +128,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for an inner join. + Preconditions.checkState(batch.validateIsNull()); + innerPerBatchSetup(batch); // For inner joins, we may apply the filter(s) now. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java index 837d97b..36771a7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is an inner join on a Multi-Key * using a hash map. @@ -130,6 +132,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for an inner join. + Preconditions.checkState(batch.validateIsNull()); + innerPerBatchSetup(batch); // For inner joins, we may apply the filter(s) now. 
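Every specialized map-join operator in this patch guards process() with the same Preconditions.checkState(batch.validateIsNull()) call. As a reference for reviewers, here is a minimal, self-contained sketch of the invariant being enforced; the SimpleColumn class below is hypothetical, a stand-in for ColumnVector, and is not part of this patch.

// Hypothetical stand-in for ColumnVector, illustrating the noNulls/isNull
// contract that validateIsNull() checks in the patch above.
public class SimpleColumn {
  static final int DEFAULT_SIZE = 1024;

  boolean noNulls = true;   // true => every isNull entry must be false
  final boolean[] isNull = new boolean[DEFAULT_SIZE];

  // A writer that produces a NULL must flip both fields together.
  void setNull(int i) {
    isNull[i] = true;
    noNulls = false;
  }

  // Returns true when the contract holds, throws otherwise. Returning a
  // boolean (rather than void) lets callers choose the cost model:
  //   Preconditions.checkState(col.validateIsNull());  // always scans
  //   assert col.validateIsNull();                     // elided unless -ea
  boolean validateIsNull() {
    if (noNulls) {
      for (int i = 0; i < DEFAULT_SIZE; i++) {
        if (isNull[i]) {
          throw new IllegalStateException(
              "noNulls is true but isNull[" + i + "] is true");
        }
      }
    }
    return true;
  }

  public static void main(String[] args) {
    SimpleColumn col = new SimpleColumn();
    col.validateIsNull();        // passes: nothing written yet
    col.isNull[5] = true;        // corrupt: writer forgot to clear noNulls
    col.validateIsNull();        // throws IllegalStateException
  }
}

Note that wrapping the validator in Preconditions.checkState, as this patch does, pays the full O(columns x DEFAULT_SIZE) scan on every batch; if these checks were meant to survive into production builds, an assert would let them compile away when assertions are disabled.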
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java index b2711c3..16d8ba0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerStringOperator.java @@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is an inner join on a Single-Column String * using a hash map. @@ -117,6 +119,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for an inner join. + Preconditions.checkState(batch.validateIsNull()); + innerPerBatchSetup(batch); // For inner joins, we may apply the filter(s) now. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java index 4b8ab58..c97513b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiLongOperator.java @@ -37,6 +37,8 @@ // Single-Column Long specific imports. import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is a left semi join on a Single-Column Long * using a hash set. @@ -127,6 +129,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for a left semi join. + Preconditions.checkState(batch.validateIsNull()); + // (Currently none) // leftSemiPerBatchSetup(batch); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java index bdf7901..4f280e6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is a left semi join on Multi-Key * using a hash set. @@ -131,6 +133,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for a left semi join. 
+ Preconditions.checkState(batch.validateIsNull()); + // (Currently none) // leftSemiPerBatchSetup(batch); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java index a8d3459..a3585f6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiStringOperator.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is a left semi join on a Single-Column String * using a hash set. @@ -118,6 +120,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for a left semi join. + Preconditions.checkState(batch.validateIsNull()); + // (Currently none) // leftSemiPerBatchSetup(batch); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java index 5a88784..9e630f7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.serde2.WriteBuffers.ByteSegmentRef; +import com.google.common.base.Preconditions; + /** * This class has methods for generating vectorized join results for outer joins. 
* @@ -263,9 +265,11 @@ private void doValueExpr(VectorizedRowBatch batch, private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogicalSize, int[] remove, int removeSize, int[] difference) throws HiveException { - // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { - // throw new HiveException("remove is not in sort order and unique"); - // } + Preconditions.checkState((inputSelected != remove) && (remove != difference) && (difference != inputSelected)); + + Preconditions.checkState(verifyMonotonicallyIncreasing(inputSelected, inputLogicalSize)); + + Preconditions.checkState(verifyMonotonicallyIncreasing(remove, removeSize)); int differenceCount = 0; @@ -294,9 +298,7 @@ private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogic throw new HiveException("Not all batch indices removed"); } - // if (!verifyMonotonicallyIncreasing(difference, differenceCount)) { - // throw new HiveException("difference is not in sort order and unique"); - // } + Preconditions.checkState(verifyMonotonicallyIncreasing(difference, differenceCount)); return differenceCount; } @@ -323,9 +325,12 @@ private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogic private int subtract(int[] all, int allSize, int[] remove, int removeSize, int[] difference) throws HiveException { - // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { - // throw new HiveException("remove is not in sort order and unique"); - // } + + Preconditions.checkState((all != remove) && (remove != difference) && (difference != all)); + + Preconditions.checkState(verifyMonotonicallyIncreasing(all, allSize)); + + Preconditions.checkState(verifyMonotonicallyIncreasing(remove, removeSize)); int differenceCount = 0; @@ -344,6 +349,8 @@ private int subtract(int[] all, int allSize, throw new HiveException("Not all batch indices removed"); } + Preconditions.checkState(verifyMonotonicallyIncreasing(difference, differenceCount)); + return differenceCount; } @@ -363,14 +370,12 @@ private int subtract(int[] all, int allSize, private int sortMerge(int[] selected1, int selected1Count, int[] selected2, int selected2Count, int[] sortMerged) throws HiveException { - // if (!verifyMonotonicallyIncreasing(selected1, selected1Count)) { - // throw new HiveException("selected1 is not in sort order and unique"); - // } - // if (!verifyMonotonicallyIncreasing(selected2, selected2Count)) { - // throw new HiveException("selected1 is not in sort order and unique"); - // } + Preconditions.checkState((selected1 != selected2) && (selected2 != sortMerged) && (sortMerged != selected1)); + + Preconditions.checkState(verifyMonotonicallyIncreasing(selected1, selected1Count)); + Preconditions.checkState(verifyMonotonicallyIncreasing(selected2, selected2Count)); int sortMergeCount = 0; @@ -390,13 +395,160 @@ private int sortMerge(int[] selected1, int selected1Count, } } - // if (!verifyMonotonicallyIncreasing(sortMerged, sortMergeCount)) { - // throw new HiveException("sortMerged is not in sort order and unique"); - // } + Preconditions.checkState(verifyMonotonicallyIncreasing(sortMerged, sortMergeCount)); return sortMergeCount; } + private boolean validateCoverage(boolean inputSelectedInUse, int inputLogicalSize, int spillCount, + int allMatchCount, int noMatchCount) { + + Preconditions.checkState(inputLogicalSize >= 0); + Preconditions.checkState(inputLogicalSize <= VectorizedRowBatch.DEFAULT_SIZE); + if (inputSelectedInUse) { + Preconditions.checkState(verifyMonotonicallyIncreasing(inputSelected, 
inputLogicalSize)); + } + Preconditions.checkState(spillCount >= 0); + Preconditions.checkState(verifyMonotonicallyIncreasing(spills, spillCount)); + + Preconditions.checkState(allMatchCount >= 0); + Preconditions.checkState(verifyMonotonicallyIncreasing(allMatchs, allMatchCount)); + + Preconditions.checkState(noMatchCount >= 0); + Preconditions.checkState(verifyMonotonicallyIncreasing(noMatchs, noMatchCount)); + + int allCount = spillCount + allMatchCount + noMatchCount; + + Preconditions.checkState(inputLogicalSize == allCount); + + boolean[] allValidate = new boolean[VectorizedRowBatch.DEFAULT_SIZE]; + + int batchIndex; + + for (int i = 0; i < spillCount; i++) { + batchIndex = spills[i]; + Preconditions.checkState(!allValidate[batchIndex]); + allValidate[batchIndex] = true; + } + + for (int i = 0; i < allMatchCount; i++) { + batchIndex = allMatchs[i]; + Preconditions.checkState(!allValidate[batchIndex]); + allValidate[batchIndex] = true; + } + + for (int i = 0; i < noMatchCount; i++) { + batchIndex = noMatchs[i]; + Preconditions.checkState(!allValidate[batchIndex]); + allValidate[batchIndex] = true; + } + + if (inputSelectedInUse) { + for (int i = 0; i < inputLogicalSize; i++) { + batchIndex = inputSelected[i]; + Preconditions.checkState(allValidate[batchIndex]); + } + } else { + for (int i = 0; i < inputLogicalSize; i++) { + Preconditions.checkState(allValidate[i]); + } + } + + return true; + } + + private boolean validateOuterResult(VectorizedRowBatch batch, + boolean inputSelectedInUse, int inputLogicalSize, int spillCount, + int allMatchCount, int equalKeySeriesCount, int noMatchCount) { + + // Assume validateCoverage called. + + boolean[] expectedRows = new boolean[VectorizedRowBatch.DEFAULT_SIZE]; + + int numSel = 0; + for (int i = 0; i < equalKeySeriesCount; i++) { + int allMatchesIndex = equalKeySeriesAllMatchIndices[i]; + boolean isSingleValue = equalKeySeriesIsSingleValue[i]; + int duplicateCount = equalKeySeriesDuplicateCounts[i]; + + if (isSingleValue) { + for (int s = 0; s < duplicateCount; s++) { + int batchIndex = allMatchs[allMatchesIndex + s]; + Preconditions.checkState(!expectedRows[batchIndex]); + expectedRows[batchIndex] = true; + numSel++; + } + } + } + + for (int i = 0; i < noMatchCount; i++) { + int batchIndex = noMatchs[i]; + Preconditions.checkState(!expectedRows[batchIndex]); + expectedRows[batchIndex] = true; + numSel++; + } + + Preconditions.checkState(batch.size == numSel); + if (numSel > 0) { + Preconditions.checkState(batch.selectedInUse); + Preconditions.checkState(verifyMonotonicallyIncreasing(batch.selected, numSel)); + for (int i = 0; i < numSel; i++) { + int batchIndex = batch.selected[i]; + Preconditions.checkState(expectedRows[batchIndex]); + } + } + + for (int i = 0; i < noMatchCount; i++) { + int batchIndex = noMatchs[i]; + + for (int column : bigTableOuterKeyOutputVectorColumns) { + ColumnVector colVector = batch.cols[column]; + int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex); + Preconditions.checkState(!colVector.noNulls); + Preconditions.checkState(colVector.isNull[adjustedIndex]); + } + + // Small table values are set to null. + for (int column : smallTableOutputVectorColumns) { + ColumnVector colVector = batch.cols[column]; + int adjustedIndex = (colVector.isRepeating ? 
0 : batchIndex); + Preconditions.checkState(!colVector.noNulls); + Preconditions.checkState(colVector.isNull[adjustedIndex]); + } + } + + for (int i = 0; i < equalKeySeriesCount; i++) { + int allMatchesIndex = equalKeySeriesAllMatchIndices[i]; + boolean isSingleValue = equalKeySeriesIsSingleValue[i]; + int duplicateCount = equalKeySeriesDuplicateCounts[i]; + + if (isSingleValue) { + for (int s = 0; s < duplicateCount; s++) { + int batchIndex = allMatchs[allMatchesIndex + s]; + + for (int column : bigTableOuterKeyOutputVectorColumns) { + ColumnVector colVector = batch.cols[column]; + int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex); + Preconditions.checkState( + (colVector.noNulls && !colVector.isNull[adjustedIndex]) || + !colVector.noNulls); + } + + // Small table values are set to null. + for (int column : smallTableOutputVectorColumns) { + ColumnVector colVector = batch.cols[column]; + int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex); + Preconditions.checkState( + (colVector.noNulls && !colVector.isNull[adjustedIndex]) || + !colVector.noNulls); + } + } + } + } + + return true; + } + /** * Generate the outer join output results for one vectorized row batch. * @@ -474,6 +626,10 @@ public void finishOuter(VectorizedRowBatch batch, } } + Preconditions.checkState( + validateCoverage(inputSelectedInUse, inputLogicalSize, spillCount, + allMatchCount, noMatchCount)); + // When we generate results into the overflow batch, we may still end up with fewer rows // in the big table batch. So, numSel and the batch's selected array will be rebuilt with // just the big table rows that need to be forwarded, minus any rows processed with the @@ -543,6 +699,11 @@ public void finishOuter(VectorizedRowBatch batch, } } } + + Preconditions.checkState( + validateOuterResult(batch, + inputSelectedInUse, inputLogicalSize, spillCount, + allMatchCount, equalKeySeriesCount, noMatchCount)); } /** @@ -570,6 +731,8 @@ protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs, // key as null, too. for (int column : bigTableOuterKeyOutputVectorColumns) { ColumnVector colVector = batch.cols[column]; + Preconditions.checkState(!colVector.isRepeating); + Preconditions.checkState(!colVector.isNull[batchIndex]); colVector.noNulls = false; colVector.isNull[batchIndex] = true; } @@ -577,6 +740,8 @@ protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs, // Small table values are set to null. for (int column : smallTableOutputVectorColumns) { ColumnVector colVector = batch.cols[column]; + Preconditions.checkState(!colVector.isRepeating); + Preconditions.checkState(!colVector.isNull[batchIndex]); colVector.noNulls = false; colVector.isNull[batchIndex] = true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java index 5b687fd..3b7ddad 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterLongOperator.java @@ -38,6 +38,8 @@ // Single-Column Long specific imports. import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is an outer join on a Single-Column Long * using a hash map. 
@@ -136,6 +138,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for an outer join. + Preconditions.checkState(batch.validateIsNull()); + outerPerBatchSetup(batch); // For outer join, remember our input rows before ON expression filtering or before diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java index e212a2a..609eec2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is an outer join on Multi-Key * using a hash map. @@ -139,6 +141,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for an outer join. + Preconditions.checkState(batch.validateIsNull()); + outerPerBatchSetup(batch); // For outer join, remember our input rows before ON expression filtering or before diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java index e4107ff..8b7d162 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java @@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; +import com.google.common.base.Preconditions; + /* * Specialized class for doing a vectorized map join that is an outer join on a Single-Column String * using a hash map. @@ -126,6 +128,8 @@ public void process(Object row, int tag) throws HiveException { // Do the per-batch setup for an outer join. + Preconditions.checkState(batch.validateIsNull()); + outerPerBatchSetup(batch); // For outer join, remember our input rows before ON expression filtering or before diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 3fe28d8..cc67e6b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -240,23 +240,16 @@ Object next(Object previous) throws IOException { */ public Object nextVector(Object previousVector, long batchSize) throws IOException { ColumnVector result = (ColumnVector) previousVector; + result.reset(); if (present != null) { // Set noNulls and isNull vector of the ColumnVector based on // present stream - result.noNulls = true; for (int i = 0; i < batchSize; i++) { result.isNull[i] = (present.next() != 1); if (result.noNulls && result.isNull[i]) { result.noNulls = false; } } - } else { - // There is not present stream, this means that all the values are - // present. 
- result.noNulls = true; - for (int i = 0; i < batchSize; i++) { - result.isNull[i] = false; - } } return previousVector; } diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java index fcb1ae9..8368ff7 100644 --- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java +++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.Arrays; +import com.google.common.base.Preconditions; + /** * ColumnVector contains the shared structure for the sub-types, * including NULL information, and whether this vector * @@ -82,6 +84,28 @@ public ColumnVector(int len) { preFlattenIsRepeating = false; } + public boolean validateIsNull(int colNum) { + + // Only call this method when noNulls is true. + Preconditions.checkState(noNulls); + + for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i++) { + Preconditions.checkState(!isNull[i], "column " + colNum + " noNulls is true and isNull[" + i + "] array entry is true"); + } + return true; + } + + public boolean validateIsNull() { + + // Only call this method when noNulls is true. + Preconditions.checkState(noNulls); + + for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i++) { + Preconditions.checkState(!isNull[i], "noNulls is true and isNull[" + i + "] array entry is true"); + } + return true; + } + /** * Resets the column to default state * - fills the isNull array with false @@ -91,6 +115,8 @@ public ColumnVector(int len) { public void reset() { if (!noNulls) { Arrays.fill(isNull, false); + } else { + Preconditions.checkState(validateIsNull()); } noNulls = true; isRepeating = false; diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java index 380300e..dc6b1f9 100644 --- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java +++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java @@ -95,6 +95,16 @@ public VectorizedRowBatch(int numCols, int size) { partitionColumnCount = -1; } + public boolean validateIsNull() { + for (int i = 0; i < numCols; i++) { + ColumnVector colVector = cols[i]; + if (colVector != null && colVector.noNulls) { + colVector.validateIsNull(i); + } + } + return true; + } + public void setPartitionInfo(int dataColumnCount, int partitionColumnCount) { this.dataColumnCount = dataColumnCount; this.partitionColumnCount = partitionColumnCount;
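For context on why ConstantVectorExpression.evaluate() now calls reset() first: vectorized operators recycle batches and their column vectors, so a stale isNull flag left over from an earlier batch can coexist with noNulls == true, which is exactly the corrupt state validateIsNull() rejects. A small hypothetical demonstration (plain Java, not Hive code) of that failure mode and of the reset() discipline:

import java.util.Arrays;

// Hypothetical demonstration of the stale-null state this patch detects.
public class StaleNullDemo {
  public static void main(String[] args) {
    boolean[] isNull = new boolean[8];
    boolean noNulls = true;

    // Batch 1: an operator writes a NULL into row 2 and correctly clears noNulls.
    isNull[2] = true;
    noNulls = false;

    // Batch 2: the column is reused for an all-non-null result, but the writer
    // only sets noNulls and skips clearing isNull -- the state validateIsNull()
    // rejects. No wrong answer surfaces while readers trust noNulls, but any
    // later code path that clears noNulls makes row 2 NULL by accident.
    noNulls = true;
    System.out.println("noNulls=" + noNulls + ", stale flags=" + Arrays.toString(isNull));

    // The discipline mirrored by ColumnVector.reset(): clear isNull before
    // re-asserting noNulls.
    Arrays.fill(isNull, false);
    noNulls = true;
    System.out.println("noNulls=" + noNulls + ", after reset=" + Arrays.toString(isNull));
  }
}

The same reasoning explains the checkState(batch.validateIsNull()) pairs around the forward() calls above: the first catches corruption introduced by the operator itself, the second catches corruption introduced by whatever ran downstream before control returned.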