diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 8a54433..84aac96 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -86,7 +86,8 @@
   List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
 
-  private DataOutputBuffer buffer;
+  private DataOutputBuffer keyBuffer;
+  private DataOutputBuffer valueBuffer;
 
   private VectorizedRowBatchCtx batchContext;
   private VectorizedRowBatch batch;
 
@@ -136,7 +137,8 @@ void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyT
     if(vectorized) {
       keyStructInspector = (StructObjectInspector) keyObjectInspector;
       keysColumnOffset = keyStructInspector.getAllStructFieldRefs().size();
-      buffer = new DataOutputBuffer();
+      keyBuffer = new DataOutputBuffer();
+      valueBuffer = new DataOutputBuffer();
     }
 
     // We should initialize the SerDe with the TypeInfo when available.
@@ -324,11 +326,9 @@ private void processKeyValues(Iterable<Object> values, byte tag) throws HiveExce
    * @return true if it is not done and can take more inputs
    */
   private void processVectors(Iterable<Object> values, byte tag) throws HiveException {
-    batch.reset();
-
     /* deserialize key into columns */
     VectorizedBatchUtil.addRowToBatchFrom(keyObject, keyStructInspector,
-        0, 0, batch, buffer);
+        0, 0, batch, keyBuffer);
     for(int i = 0; i < keysColumnOffset; i++) {
       VectorizedBatchUtil.setRepeatingColumn(batch, i);
     }
@@ -341,18 +341,28 @@
       Object valueObj = deserializeValue(valueWritable, tag);
       VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors,
-          rowIdx, keysColumnOffset, batch, buffer);
+          rowIdx, keysColumnOffset, batch, valueBuffer);
       rowIdx++;
       if (rowIdx >= BATCH_SIZE) {
         VectorizedBatchUtil.setBatchSize(batch, rowIdx);
         reducer.processOp(batch, tag);
+
+        // Reset just the value columns and value buffer.
+        for (int i = keysColumnOffset; i < batch.numCols; i++) {
+          batch.cols[i].reset();
+        }
+        valueBuffer.reset();
         rowIdx = 0;
       }
     }
 
     if (rowIdx > 0) {
+      // Flush the final partial batch.
       VectorizedBatchUtil.setBatchSize(batch, rowIdx);
       reducer.processOp(batch, tag);
     }
+    batch.reset();
+    keyBuffer.reset();
+    valueBuffer.reset();
   } catch (Exception e) {
     String rowString = null;
     try {
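
---

For reviewers, the reasoning the new reset logic relies on: within one reduce key group
the key columns are written once and marked repeating, so a mid-group flush only needs
to clear the value columns and the value scratch buffer. Below is a minimal,
self-contained sketch of that flow; Column, flush(), and processGroup() are made-up
stand-ins for the VectorizedRowBatch machinery, not Hive API.

import java.util.Arrays;
import java.util.List;

public class RepeatingKeyBatchSketch {
  static final int BATCH_SIZE = 4; // Hive's VectorizedRowBatch.DEFAULT_SIZE is 1024

  // Stand-in for one ColumnVector.
  static class Column {
    long[] vector = new long[BATCH_SIZE];
    boolean isRepeating; // true => vector[0] applies to every row
    void reset() { isRepeating = false; Arrays.fill(vector, 0); }
  }

  static final int keysColumnOffset = 1;        // columns [0, offset) hold the key
  static Column[] cols = { new Column(), new Column() };
  static int size;                              // rows currently in the batch

  static void flush() {
    for (int r = 0; r < size; r++) {
      long key = cols[0].vector[cols[0].isRepeating ? 0 : r];
      System.out.println("key=" + key + " value=" + cols[1].vector[r]);
    }
    // Mid-group flush: clear only the value columns. The repeating key
    // columns stay valid until the whole key group is consumed.
    for (int i = keysColumnOffset; i < cols.length; i++) {
      cols[i].reset();
    }
    size = 0;
  }

  static void processGroup(long key, List<Long> values) {
    cols[0].vector[0] = key;     // deserialize the key once ...
    cols[0].isRepeating = true;  // ... and mark its columns repeating
    for (long v : values) {
      cols[1].vector[size++] = v;
      if (size >= BATCH_SIZE) {
        flush();                 // full batch
      }
    }
    if (size > 0) {
      flush();                   // final partial batch for this group
    }
    for (Column c : cols) {
      c.reset();                 // group done: now the key columns go too
    }
  }

  public static void main(String[] args) {
    processGroup(7L, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L));
  }
}

As I read the patch, this is also why the single buffer is split: with one
batch.reset() per flush, the key would have to be re-deserialized into the key
columns every BATCH_SIZE rows, and a shared scratch buffer would clobber the
serialized key bytes whenever the value side is reset mid-group.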