diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java index 8d75cf3..c9636a1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.ql.exec.vector.*; +import com.google.common.base.Preconditions; + /** * Constant is represented as a vector with repeating values. */ @@ -142,6 +144,7 @@ private void evaluateDecimal(VectorizedRowBatch vrg) { @Override public void evaluate(VectorizedRowBatch vrg) { + vrg.cols[outputColumn].reset(); switch (type) { case LONG: evaluateLong(vrg); @@ -156,6 +159,7 @@ public void evaluate(VectorizedRowBatch vrg) { evaluateDecimal(vrg); break; } + Preconditions.checkState(vrg.cols[outputColumn].validateIsNull(outputColumn)); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index a85bfef..e667ceb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -70,6 +70,8 @@ import org.apache.hadoop.io.Text; import org.apache.orc.OrcProto; +import com.google.common.base.Preconditions; + public class RecordReaderImpl implements RecordReader { static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class); private static final boolean isLogDebugEnabled = LOG.isDebugEnabled(); @@ -1056,12 +1058,37 @@ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOExcept result.cols = cols; } else { result = (VectorizedRowBatch) previous; + // Reset batch proper. 
result.selectedInUse = false; + result.size = 0; + result.endOfFile = false; + + // Reset data columns but not partition columns. + int dataColumnCount = result.getDataColumnCount(); + for (int i = 0; i < dataColumnCount; i++) { + ColumnVector colVector = result.cols[i]; + if (colVector != null) { + colVector.reset(); + } + } + // Reset scratch columns that are after partition columns, if any. + for (int i = dataColumnCount + result.getPartitionColumnCount(); i < result.cols.length; i++) { + ColumnVector colVector = result.cols[i]; + if (colVector != null) { + if (colVector.noNulls) { + Preconditions.checkState(colVector.validateIsNull(i)); + } + colVector.reset(); + } + } + reader.setVectorColumnCount(result.getDataColumnCount()); reader.nextVector(result.cols, (int) batchSize); } result.size = (int) batchSize; + Preconditions.checkState(result.validateIsNull()); + advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true); return result; } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 3fe28d8..df57952 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -69,6 +69,8 @@ import org.apache.orc.impl.SerializationUtils; import org.apache.orc.impl.StreamName; +import com.google.common.base.Preconditions; + /** * Factory for creating ORC tree readers. */ @@ -240,23 +242,19 @@ Object next(Object previous) throws IOException { */ public Object nextVector(Object previousVector, long batchSize) throws IOException { ColumnVector result = (ColumnVector) previousVector; + + // Validate the row has been reset. 
+ Preconditions.checkState(result.isReset()); + if (present != null) { // Set noNulls and isNull vector of the ColumnVector based on // present stream - result.noNulls = true; for (int i = 0; i < batchSize; i++) { result.isNull[i] = (present.next() != 1); if (result.noNulls && result.isNull[i]) { result.noNulls = false; } } - } else { - // There is not present stream, this means that all the values are - // present. - result.noNulls = true; - for (int i = 0; i < batchSize; i++) { - result.isNull[i] = false; - } } return previousVector; } diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java index fcb1ae9..d7a6c9f 100644 --- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java +++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.Arrays; +import com.google.common.base.Preconditions; + /** * ColumnVector contains the shared structure for the sub-types, * including NULL information, and whether this vector @@ -83,6 +85,68 @@ public ColumnVector(int len) { } /** + * Validate that either the isNull array is all false or the noNulls flag is false. + * @param column for error reporting. + * @return is boolean so calls to this method can be done in a Preconditions.checkState + */ + public boolean validateIsNull(int column) { + if (!noNulls) { + return true; + } + int row = findTrueInIsNullArray(); + if (row != -1) { + Preconditions.checkState(false, "column " + column + " noNulls is true and isNull[" + row + "] array entry is true"); + } + return true; + } + + /** + * Validate that either the isNull array is all false or the noNulls flag is false. 
+ * @return is boolean so calls to this method can be done in a Preconditions.checkState + */ + public boolean validateIsNull() { + if (!noNulls) { + return true; + } + int row = findTrueInIsNullArray(); + if (row != -1) { + Preconditions.checkState(false, "noNulls is true and isNull[" + row + "] array entry is true"); + } + return true; + } + + /** + * Determine if the isNull array contains a true flag. + * @return -1 if not true flag was found; otherwise, the row number. + */ + public int findTrueInIsNullArray() { + + // Only call this method when noNulls is true; + Preconditions.checkState(noNulls); + + for (int i = 0; i < isNull.length; i++) { + if (isNull[i]) { + return i; + } + } + return -1; + } + + /** + * Determine if the column is in reset state. + * @return + */ + public boolean isReset() { + if (isRepeating) { + return false; + } + if (!noNulls) { + return false; + } + return findTrueInIsNullArray() == -1; + } + + /** * Resets the column to default state * - fills the isNull array with false * - sets noNulls to true @@ -91,6 +155,8 @@ public ColumnVector(int len) { public void reset() { if (!noNulls) { Arrays.fill(isNull, false); + } else { + Preconditions.checkState(validateIsNull()); } noNulls = true; isRepeating = false; diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java index 380300e..8126dc3 100644 --- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java +++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java @@ -24,6 +24,8 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; +import com.google.common.base.Preconditions; + /** * A VectorizedRowBatch is a set of rows, organized with each column * as a vector. 
It is the unit of query execution, organized to minimize @@ -108,6 +110,30 @@ public int getPartitionColumnCount() { return partitionColumnCount; } + public boolean validateIsNull() { + for (int i = 0; i < numCols; i++) { + ColumnVector colVector = cols[i]; + if (colVector != null && colVector.noNulls) { + Preconditions.checkState(colVector.validateIsNull(i)); + } + } + return true; + } + + public int invalidIsNullCol() { + int row; + for (int i = 0; i < numCols; i++) { + ColumnVector colVector = cols[i]; + if (colVector != null && colVector.noNulls) { + row = colVector.findTrueInIsNullArray(); + if (row != -1) { + return i; + } + } + } + return -1; + } + /** * Returns the maximum size of the batch (number of rows it can hold) */ @@ -196,8 +222,15 @@ public void reset() { selectedInUse = false; size = 0; endOfFile = false; - for (ColumnVector vc : cols) { + for (int i = 0; i < cols.length; i++) { + ColumnVector vc = cols[i]; if (vc != null) { + if (vc.noNulls) { + int invalidIsNullRow = vc.findTrueInIsNullArray(); + if (invalidIsNullRow != -1) { + throw new RuntimeException("Column " + i + " noNulls is true and isNull[" + invalidIsNullRow + "] array entry is true"); + } + } vc.reset(); vc.init(); }