diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java index 767df2161b..ef5ca02989 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java @@ -64,6 +64,8 @@ protected int[] reduceSinkPartitionColumnMap; protected TypeInfo[] reduceSinkPartitionTypeInfos; + private boolean isSingleReducer; + protected VectorExpression[] reduceSinkPartitionExpressions; // The above members are initialized by the constructor and must not be @@ -119,6 +121,8 @@ public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx, OperatorDesc reduceSinkPartitionTypeInfos = vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos(); reduceSinkPartitionExpressions = vectorReduceSinkInfo.getReduceSinkPartitionExpressions(); } + + isSingleReducer = this.conf.getNumReducers() == 1; } private ObjectInspector[] getObjectInspectorArray(TypeInfo[] typeInfos) { @@ -255,48 +259,35 @@ public void process(Object row, int tag) throws HiveException { final int size = batch.size; - if (isEmptyBuckets) { // EmptyBuckets = true - if (isEmptyPartitions) { // isEmptyPartition = true - for (int logical = 0; logical< size; logical++) { - final int batchIndex = (selectedInUse ? selected[logical] : logical); - final int hashCode = nonPartitionRandom.nextInt(); - postProcess(batch, batchIndex, tag, hashCode); - } - } else { // isEmptyPartition = false - for (int logical = 0; logical< size; logical++) { - final int batchIndex = (selectedInUse ? selected[logical] : logical); - partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); - final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors); - postProcess(batch, batchIndex, tag, hashCode); + for (int logical = 0; logical< size; logical++) { + final int batchIndex = (selectedInUse ? selected[logical] : logical); + int hashCode; + if (isEmptyPartitions) { + if (isSingleReducer) { + // Empty partition, single reducer -> constant hashCode + hashCode = 0; + } else { + // Empty partition, multiple reducers -> random hashCode + hashCode = nonPartitionRandom.nextInt(); } + } else { + // Compute hashCode from partitions + partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); + hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors); } - } else { // EmptyBuckets = false - if (isEmptyPartitions) { // isEmptyPartition = true - for (int logical = 0; logical< size; logical++) { - final int batchIndex = (selectedInUse ? selected[logical] : logical); - bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); - final int bucketNum = ObjectInspectorUtils.getBucketNumber( - hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets); - final int hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum; - if (bucketExpr != null) { - evaluateBucketExpr(batch, batchIndex, bucketNum); - } - postProcess(batch, batchIndex, tag, hashCode); - } - } else { // isEmptyPartition = false - for (int logical = 0; logical< size; logical++) { - final int batchIndex = (selectedInUse ? selected[logical] : logical); - partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues); - bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); - final int bucketNum = ObjectInspectorUtils.getBucketNumber( + + // Compute hashCode from buckets + if (!isEmptyBuckets) { + bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues); + final int bucketNum = ObjectInspectorUtils.getBucketNumber( hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets); - final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum; - if (bucketExpr != null) { - evaluateBucketExpr(batch, batchIndex, bucketNum); - } - postProcess(batch, batchIndex, tag, hashCode); + if (bucketExpr != null) { + evaluateBucketExpr(batch, batchIndex, bucketNum); } + hashCode = hashCode * 31 + bucketNum; } + + postProcess(batch, batchIndex, tag, hashCode); } } catch (Exception e) { throw new HiveException(e);