diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 86d964c..4b76d74 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -815,6 +815,12 @@ public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
 
       if (first) {
         // Copy the group key to output batch now.  We'll copy in the aggregates at the end of the group.
         first = false;
+
+        // Evaluate the key expressions of just this first batch to get the correct key.
+        for (int i = 0; i < outputKeyLength; i++) {
+          keyExpressions[i].evaluate(batch);
+        }
+
         groupKeyHelper.copyGroupKey(batch, outputBatch, buffer);
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
index 0ff389e..64706ad 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
@@ -32,6 +32,8 @@
  */
 public class VectorGroupKeyHelper extends VectorColumnSetInfo {
 
+  private int[] outputColumnNums;
+
   public VectorGroupKeyHelper(int keyCount) {
     super(keyCount);
   }
@@ -41,12 +43,14 @@
   void init(VectorExpression[] keyExpressions) throws HiveException {
     // NOTE: To support pruning the grouping set id dummy key by VectorGroupByOperator MERGE_PARTIAL
     // case, we use the keyCount passed to the constructor and not keyExpressions.length.
-    // Inspect the output type of each key expression.
+    // Inspect the output type of each key expression. Also, remember the output columns.
+    outputColumnNums = new int[keyCount];
     for(int i=0; i < keyCount; ++i) {
       String typeName = VectorizationContext.mapTypeNameSynonyms(keyExpressions[i].getOutputType());
       TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
       Type columnVectorType = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
       addKey(columnVectorType);
+      outputColumnNums[i] = keyExpressions[i].getOutputColumn();
     }
     finishAdding();
   }
@@ -61,9 +65,9 @@ void init(VectorExpression[] keyExpressions) throws HiveException {
   public void copyGroupKey(VectorizedRowBatch inputBatch, VectorizedRowBatch outputBatch,
           DataOutputBuffer buffer) throws HiveException {
     for(int i = 0; i< longIndices.length; ++i) {
-      int keyIndex = longIndices[i];
-      LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[keyIndex];
-      LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[keyIndex];
+      final int columnIndex = outputColumnNums[longIndices[i]];
+      LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[columnIndex];
+      LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[columnIndex];
 
       // This vectorized code pattern says:
       //    If the input batch has no nulls at all (noNulls is true) OR
@@ -87,9 +91,9 @@
         outputColumnVector.isNull[outputBatch.size] = true;
       }
     }
     for(int i=0;i<doubleIndices.length; ++i) {
-      int keyIndex = doubleIndices[i];
-      DoubleColumnVector inputColumnVector = (DoubleColumnVector) inputBatch.cols[keyIndex];
-      DoubleColumnVector outputColumnVector = (DoubleColumnVector) outputBatch.cols[keyIndex];
+      final int columnIndex = outputColumnNums[doubleIndices[i]];
+      DoubleColumnVector inputColumnVector = (DoubleColumnVector) inputBatch.cols[columnIndex];
+      DoubleColumnVector outputColumnVector = (DoubleColumnVector) outputBatch.cols[columnIndex];
 
       if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ ... @@
     Pair<Boolean, Boolean> retPair =
         validateAggregationDescs(desc.getAggregators(), processingMode, hasKeys);
     if (!retPair.left) {
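Taken together, the hunks above implement one idea: a MERGEPARTIAL GROUP BY key may now be a non-column expression, such as the constant 1 grouping key in the plan below. The Vectorizer context above is from validateGroupByOperator, whose "Non-column key expressions not supported for MERGEPARTIAL" rejection this patch lifts; in exchange, VectorGroupByOperator must evaluate the key expressions on the first batch of each group, and VectorGroupKeyHelper must copy each key from its expression's output column rather than from the key's ordinal position. The following stand-alone sketch illustrates that pattern; Batch, KeyExpr, ColumnRef, ConstKey, and GroupKeyCopySketch are hypothetical stand-ins, not Hive's classes:

// Minimal stand-in sketch of evaluate-then-copy-by-output-column.
import java.util.Arrays;

final class Batch {
  final long[][] cols;   // columnar storage: cols[column][row]
  int size;              // number of valid rows
  Batch(int numCols, int maxRows) { cols = new long[numCols][maxRows]; }
}

interface KeyExpr {
  void evaluate(Batch batch);   // materialize the result into the output column
  int outputColumn();           // column this expression writes to
}

// A plain column reference: already materialized, so evaluate is a no-op.
final class ColumnRef implements KeyExpr {
  private final int col;
  ColumnRef(int col) { this.col = col; }
  @Override public void evaluate(Batch batch) { }
  @Override public int outputColumn() { return col; }
}

// A constant key, like the literal 1 in the q.out below: its value only
// exists after evaluate() fills a scratch column.
final class ConstKey implements KeyExpr {
  private final int scratchCol;
  private final long value;
  ConstKey(int scratchCol, long value) { this.scratchCol = scratchCol; this.value = value; }
  @Override public void evaluate(Batch batch) {
    Arrays.fill(batch.cols[scratchCol], 0, batch.size, value);
  }
  @Override public int outputColumn() { return scratchCol; }
}

public final class GroupKeyCopySketch {
  // Copy row 0's key values from the first batch of a group into the next
  // output row, mirroring what the patched copyGroupKey does.
  static void copyGroupKey(KeyExpr[] keys, Batch input, Batch output) {
    for (KeyExpr key : keys) {
      key.evaluate(input);          // without this, a scratch column holds garbage
      int c = key.outputColumn();   // before the fix, the key's ordinal was used here
      output.cols[c][output.size] = input.cols[c][0];
    }
    output.size++;
  }

  public static void main(String[] args) {
    Batch in = new Batch(3, 4);     // columns 0-1 are data, column 2 is scratch
    in.size = 4;
    in.cols[0][0] = 7;
    in.cols[1][0] = 8;
    Batch out = new Batch(3, 4);
    KeyExpr[] keys = { new ColumnRef(0), new ColumnRef(1), new ConstKey(2, 1L) };
    copyGroupKey(keys, in, out);
    System.out.println(out.cols[0][0] + " " + out.cols[1][0] + " " + out.cols[2][0]); // 7 8 1
  }
}

In this sketch, skipping either half of the fix reproduces the failure mode the patch targets: indexing cols[] by key position would copy from an unrelated data column, and skipping evaluate() would copy from a never-filled scratch column.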
diff --git ql/src/test/results/clientpositive/llap/vector_groupby_grouping_id3.q.out ql/src/test/results/clientpositive/llap/vector_groupby_grouping_id3.q.out
index 9eb2747..c3809d3 100644
--- ql/src/test/results/clientpositive/llap/vector_groupby_grouping_id3.q.out
+++ ql/src/test/results/clientpositive/llap/vector_groupby_grouping_id3.q.out
@@ -122,15 +122,29 @@ STAGE PLANS:
                 partitionColumnCount: 0
                 scratchColumnTypeNames: bigint
         Reducer 2 
-            Execution mode: llap
+            Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
-                notVectorizedReason: Key expression for GROUPBY operator: Non-column key expressions not supported for MERGEPARTIAL
-                vectorized: false
+                groupByVectorOutput: true
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 4
+                    dataColumns: KEY._col0:int, KEY._col1:int, KEY._col2:int, VALUE._col0:bigint
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: bigint
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
+                Group By Vectorization:
+                    aggregators: VectorUDAFCountMerge(col 3) -> bigint
+                    className: VectorGroupByOperator
+                    vectorOutput: true
+                    keyExpressions: col 0, col 1, ConstantVectorExpression(val 1) -> 4:long
+                    native: false
+                    projectedOutputColumns: [0]
                 keys: KEY._col0 (type: int), KEY._col1 (type: int), 1 (type: int)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col3
@@ -139,9 +153,17 @@ STAGE PLANS:
                 Select Operator
                   expressions: _col0 (type: int), _col1 (type: int), 1 (type: int), _col3 (type: bigint)
                   outputColumnNames: _col0, _col1, _col2, _col3
+                  Select Vectorization:
+                      className: VectorSelectOperator
+                      native: true
+                      projectedOutputColumns: [0, 1, 3, 2]
+                      selectExpressions: ConstantVectorExpression(val 1) -> 3:long
                   Statistics: Num rows: 3 Data size: 20 Basic stats: COMPLETE Column stats: NONE
                   File Output Operator
                     compressed: false
+                    File Sink Vectorization:
+                        className: VectorFileSinkOperator
+                        native: false
                     Statistics: Num rows: 3 Data size: 20 Basic stats: COMPLETE Column stats: NONE
                     table:
                         input format: org.apache.hadoop.mapred.SequenceFileInputFormat
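In the now-vectorized plan above, the constant key shows up as ConstantVectorExpression(val 1) -> 4:long: it is materialized into scratch column 4 of the reduce-side batch, which is exactly why copyGroupKey has to address columns through outputColumnNums. The copy itself follows the noNulls/isNull convention quoted in the VectorGroupKeyHelper hunk: copy the value when the input batch has no nulls at all or key row 0 is non-null, otherwise mark the output row null and clear the output vector's noNulls flag. Below is a minimal sketch of that convention against Hive's real LongColumnVector; the copyKeyValue helper and its harness are hypothetical, not Hive code:

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

public final class NullAwareKeyCopySketch {

  // Copy the group key at row 0 of the input vector into row outputRow of the
  // output vector, propagating nulls the way copyGroupKey's comment describes.
  static void copyKeyValue(LongColumnVector in, LongColumnVector out, int outputRow) {
    if (in.noNulls || !in.isNull[0]) {
      // Input has no nulls at all, or this key value is non-null: copy it.
      out.vector[outputRow] = in.vector[0];
    } else {
      // Null key: mark the output row null and record that nulls now exist.
      out.noNulls = false;
      out.isNull[outputRow] = true;
    }
  }

  public static void main(String[] args) {
    LongColumnVector in = new LongColumnVector(1024);
    LongColumnVector out = new LongColumnVector(1024);

    in.vector[0] = 42;
    in.noNulls = true;
    copyKeyValue(in, out, 0);
    System.out.println(out.vector[0]);  // prints: 42

    in.noNulls = false;
    in.isNull[0] = true;
    copyKeyValue(in, out, 1);
    System.out.println(out.isNull[1]);  // prints: true
  }
}

Checking noNulls before isNull[0] matters: when noNulls is true, the isNull array contents are undefined by convention, so the short-circuit avoids trusting stale null flags.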