diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index d7264c2..8cd49c5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -60,6 +60,8 @@
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Process input from tez LogicalInput and write output - for a map plan
  * Just pump the records through the query plan.
@@ -100,7 +102,6 @@
   // number of columns pertaining to keys in a vectorized row batch
   private int firstValueColumnOffset;
 
-  private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
 
   private StructObjectInspector keyStructInspector;
   private StructObjectInspector valueStructInspectors;
@@ -428,6 +429,8 @@ private void processVectorGroup(BytesWritable keyWritable,
       VectorizedBatchUtil.setRepeatingColumn(batch, i);
     }
 
+    final int maxSize = batch.getMaxSize();
+    Preconditions.checkState(maxSize > 0);
     int rowIdx = 0;
     try {
       for (Object value : values) {
@@ -444,8 +447,10 @@ private void processVectorGroup(BytesWritable keyWritable,
           valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx);
         }
         rowIdx++;
-        if (rowIdx >= BATCH_SIZE) {
-          VectorizedBatchUtil.setBatchSize(batch, rowIdx);
+        if (rowIdx >= maxSize) {
+
+          // Batch is full.
+          batch.size = rowIdx;
           reducer.process(batch, tag);
 
           // Reset just the value columns and value buffer.
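
Note: the substance of the patch is the switch from the hard-coded BATCH_SIZE
constant to the batch's actual capacity, read once via
VectorizedRowBatch.getMaxSize() and guarded with a Preconditions check, so the
flush test in processVectorGroup stays correct for batches allocated with a
non-default size. Below is a minimal, self-contained Java sketch of that
flush-when-full pattern; Batch, Processor, and drain are hypothetical stand-ins
for VectorizedRowBatch, the downstream reducer operator, and the value loop,
not Hive APIs.

import java.util.List;

public class BatchFlushSketch {

  // Hypothetical stand-in for VectorizedRowBatch: a fixed-capacity row container.
  static final class Batch {
    final int maxSize; // capacity, analogous to batch.getMaxSize()
    int size;          // number of valid rows, analogous to batch.size
    Batch(int maxSize) { this.maxSize = maxSize; }
    void reset() { size = 0; }
  }

  // Hypothetical stand-in for the downstream operator (reducer.process(batch, tag)).
  interface Processor {
    void process(Batch batch);
  }

  static void drain(List<?> values, Batch batch, Processor reducer) {
    // Read the capacity once, outside the loop, as the patch does with maxSize.
    final int maxSize = batch.maxSize;
    int rowIdx = 0;
    for (Object value : values) {
      // ... deserialize `value` into row `rowIdx` of the batch (elided) ...
      rowIdx++;
      if (rowIdx >= maxSize) {
        // Batch is full: publish the row count and flush downstream.
        batch.size = rowIdx;
        reducer.process(batch);
        batch.reset();
        rowIdx = 0;
      }
    }
    // Flush any trailing partially filled batch.
    if (rowIdx > 0) {
      batch.size = rowIdx;
      reducer.process(batch);
    }
  }

  public static void main(String[] args) {
    Batch batch = new Batch(3);
    drain(List.of(1, 2, 3, 4, 5), batch,
        b -> System.out.println("flushed " + b.size + " rows"));
    // Prints "flushed 3 rows" then "flushed 2 rows".
  }
}

Reading the capacity once before the loop also keeps the per-row fullness check
cheap, which matters on this hot per-record path.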