diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
index 767df2161b..ef5ca02989 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
@@ -64,6 +64,8 @@
   protected int[] reduceSinkPartitionColumnMap;
   protected TypeInfo[] reduceSinkPartitionTypeInfos;
 
+  private boolean isSingleReducer;
+
   protected VectorExpression[] reduceSinkPartitionExpressions;
 
   // The above members are initialized by the constructor and must not be
@@ -119,6 +121,8 @@ public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx, OperatorDesc
       reduceSinkPartitionTypeInfos = vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos();
       reduceSinkPartitionExpressions = vectorReduceSinkInfo.getReduceSinkPartitionExpressions();
     }
+
+    isSingleReducer = this.conf.getNumReducers() == 1;
   }
 
   private ObjectInspector[] getObjectInspectorArray(TypeInfo[] typeInfos) {
@@ -255,48 +259,35 @@ public void process(Object row, int tag) throws HiveException {
 
       final int size = batch.size;
 
-      if (isEmptyBuckets) { // EmptyBuckets = true
-        if (isEmptyPartitions) { // isEmptyPartition = true
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            final int hashCode = nonPartitionRandom.nextInt();
-            postProcess(batch, batchIndex, tag, hashCode);
-          }
-        } else { // isEmptyPartition = false
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
-            final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors);
-            postProcess(batch, batchIndex, tag, hashCode);
+      for (int logical = 0; logical< size; logical++) {
+        final int batchIndex = (selectedInUse ? selected[logical] : logical);
+        int hashCode;
+        if (isEmptyPartitions) {
+          if (isSingleReducer) {
+            // Empty partition, single reducer -> constant hashCode
+            hashCode = 0;
+          } else {
+            // Empty partition, multiple reducers -> random hashCode
+            hashCode = nonPartitionRandom.nextInt();
           }
+        } else {
+          // Compute hashCode from partitions
+          partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
+          hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors);
         }
-      } else { // EmptyBuckets = false
-        if (isEmptyPartitions) { // isEmptyPartition = true
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
-            final int bucketNum = ObjectInspectorUtils.getBucketNumber(
-                hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets);
-            final int hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum;
-            if (bucketExpr != null) {
-              evaluateBucketExpr(batch, batchIndex, bucketNum);
-            }
-            postProcess(batch, batchIndex, tag, hashCode);
-          }
-        } else { // isEmptyPartition = false
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
-            bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
-            final int bucketNum = ObjectInspectorUtils.getBucketNumber(
+
+        // Compute hashCode from buckets
+        if (!isEmptyBuckets) {
+          bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
+          final int bucketNum = ObjectInspectorUtils.getBucketNumber(
                 hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets);
-            final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum;
-            if (bucketExpr != null) {
-              evaluateBucketExpr(batch, batchIndex, bucketNum);
-            }
-            postProcess(batch, batchIndex, tag, hashCode);
+          if (bucketExpr != null) {
+            evaluateBucketExpr(batch, batchIndex, bucketNum);
           }
+          hashCode = hashCode * 31 + bucketNum;
         }
+
+        postProcess(batch, batchIndex, tag, hashCode);
       }
     } catch (Exception e) {
       throw new HiveException(e);
diff --git ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out
index 5343628252..98e366a270 100644
--- ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out
+++ ql/src/test/results/clientpositive/llap/murmur_hash_migration.q.out
@@ -417,11 +417,11 @@ STAGE PLANS:
                   outputColumnNames: _col0, _col1, _col2, _col3
                   input vertices:
                     1 Map 3
-                  Statistics: Num rows: 220 Data size: 41800 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 150 Data size: 28500 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
                     key expressions: _col0 (type: int), _col1 (type: string), _col2 (type: int), _col3 (type: string)
                     sort order: ++++
-                    Statistics: Num rows: 220 Data size: 41800 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 150 Data size: 28500 Basic stats: COMPLETE Column stats: COMPLETE
             Execution mode: vectorized, llap
             LLAP IO: no inputs
         Map 3 
@@ -451,10 +451,10 @@ STAGE PLANS:
               Select Operator
                 expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: int), KEY.reducesinkkey3 (type: string)
                 outputColumnNames: _col0, _col1, _col2, _col3
-                Statistics: Num rows: 220 Data size: 41800 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 150 Data size: 28500 Basic stats: COMPLETE Column stats: COMPLETE
                 File Output Operator
                   compressed: false
-                  Statistics: Num rows: 220 Data size: 41800 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 150 Data size: 28500 Basic stats: COMPLETE Column stats: COMPLETE
                   table:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -558,11 +558,11 @@ STAGE PLANS:
                   outputColumnNames: _col0, _col1, _col2, _col3
                   input vertices:
                     1 Map 3
-                  Statistics: Num rows: 220 Data size: 41800 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 150 Data size: 28500 Basic stats: COMPLETE Column stats: COMPLETE
                   Reduce Output Operator
                     key expressions: _col0 (type: int), _col1 (type: string), _col2 (type: int), _col3 (type: string)
                     sort order: ++++
-                    Statistics: Num rows: 220 Data size: 41800 Basic stats: COMPLETE Column stats: COMPLETE
+                    Statistics: Num rows: 150 Data size: 28500 Basic stats: COMPLETE Column stats: COMPLETE
             Execution mode: vectorized, llap
             LLAP IO: no inputs
         Map 3 
@@ -592,10 +592,10 @@ STAGE PLANS:
               Select Operator
                 expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: int), KEY.reducesinkkey3 (type: string)
                 outputColumnNames: _col0, _col1, _col2, _col3
-                Statistics: Num rows: 220 Data size: 41800 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 150 Data size: 28500 Basic stats: COMPLETE Column stats: COMPLETE
                 File Output Operator
                   compressed: false
-                  Statistics: Num rows: 220 Data size: 41800 Basic stats: COMPLETE Column stats: COMPLETE
+                  Statistics: Num rows: 150 Data size: 28500 Basic stats: COMPLETE Column stats: COMPLETE
                   table:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
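
For reference, the per-row hash-code selection that the rewritten loop performs can be sketched standalone. The following is a minimal illustration, not part of the patch: the class and method names (ReduceSinkHashSketch, computeRowHashCode) are made up, and plain int arguments stand in for hashFunc.apply(...) over the extracted partition field values and for ObjectInspectorUtils.getBucketNumber(...).

import java.util.Random;

/** Minimal sketch of the hash-code selection performed per row by the new loop. */
public class ReduceSinkHashSketch {

  // Stand-in for the operator's nonPartitionRandom; seeded only for repeatable demo output.
  private static final Random NON_PARTITION_RANDOM = new Random(12345L);

  /**
   * @param emptyPartitions true when the sink has no partition columns
   * @param singleReducer   true when numReducers == 1
   * @param emptyBuckets    true when the sink has no bucket columns
   * @param partitionHash   stand-in for hashFunc.apply(partitionFieldValues, partitionObjectInspectors)
   * @param bucketNum       stand-in for ObjectInspectorUtils.getBucketNumber(...)
   */
  static int computeRowHashCode(boolean emptyPartitions, boolean singleReducer,
      boolean emptyBuckets, int partitionHash, int bucketNum) {
    int hashCode;
    if (emptyPartitions) {
      // No partition columns: with a single reducer every row goes to the same place,
      // so a constant hash code suffices; with multiple reducers a random hash code
      // spreads rows evenly.
      hashCode = singleReducer ? 0 : NON_PARTITION_RANDOM.nextInt();
    } else {
      // Partition columns present: hash the extracted partition key values.
      hashCode = partitionHash;
    }
    if (!emptyBuckets) {
      // Bucketed output: fold the bucket number into the hash code.
      hashCode = hashCode * 31 + bucketNum;
    }
    return hashCode;
  }

  public static void main(String[] args) {
    // Empty partitions, single reducer, no buckets -> constant 0 for every row.
    System.out.println(computeRowHashCode(true, true, true, 0, 0));
    // Empty partitions, multiple reducers, no buckets -> random per row.
    System.out.println(computeRowHashCode(true, false, true, 0, 0));
    // Partition hash 42, bucket 3 -> 42 * 31 + 3 = 1305.
    System.out.println(computeRowHashCode(false, false, false, 42, 3));
  }
}

The single-reducer constant is the behavioral change the patch introduces; the bucket folding (hashCode * 31 + bucketNum) is unchanged and identical across the single- and multi-reducer paths.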