diff --git a/metastore/bin/.gitignore b/metastore/bin/.gitignore
index 0dd9890..4f00cd9 100644
--- a/metastore/bin/.gitignore
+++ b/metastore/bin/.gitignore
@@ -1 +1 @@
-# Dummy file to make Git recognize this empty directory
+/src/
diff --git a/ql/.gitignore b/ql/.gitignore
index 916e17c..3285bd9 100644
--- a/ql/.gitignore
+++ b/ql/.gitignore
@@ -1 +1,2 @@
 dependency-reduced-pom.xml
+/bin/
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
index 46854e6..ba04410 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java
@@ -31,6 +31,7 @@
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.RelFactories.ProjectFactory;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexInputRef;
@@ -47,7 +48,9 @@
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ExprNodeConverter;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
@@ -613,5 +616,45 @@ public Void visitCall(org.apache.calcite.rex.RexCall call) {
     }
 
     return bldr.build();
+  }
+
+  public static ImmutableList<RexInputRef> getInputRef(List<Integer> inputRefs, RelNode inputRel) {
+    ImmutableList.Builder<RexInputRef> bldr = ImmutableList.<RexInputRef> builder();
+    for (int i : inputRefs) {
+      bldr.add(new RexInputRef(i, (RelDataType) inputRel.getRowType().getFieldList().get(i).getType()));
+    }
+    return bldr.build();
+  }
+
+  public static ExprNodeDesc getExprNode(Integer inputRefIndx, RelNode inputRel,
+      ExprNodeConverter exprConv) {
+    ExprNodeDesc exprNode = null;
+    RexNode rexInputRef = new RexInputRef(inputRefIndx, (RelDataType) inputRel.getRowType()
+        .getFieldList().get(inputRefIndx).getType());
+    exprNode = rexInputRef.accept(exprConv);
+
+    return exprNode;
+  }
+
+  public static List<ExprNodeDesc> getExprNodes(List<Integer> inputRefs, RelNode inputRel,
+      String inputTabAlias) {
+    List<ExprNodeDesc> exprNodes = new ArrayList<ExprNodeDesc>();
+    List<RexInputRef> rexInputRefs = getInputRef(inputRefs, inputRel);
+    // TODO: Change ExprNodeConverter to be independent of Partition Expr
+    ExprNodeConverter exprConv = new ExprNodeConverter(inputTabAlias, inputRel.getRowType(), false, inputRel.getCluster().getTypeFactory());
+    for (RexNode iRef : rexInputRefs) {
+      exprNodes.add(iRef.accept(exprConv));
+    }
+    return exprNodes;
+  }
+
+  public static List<String> getFieldNames(List<Integer> inputRefs, RelNode inputRel) {
+    List<String> fieldNames = new ArrayList<String>();
+    List<String> schemaNames = inputRel.getRowType().getFieldNames();
+    for (Integer iRef : inputRefs) {
+      fieldNames.add(schemaNames.get(iRef));
+    }
+
+    return fieldNames;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
index 8f981f2..e868e37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java
@@ -266,8 +266,8 @@ public void computePartitionList(HiveConf conf, RexNode pruneNode) {
       }
 
       // We have valid pruning expressions, only retrieve qualifying partitions
-      ExprNodeDesc pruneExpr = pruneNode
-          .accept(new ExprNodeConverter(getName(), getRowType(), true));
+      ExprNodeDesc pruneExpr = pruneNode.accept(new ExprNodeConverter(getName(), getRowType(),
+          true, this.getRelOptSchema().getTypeFactory()));
 
       partitionList = PartitionPruner.prune(hiveTblMetadata, pruneExpr, conf, getName(),
           partitionCache);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java
index 5cccd3c..4058dcf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java
@@ -26,6 +26,7 @@
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexFieldCollation;
@@ -36,6 +37,8 @@
 import org.apache.calcite.rex.RexVisitorImpl;
 import org.apache.calcite.rex.RexWindow;
 import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter.Schema;
@@ -72,20 +75,22 @@
   RelDataType outputRowType;
   boolean partitioningExpr;
   WindowFunctionSpec wfs;
+  private final RelDataTypeFactory dTFactory;
 
   public ExprNodeConverter(String tabAlias, RelDataType inputRowType,
-      boolean partitioningExpr) {
-    this(tabAlias, null, inputRowType, null, partitioningExpr);
+      boolean partitioningExpr, RelDataTypeFactory dTFactory) {
+    this(tabAlias, null, inputRowType, null, partitioningExpr, dTFactory);
   }
 
   public ExprNodeConverter(String tabAlias, String columnAlias, RelDataType inputRowType,
-      RelDataType outputRowType, boolean partitioningExpr) {
+      RelDataType outputRowType, boolean partitioningExpr, RelDataTypeFactory dTFactory) {
     super(true);
     this.tabAlias = tabAlias;
     this.columnAlias = columnAlias;
     this.inputRowType = inputRowType;
     this.outputRowType = outputRowType;
     this.partitioningExpr = partitioningExpr;
+    this.dTFactory = dTFactory;
   }
 
   public WindowFunctionSpec getWindowFunctionSpec() {
@@ -116,9 +121,15 @@ public ExprNodeDesc visitCall(RexCall call) {
       args.add(operand.accept(this));
     }
 
-    // If Expr is flat (and[p,q,r,s] or[p,q,r,s]) then recursively build the
-    // exprnode
-    if (ASTConverter.isFlat(call)) {
+    // If Call is a redundant cast then bail out. Ex: cast(true)BOOLEAN
+    if (call.isA(SqlKind.CAST)
+        && (call.operands.size() == 1)
+        && SqlTypeUtil.equalSansNullability(dTFactory, call.getType(),
+            call.operands.get(0).getType())) {
+      return args.get(0);
+    } else if (ASTConverter.isFlat(call)) {
+      // If Expr is flat (and[p,q,r,s] or[p,q,r,s]) then recursively build the
+      // exprnode
       ArrayList<ExprNodeDesc> tmpExprArgs = new ArrayList<ExprNodeDesc>();
       tmpExprArgs.addAll(args.subList(0, 2));
       gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
new file mode 100644
index 0000000..bc87b36
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
@@ -0,0 +1,1237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.translator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter.OpAttr;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.GenericUDAFInfo;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * TODO:
+ * 1. Change the output col/ExprNodeColumn names to external names.
+ * 2. Verify if we need to use the "KEY."/"VALUE." in RS cols; switch to
+ * external names if possible.
+ * 3. In ExprNode & in ColumnInfo the tableAlias/VirtualColumn is specified + * differently for different GB/RS in pipeline. Remove the different treatments. + * 3. VirtualColMap needs to be maintained + * + */ +public class HiveGBOpConvUtil { + private static enum HIVEGBPHYSICALMODE { + MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB, MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB, MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT, MAP_SIDE_GB_SKEW_GBKEYS_AND_DIST_UDAF_NOT_PRESENT, NO_MAP_SIDE_GB_NO_SKEW, NO_MAP_SIDE_GB_SKEW + }; + + private static class UDAFAttrs { + private boolean isDistinctUDAF; + private String udafName; + private GenericUDAFEvaluator udafEvaluator; + private ArrayList udafParams = new ArrayList(); + private List udafParamsIndxInGBInfoDistExprs = new ArrayList(); + }; + + private static class GBInfo { + private List outputColNames = new ArrayList(); + + private List gbKeyColNamesInInput = new ArrayList(); + private List gbKeyTypes = new ArrayList(); + private List gbKeys = new ArrayList(); + + private List grpSets = new ArrayList(); + private boolean grpSetRqrAdditionalMRJob; + private boolean grpIdFunctionNeeded; + + private List distExprNames = new ArrayList(); + private List distExprTypes = new ArrayList(); + private List distExprNodes = new ArrayList(); + private List> distColIndices = new ArrayList>(); + + private List deDupedNonDistIrefs = new ArrayList(); + + private List udafAttrs = new ArrayList(); + private boolean containsDistinctAggr = false; + + float groupByMemoryUsage; + float memoryThreshold; + + private HIVEGBPHYSICALMODE gbPhysicalPipelineMode; + }; + + private static HIVEGBPHYSICALMODE getAggOPMode(HiveConf hc, GBInfo gbInfo) { + HIVEGBPHYSICALMODE gbPhysicalPipelineMode = HIVEGBPHYSICALMODE.MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB; + + if (hc.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { + if (!hc.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { + if (!gbInfo.grpSetRqrAdditionalMRJob) { + gbPhysicalPipelineMode = HIVEGBPHYSICALMODE.MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB; + } else { + gbPhysicalPipelineMode = HIVEGBPHYSICALMODE.MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB; + } + } else { + if (gbInfo.containsDistinctAggr || !gbInfo.gbKeys.isEmpty()) { + gbPhysicalPipelineMode = HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT; + } else { + gbPhysicalPipelineMode = HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_AND_DIST_UDAF_NOT_PRESENT; + } + } + } else { + if (!hc.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { + gbPhysicalPipelineMode = HIVEGBPHYSICALMODE.NO_MAP_SIDE_GB_NO_SKEW; + } else { + gbPhysicalPipelineMode = HIVEGBPHYSICALMODE.NO_MAP_SIDE_GB_SKEW; + } + } + + return gbPhysicalPipelineMode; + } + + // For each of the GB op in the logical GB this should be called seperately; + // otherwise GBevaluator and expr nodes may get shared among multiple GB ops + private static GBInfo getGBInfo(HiveAggregate aggRel, OpAttr inputOpAf, HiveConf hc) { + GBInfo gbInfo = new GBInfo(); + + // 0. Collect AggRel output col Names + gbInfo.outputColNames.addAll(aggRel.getRowType().getFieldNames()); + + // 1. 
Collect GB Keys + RelNode aggInputRel = aggRel.getInput(); + ExprNodeConverter exprConv = new ExprNodeConverter(inputOpAf.tabAlias, + aggInputRel.getRowType(), false, aggRel.getCluster().getTypeFactory()); + + ExprNodeDesc tmpExprNodeDesc; + for (int i : aggRel.getGroupSet()) { + RexInputRef iRef = new RexInputRef(i, (RelDataType) aggInputRel.getRowType().getFieldList() + .get(i).getType()); + tmpExprNodeDesc = iRef.accept(exprConv); + gbInfo.gbKeys.add(tmpExprNodeDesc); + gbInfo.gbKeyColNamesInInput.add(aggInputRel.getRowType().getFieldNames().get(i)); + gbInfo.gbKeyTypes.add(tmpExprNodeDesc.getTypeInfo()); + } + + // 2. Collect Grouping Set info + if (aggRel.indicator) { + // 2.1 Translate Grouping set col bitset + ImmutableList lstGrpSet = aggRel.getGroupSets(); + int bitmap = 0; + for (ImmutableBitSet grpSet : lstGrpSet) { + bitmap = 0; + for (Integer bitIdx : grpSet.asList()) { + bitmap = SemanticAnalyzer.setBit(bitmap, bitIdx); + } + gbInfo.grpSets.add(bitmap); + } + Collections.sort(gbInfo.grpSets); + + // 2.2 Check if GRpSet require additional MR Job + gbInfo.grpSetRqrAdditionalMRJob = gbInfo.grpSets.size() > hc + .getIntVar(HiveConf.ConfVars.HIVE_NEW_JOB_GROUPING_SET_CARDINALITY); + + // 2.3 Check if GROUPING_ID needs to be projected out + if (!aggRel.getAggCallList().isEmpty() + && (aggRel.getAggCallList().get(aggRel.getAggCallList().size() - 1).getAggregation() == HiveGroupingID.INSTANCE)) { + gbInfo.grpIdFunctionNeeded = true; + } + } + + // 3. Walk through UDAF & Collect Distinct Info + Set distinctRefs = new HashSet(); + Map distParamInRefsToOutputPos = new HashMap(); + for (AggregateCall aggCall : aggRel.getAggCallList()) { + if ((aggCall.getAggregation() == HiveGroupingID.INSTANCE) || !aggCall.isDistinct()) { + continue; + } + + List argLst = new ArrayList(aggCall.getArgList()); + List argNames = HiveCalciteUtil.getFieldNames(argLst, aggInputRel); + ExprNodeDesc distinctExpr; + for (int i = 0; i < argLst.size(); i++) { + if (!distinctRefs.contains(argLst.get(i))) { + distinctRefs.add(argLst.get(i)); + distParamInRefsToOutputPos.put(argLst.get(i), gbInfo.distExprNodes.size()); + distinctExpr = HiveCalciteUtil.getExprNode(argLst.get(i), aggInputRel, exprConv); + gbInfo.distExprNodes.add(distinctExpr); + gbInfo.distExprNames.add(argNames.get(i)); + gbInfo.distExprTypes.add(distinctExpr.getTypeInfo()); + } + } + } + + // 4. 
Walk through UDAF & Collect UDAF Info + Set deDupedNonDistIrefsSet = new HashSet(); + for (AggregateCall aggCall : aggRel.getAggCallList()) { + if (aggCall.getAggregation() == HiveGroupingID.INSTANCE) { + continue; + } + + UDAFAttrs udafAttrs = new UDAFAttrs(); + udafAttrs.udafParams.addAll(HiveCalciteUtil.getExprNodes(aggCall.getArgList(), aggInputRel, + inputOpAf.tabAlias)); + udafAttrs.udafName = aggCall.getAggregation().getName(); + udafAttrs.isDistinctUDAF = aggCall.isDistinct(); + List argLst = new ArrayList(aggCall.getArgList()); + List distColIndicesOfUDAF = new ArrayList(); + List distUDAFParamsIndxInDistExprs = new ArrayList(); + for (int i = 0; i < argLst.size(); i++) { + // NOTE: distinct expr can not be part of of GB key (we assume plan + // gen would have prevented it) + if (udafAttrs.isDistinctUDAF) { + distColIndicesOfUDAF.add(distParamInRefsToOutputPos.get(argLst.get(i))); + distUDAFParamsIndxInDistExprs.add(distParamInRefsToOutputPos.get(argLst.get(i))); + } else { + // TODO: this seems wrong (following what Hive Regular does) + if (!distParamInRefsToOutputPos.containsKey(argLst.get(i)) + && !deDupedNonDistIrefsSet.contains(argLst.get(i))) { + deDupedNonDistIrefsSet.add(i); + gbInfo.deDupedNonDistIrefs.add(udafAttrs.udafParams.get(i)); + } + } + } + + if (udafAttrs.isDistinctUDAF) { + gbInfo.containsDistinctAggr = true; + + udafAttrs.udafParamsIndxInGBInfoDistExprs = distUDAFParamsIndxInDistExprs; + gbInfo.distColIndices.add(distColIndicesOfUDAF); + } + try { + udafAttrs.udafEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator(udafAttrs.udafName, + new ArrayList(udafAttrs.udafParams), new ASTNode(), + udafAttrs.isDistinctUDAF, false); + } catch (SemanticException e) { + throw new RuntimeException(e); + } + gbInfo.udafAttrs.add(udafAttrs); + } + + // 4. Gather GB Memory threshold + gbInfo.groupByMemoryUsage = HiveConf.getFloatVar(hc, HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY); + gbInfo.memoryThreshold = HiveConf.getFloatVar(hc, HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD); + + // 5. Gather GB Physical pipeline (based on user config & Grping Sets size) + gbInfo.gbPhysicalPipelineMode = getAggOPMode(hc, gbInfo); + + return gbInfo; + } + + static OpAttr translateGB(OpAttr inputOpAf, HiveAggregate aggRel, HiveConf hc) + throws SemanticException { + OpAttr translatedGBOpAttr = null; + GBInfo gbInfo = getGBInfo(aggRel, inputOpAf, hc); + + switch (gbInfo.gbPhysicalPipelineMode) { + case MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB: + translatedGBOpAttr = genMapSideGBNoSkewNoAddMRJob(inputOpAf, aggRel, gbInfo); + break; + case MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB: + translatedGBOpAttr = genMapSideGBNoSkewAddMRJob(inputOpAf, aggRel, gbInfo); + break; + case MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT: + translatedGBOpAttr = genMapSideGBSkewGBKeysOrDistUDAFPresent(inputOpAf, aggRel, gbInfo); + break; + case MAP_SIDE_GB_SKEW_GBKEYS_AND_DIST_UDAF_NOT_PRESENT: + translatedGBOpAttr = genMapSideGBSkewGBKeysAndDistUDAFNotPresent(inputOpAf, aggRel, gbInfo); + break; + case NO_MAP_SIDE_GB_NO_SKEW: + translatedGBOpAttr = genNoMapSideGBNoSkew(inputOpAf, aggRel, gbInfo); + break; + case NO_MAP_SIDE_GB_SKEW: + translatedGBOpAttr = genNoMapSideGBSkew(inputOpAf, aggRel, gbInfo); + break; + } + + return translatedGBOpAttr; + } + + /** + * GB-RS-GB1 + * + * Construct GB-RS-GB Pipe line. User has enabled Map Side GB, specified no + * skew and Grp Set is below the threshold. 
+ * + * @param inputOpAf + * @param aggRel + * @param gbInfo + * @return + * @throws SemanticException + */ + private static OpAttr genMapSideGBNoSkewNoAddMRJob(OpAttr inputOpAf, HiveAggregate aggRel, + GBInfo gbInfo) throws SemanticException { + OpAttr mapSideGB = null; + OpAttr mapSideRS = null; + OpAttr reduceSideGB = null; + + // 1. Insert MapSide GB + mapSideGB = genMapSideGB(inputOpAf, gbInfo); + + // 2. Insert MapSide RS + mapSideRS = genMapSideGBRS(mapSideGB, gbInfo); + + // 3. Insert ReduceSide GB + reduceSideGB = genReduceSideGB1(mapSideRS, gbInfo, false, false, GroupByDesc.Mode.MERGEPARTIAL); + + return reduceSideGB; + } + + /** + * GB-RS-GB1-RS-GB2 + */ + private static OpAttr genGBRSGBRSGBOpPipeLine(OpAttr inputOpAf, HiveAggregate aggRel, + GBInfo gbInfo) throws SemanticException { + OpAttr mapSideGB = null; + OpAttr mapSideRS = null; + OpAttr reduceSideGB1 = null; + OpAttr reduceSideRS = null; + OpAttr reduceSideGB2 = null; + + // 1. Insert MapSide GB + mapSideGB = genMapSideGB(inputOpAf, gbInfo); + + // 2. Insert MapSide RS + mapSideRS = genMapSideGBRS(mapSideGB, gbInfo); + + // 3. Insert ReduceSide GB1 + boolean computeGrpSet = (gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT) ? false : true; + reduceSideGB1 = genReduceSideGB1(mapSideRS, gbInfo, computeGrpSet, false, GroupByDesc.Mode.PARTIALS); + + // 4. Insert RS on reduce side with Reduce side GB as input + reduceSideRS = genReduceGBRS(reduceSideGB1, gbInfo); + + // 5. Insert ReduceSide GB2 + reduceSideGB2 = genReduceSideGB2(reduceSideRS, gbInfo); + + return reduceSideGB2; + } + + /** + * GB-RS-GB1-RS-GB2 + * + * @param inputOpAf + * @param aggRel + * @param gbInfo + * @return + * @throws SemanticException + */ + private static OpAttr genMapSideGBNoSkewAddMRJob(OpAttr inputOpAf, HiveAggregate aggRel, + GBInfo gbInfo) throws SemanticException { + // 1. Sanity check + if (gbInfo.containsDistinctAggr) { + String errorMsg = "The number of rows per input row due to grouping sets is " + + gbInfo.grpSets.size(); + throw new SemanticException( + ErrorMsg.HIVE_GROUPING_SETS_THRESHOLD_NOT_ALLOWED_WITH_DISTINCTS.getMsg(errorMsg)); + } + + // 2. Gen GB-RS-GB-RS-GB pipeline + return genGBRSGBRSGBOpPipeLine(inputOpAf, aggRel, gbInfo); + } + + /** + * GB-RS-GB1-RS-GB2 + * + * @param inputOpAf + * @param aggRel + * @param gbInfo + * @return + * @throws SemanticException + */ + private static OpAttr genMapSideGBSkewGBKeysOrDistUDAFPresent(OpAttr inputOpAf, + HiveAggregate aggRel, GBInfo gbInfo) throws SemanticException { + // 1. Sanity check + if (gbInfo.grpSetRqrAdditionalMRJob) { + String errorMsg = "The number of rows per input row due to grouping sets is " + + gbInfo.grpSets.size(); + throw new SemanticException( + ErrorMsg.HIVE_GROUPING_SETS_THRESHOLD_NOT_ALLOWED_WITH_SKEW.getMsg(errorMsg)); + } + + // 2. Gen GB-RS-GB-RS-GB pipeline + return genGBRSGBRSGBOpPipeLine(inputOpAf, aggRel, gbInfo); + } + + /** + * GB-RS-GB2 + * + * @param inputOpAf + * @param aggRel + * @param gbInfo + * @return + * @throws SemanticException + */ + private static OpAttr genMapSideGBSkewGBKeysAndDistUDAFNotPresent(OpAttr inputOpAf, + HiveAggregate aggRel, GBInfo gbInfo) throws SemanticException { + OpAttr mapSideGB = null; + OpAttr mapSideRS = null; + OpAttr reduceSideGB2 = null; + + // 1. 
Sanity check + if (gbInfo.grpSetRqrAdditionalMRJob) { + String errorMsg = "The number of rows per input row due to grouping sets is " + + gbInfo.grpSets.size(); + throw new SemanticException( + ErrorMsg.HIVE_GROUPING_SETS_THRESHOLD_NOT_ALLOWED_WITH_SKEW.getMsg(errorMsg)); + } + + // 1. Insert MapSide GB + mapSideGB = genMapSideGB(inputOpAf, gbInfo); + + // 2. Insert MapSide RS + mapSideRS = genMapSideGBRS(mapSideGB, gbInfo); + + // 3. Insert ReduceSide GB2 + reduceSideGB2 = genReduceSideGB2(mapSideRS, gbInfo); + + return reduceSideGB2; + } + + /** + * RS-Gb1 + * + * @param inputOpAf + * @param aggRel + * @param gbInfo + * @return + * @throws SemanticException + */ + private static OpAttr genNoMapSideGBNoSkew(OpAttr inputOpAf, HiveAggregate aggRel, GBInfo gbInfo) + throws SemanticException { + OpAttr mapSideRS = null; + OpAttr reduceSideGB1NoMapGB = null; + + // 1. Insert MapSide RS + mapSideRS = genMapSideRS(inputOpAf, gbInfo); + + // 2. Insert ReduceSide GB + reduceSideGB1NoMapGB = genReduceSideGB1NoMapGB(mapSideRS, gbInfo, GroupByDesc.Mode.COMPLETE); + + return reduceSideGB1NoMapGB; + } + + /** + * RS-GB1-RS-GB2 + * + * @param inputOpAf + * @param aggRel + * @param gbInfo + * @return + * @throws SemanticException + */ + private static OpAttr genNoMapSideGBSkew(OpAttr inputOpAf, HiveAggregate aggRel, GBInfo gbInfo) + throws SemanticException { + OpAttr mapSideRS = null; + OpAttr reduceSideGB1NoMapGB = null; + OpAttr reduceSideRS = null; + OpAttr reduceSideGB2 = null; + + // 1. Insert MapSide RS + mapSideRS = genMapSideRS(inputOpAf, gbInfo); + + // 2. Insert ReduceSide GB + reduceSideGB1NoMapGB = genReduceSideGB1NoMapGB(mapSideRS, gbInfo, GroupByDesc.Mode.PARTIAL1); + + // 3. Insert RS on reduce side with Reduce side GB as input + reduceSideRS = genReduceGBRS(reduceSideGB1NoMapGB, gbInfo); + + // 4. 
Insert ReduceSide GB2 + reduceSideGB2 = genReduceSideGB2(reduceSideRS, gbInfo); + + return reduceSideGB2; + } + + private static int getParallelismForReduceSideRS(GBInfo gbInfo) { + int degreeOfParallelism = 0; + + switch (gbInfo.gbPhysicalPipelineMode) { + case MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB: + case MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT: + case NO_MAP_SIDE_GB_SKEW: + if (gbInfo.gbKeys.isEmpty()) { + degreeOfParallelism = 1; + } else { + degreeOfParallelism = -1; + } + break; + default: + throw new RuntimeException( + "Unable to determine Reducer Parallelism - Invalid Physical Mode: " + + gbInfo.gbPhysicalPipelineMode); + } + + return degreeOfParallelism; + } + + private static int getParallelismForMapSideRS(GBInfo gbInfo) { + int degreeOfParallelism = 0; + + switch (gbInfo.gbPhysicalPipelineMode) { + case MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB: + case MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB: + case NO_MAP_SIDE_GB_NO_SKEW: + if (gbInfo.gbKeys.isEmpty()) { + degreeOfParallelism = 1; + } else { + degreeOfParallelism = -1; + } + break; + case NO_MAP_SIDE_GB_SKEW: + case MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT: + degreeOfParallelism = -1; + break; + case MAP_SIDE_GB_SKEW_GBKEYS_AND_DIST_UDAF_NOT_PRESENT: + degreeOfParallelism = 1; + break; + default: + throw new RuntimeException( + "Unable to determine Reducer Parallelism - Invalid Physical Mode: " + + gbInfo.gbPhysicalPipelineMode); + } + + return degreeOfParallelism; + } + + private static int getNumPartFieldsForReduceSideRS(GBInfo gbInfo) { + int numPartFields = 0; + + switch (gbInfo.gbPhysicalPipelineMode) { + case MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB: + numPartFields = gbInfo.gbKeys.size() + 1; + break; + case MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT: + case NO_MAP_SIDE_GB_SKEW: + numPartFields = gbInfo.gbKeys.size(); + break; + default: + throw new RuntimeException( + "Unable to determine Number of Partition Fields - Invalid Physical Mode: " + + gbInfo.gbPhysicalPipelineMode); + } + + return numPartFields; + } + + private static int getNumPartFieldsForMapSideRS(GBInfo gbInfo) { + int numPartFields = 0; + + switch (gbInfo.gbPhysicalPipelineMode) { + case MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB: + case MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB: + case MAP_SIDE_GB_SKEW_GBKEYS_AND_DIST_UDAF_NOT_PRESENT: + case NO_MAP_SIDE_GB_NO_SKEW: + numPartFields += gbInfo.gbKeys.size(); + break; + case NO_MAP_SIDE_GB_SKEW: + case MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT: + if (gbInfo.containsDistinctAggr) { + numPartFields = Integer.MAX_VALUE; + } else { + numPartFields = -1; + } + break; + default: + throw new RuntimeException( + "Unable to determine Number of Partition Fields - Invalid Physical Mode: " + + gbInfo.gbPhysicalPipelineMode); + } + + return numPartFields; + } + + private static boolean inclGrpSetInReduceSide(GBInfo gbInfo) { + boolean inclGrpSet = false; + + if (gbInfo.grpSets.size() > 0 + && (gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB || gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT)) { + inclGrpSet = true; + } + + return inclGrpSet; + } + + private static boolean inclGrpSetInMapSide(GBInfo gbInfo) { + boolean inclGrpSet = false; + + if (gbInfo.grpSets.size() > 0 + && ((gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB) || + gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT)) { + inclGrpSet = true; + } + + return inclGrpSet; + } + + private static OpAttr genReduceGBRS(OpAttr inputOpAf, 
GBInfo gbInfo) throws SemanticException { + Map colExprMap = new HashMap(); + ArrayList outputColumnNames = new ArrayList(); + ArrayList colInfoLst = new ArrayList(); + GroupByOperator reduceSideGB1 = (GroupByOperator) inputOpAf.inputs.get(0); + List gb1ColInfoLst = reduceSideGB1.getSchema().getSignature(); + + ArrayList reduceKeys = getReduceKeysForRS(reduceSideGB1, 0, + gbInfo.gbKeys.size() - 1, outputColumnNames, false, colInfoLst, colExprMap, true, true); + if (inclGrpSetInReduceSide(gbInfo)) { + addGrpSetCol(false, gb1ColInfoLst.get(reduceKeys.size()).getInternalName(), true, reduceKeys, + outputColumnNames, colInfoLst, colExprMap); + } + + ArrayList reduceValues = getValueKeysForRS(reduceSideGB1, reduceSideGB1.getConf() + .getKeys().size(), outputColumnNames, colInfoLst, colExprMap, true, true); + + ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(PlanUtils + .getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, true, -1, + getNumPartFieldsForReduceSideRS(gbInfo), getParallelismForReduceSideRS(gbInfo), + AcidUtils.Operation.NOT_ACID), new RowSchema(colInfoLst), reduceSideGB1); + + rsOp.setColumnExprMap(colExprMap); + + return new OpAttr("", new HashMap(), rsOp); + } + + private static OpAttr genMapSideGBRS(OpAttr inputOpAf, GBInfo gbInfo) throws SemanticException { + Map colExprMap = new HashMap(); + List outputKeyColumnNames = new ArrayList(); + List outputValueColumnNames = new ArrayList(); + ArrayList colInfoLst = new ArrayList(); + GroupByOperator mapGB = (GroupByOperator) inputOpAf.inputs.get(0); + int distColStartIndx = gbInfo.gbKeys.size() + (gbInfo.grpSets.size() > 0 ? 1 : 0); + + ArrayList reduceKeys = getReduceKeysForRS(mapGB, 0, gbInfo.gbKeys.size() - 1, + outputKeyColumnNames, false, colInfoLst, colExprMap, false, false); + int keyLength = reduceKeys.size(); + + if (inclGrpSetInMapSide(gbInfo)) { + addGrpSetCol(false, SemanticAnalyzer.getColumnInternalName(reduceKeys.size()), true, + reduceKeys, outputKeyColumnNames, colInfoLst, colExprMap); + keyLength++; + } + if (mapGB.getConf().getKeys().size() > reduceKeys.size()) { + // NOTE: All dist cols have single output col name; + reduceKeys.addAll(getReduceKeysForRS(mapGB, reduceKeys.size(), mapGB.getConf().getKeys() + .size() - 1, outputKeyColumnNames, true, colInfoLst, colExprMap, false, false)); + } + + ArrayList reduceValues = getValueKeysForRS(mapGB, mapGB.getConf().getKeys() + .size(), outputValueColumnNames, colInfoLst, colExprMap, false, false); + List> distinctColIndices = getDistColIndices(gbInfo, distColStartIndx); + + ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(PlanUtils + .getReduceSinkDesc(reduceKeys, keyLength, reduceValues, distinctColIndices, + outputKeyColumnNames, outputValueColumnNames, true, -1, + getNumPartFieldsForMapSideRS(gbInfo), getParallelismForMapSideRS(gbInfo), + AcidUtils.Operation.NOT_ACID), new RowSchema(colInfoLst), mapGB); + + rsOp.setColumnExprMap(colExprMap); + + return new OpAttr("", new HashMap(), rsOp); + } + + private static OpAttr genMapSideRS(OpAttr inputOpAf, GBInfo gbInfo) throws SemanticException { + Map colExprMap = new HashMap(); + List outputKeyColumnNames = new ArrayList(); + List outputValueColumnNames = new ArrayList(); + ArrayList colInfoLst = new ArrayList(); + int distColStartIndx = gbInfo.gbKeys.size() + (gbInfo.grpSets.size() > 0 ? 1 : 0); + String outputColName; + + // 1. 
Add GB Keys to reduce keys + ArrayList reduceKeys = getReduceKeysForRS(inputOpAf.inputs.get(0), 0, + gbInfo.gbKeys.size() - 1, outputKeyColumnNames, false, colInfoLst, colExprMap, false, false); + int keyLength = reduceKeys.size(); + + // 2. Add Dist UDAF args to reduce keys + if (gbInfo.containsDistinctAggr) { + // TODO: Why is this needed (doesn't represent any cols) + String udafName = SemanticAnalyzer.getColumnInternalName(reduceKeys.size()); + outputKeyColumnNames.add(udafName); + for (int i = 0; i < gbInfo.distExprNodes.size(); i++) { + reduceKeys.add(gbInfo.distExprNodes.get(i)); + outputColName = SemanticAnalyzer.getColumnInternalName(i); + String field = Utilities.ReduceField.KEY.toString() + "." + udafName + ":" + i + "." + + outputColName; + ColumnInfo colInfo = new ColumnInfo(field, gbInfo.distExprNodes.get(i).getTypeInfo(), null, + false); + colInfoLst.add(colInfo); + colExprMap.put(field, gbInfo.distExprNodes.get(i)); + } + } + + // 3. Add UDAF args deduped to reduce values + ArrayList reduceValues = new ArrayList(); + for (int i = 0; i < gbInfo.deDupedNonDistIrefs.size(); i++) { + reduceValues.add(gbInfo.deDupedNonDistIrefs.get(i)); + outputColName = SemanticAnalyzer.getColumnInternalName(reduceValues.size() - 1); + outputValueColumnNames.add(outputColName); + String field = Utilities.ReduceField.VALUE.toString() + "." + outputColName; + colInfoLst.add(new ColumnInfo(field, reduceValues.get(reduceValues.size() - 1).getTypeInfo(), + null, false)); + colExprMap.put(field, reduceValues.get(reduceValues.size() - 1)); + } + + // 4. Gen RS + ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(PlanUtils + .getReduceSinkDesc(reduceKeys, keyLength, reduceValues, + getDistColIndices(gbInfo, distColStartIndx), outputKeyColumnNames, + outputValueColumnNames, true, -1, getNumPartFieldsForMapSideRS(gbInfo), + getParallelismForMapSideRS(gbInfo), AcidUtils.Operation.NOT_ACID), new RowSchema( + colInfoLst), inputOpAf.inputs.get(0)); + + rsOp.setColumnExprMap(colExprMap); + + return new OpAttr("", new HashMap(), rsOp); + } + + private static OpAttr genReduceSideGB2(OpAttr inputOpAf, GBInfo gbInfo) throws SemanticException { + ArrayList outputColNames = new ArrayList(); + ArrayList colInfoLst = new ArrayList(); + Map colExprMap = new HashMap(); + String colOutputName = null; + ReduceSinkOperator rs = (ReduceSinkOperator) inputOpAf.inputs.get(0); + List rsColInfoLst = rs.getSchema().getSignature(); + ColumnInfo ci; + + // 1. 
Build GB Keys, grouping set starting position + // 1.1 First Add original GB Keys + ArrayList gbKeys = ExprNodeDescUtils.genExprNodeDesc(rs, 0, + gbInfo.gbKeys.size() - 1, false, false); + for (int i = 0; i < gbInfo.gbKeys.size(); i++) { + ci = rsColInfoLst.get(i); + colOutputName = gbInfo.outputColNames.get(i); + outputColNames.add(colOutputName); + colInfoLst.add(new ColumnInfo(colOutputName, ci.getType(), "", false)); + colExprMap.put(colOutputName, gbKeys.get(i)); + } + // 1.2 Add GrpSet Col + int groupingSetsPosition = -1; + if (inclGrpSetInReduceSide(gbInfo) && gbInfo.grpIdFunctionNeeded) { + groupingSetsPosition = gbKeys.size(); + ExprNodeDesc grpSetColExpr = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, + rsColInfoLst.get(groupingSetsPosition).getInternalName(), null, false); + gbKeys.add(grpSetColExpr); + colOutputName = gbInfo.outputColNames.get(gbInfo.outputColNames.size() - 1); + ; + outputColNames.add(colOutputName); + colInfoLst.add(new ColumnInfo(colOutputName, TypeInfoFactory.stringTypeInfo, null, true)); + colExprMap.put(colOutputName, grpSetColExpr); + } + + // 2. Add UDAF + UDAFAttrs udafAttr; + ArrayList aggregations = new ArrayList(); + int udafStartPosInGBInfOutputColNames = gbInfo.grpSets.isEmpty() ? gbInfo.gbKeys.size() + : gbInfo.gbKeys.size() * 2; + int udafStartPosInInputRS = gbInfo.grpSets.isEmpty() ? gbInfo.gbKeys.size() : gbInfo.gbKeys.size() + 1; + + for (int i = 0; i < gbInfo.udafAttrs.size(); i++) { + udafAttr = gbInfo.udafAttrs.get(i); + ArrayList aggParameters = new ArrayList(); + aggParameters.add(new ExprNodeColumnDesc(rsColInfoLst.get(udafStartPosInInputRS + i))); + colOutputName = gbInfo.outputColNames.get(udafStartPosInGBInfOutputColNames + i); + outputColNames.add(colOutputName); + Mode udafMode = SemanticAnalyzer.groupByDescModeToUDAFMode(GroupByDesc.Mode.FINAL, + udafAttr.isDistinctUDAF); + GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo(udafAttr.udafEvaluator, udafMode, + aggParameters); + aggregations.add(new AggregationDesc(udafAttr.udafName.toLowerCase(), + udaf.genericUDAFEvaluator, udaf.convertedParameters, false, udafMode)); + colInfoLst.add(new ColumnInfo(colOutputName, udaf.returnType, "", false)); + } + + Operator rsGBOp2 = OperatorFactory.getAndMakeChild(new GroupByDesc(GroupByDesc.Mode.FINAL, + outputColNames, gbKeys, aggregations, false, gbInfo.groupByMemoryUsage, + gbInfo.memoryThreshold, null, false, groupingSetsPosition, gbInfo.containsDistinctAggr), + new RowSchema(colInfoLst), rs); + + rsGBOp2.setColumnExprMap(colExprMap); + + // TODO: Shouldn't we propgate vc? is it vc col from tab or all vc + return new OpAttr("", new HashMap(), rsGBOp2); + } + + private static OpAttr genReduceSideGB1(OpAttr inputOpAf, GBInfo gbInfo, boolean computeGrpSet, + boolean propagateConstInDistinctUDAF, GroupByDesc.Mode gbMode) throws SemanticException { + ArrayList outputColNames = new ArrayList(); + ArrayList colInfoLst = new ArrayList(); + Map colExprMap = new HashMap(); + String colOutputName = null; + ReduceSinkOperator rs = (ReduceSinkOperator) inputOpAf.inputs.get(0); + List rsColInfoLst = rs.getSchema().getSignature(); + ColumnInfo ci; + boolean finalGB = (gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB); + + // 1. 
Build GB Keys, grouping set starting position + // 1.1 First Add original GB Keys + ArrayList gbKeys = ExprNodeDescUtils.genExprNodeDesc(rs, 0, + gbInfo.gbKeys.size() - 1, false, false); + for (int i = 0; i < gbInfo.gbKeys.size(); i++) { + ci = rsColInfoLst.get(i); + if (finalGB) { + colOutputName = gbInfo.outputColNames.get(i); + } else { + colOutputName = SemanticAnalyzer.getColumnInternalName(i); + } + outputColNames.add(colOutputName); + colInfoLst.add(new ColumnInfo(colOutputName, ci.getType(), "", false)); + colExprMap.put(colOutputName, gbKeys.get(i)); + } + + // 1.2 Add GrpSet Col + int groupingSetsColPosition = -1; + if ((!finalGB && gbInfo.grpSets.size() > 0) || (finalGB && gbInfo.grpIdFunctionNeeded)) { + groupingSetsColPosition = gbInfo.gbKeys.size(); + if (computeGrpSet) { + // GrpSet Col needs to be constructed + gbKeys.add(new ExprNodeConstantDesc("0")); + } else { + // GrpSet Col already part of input RS + // TODO: Can't we just copy the ExprNodeDEsc from input (Do we need to + // explicitly set table alias to null & VC to false + gbKeys.addAll(ExprNodeDescUtils.genExprNodeDesc(rs, groupingSetsColPosition, + groupingSetsColPosition, false, true)); + } + + colOutputName = SemanticAnalyzer.getColumnInternalName(groupingSetsColPosition); + if (finalGB) { + colOutputName = gbInfo.outputColNames.get(gbInfo.outputColNames.size() - 1); + } + outputColNames.add(colOutputName); + colInfoLst.add(new ColumnInfo(colOutputName, TypeInfoFactory.stringTypeInfo, null, true)); + colExprMap.put(colOutputName, gbKeys.get(groupingSetsColPosition)); + } + + // 2. Walk through UDAF and add them to GB + String lastReduceKeyColName = null; + if (!rs.getConf().getOutputKeyColumnNames().isEmpty()) { + lastReduceKeyColName = rs.getConf().getOutputKeyColumnNames() + .get(rs.getConf().getOutputKeyColumnNames().size() - 1); + } + int numDistinctUDFs = 0; + int distinctStartPosInReduceKeys = gbKeys.size(); + List reduceValues = rs.getConf().getValueCols(); + ArrayList aggregations = new ArrayList(); + int udafColStartPosInOriginalGB = (gbInfo.grpSets.size() > 0) ? gbInfo.gbKeys.size() * 2 + : gbInfo.gbKeys.size(); + int udafColStartPosInRS = rs.getConf().getKeyCols().size(); + for (int i = 0; i < gbInfo.udafAttrs.size(); i++) { + UDAFAttrs udafAttr = gbInfo.udafAttrs.get(i); + ArrayList aggParameters = new ArrayList(); + + if (udafAttr.isDistinctUDAF) { + ColumnInfo rsDistUDAFParamColInfo; + ExprNodeDesc distinctUDAFParam; + ExprNodeDesc constantPropDistinctUDAFParam; + for (int j = 0; j < udafAttr.udafParamsIndxInGBInfoDistExprs.size(); j++) { + rsDistUDAFParamColInfo = rsColInfoLst.get(distinctStartPosInReduceKeys + j); + String rsDistUDAFParamName = rsDistUDAFParamColInfo.getInternalName(); + // TODO: verify if this is needed + if (lastReduceKeyColName != null) { + rsDistUDAFParamName = Utilities.ReduceField.KEY.name() + "." + lastReduceKeyColName + + ":" + numDistinctUDFs + "." 
+ SemanticAnalyzer.getColumnInternalName(j); + } + distinctUDAFParam = new ExprNodeColumnDesc(rsDistUDAFParamColInfo.getType(), + rsDistUDAFParamName, rsDistUDAFParamColInfo.getTabAlias(), + rsDistUDAFParamColInfo.getIsVirtualCol()); + if (propagateConstInDistinctUDAF) { + // TODO: Implement propConstDistUDAFParams + constantPropDistinctUDAFParam = SemanticAnalyzer + .isConstantParameterInAggregationParameters( + rsDistUDAFParamColInfo.getInternalName(), reduceValues); + if (constantPropDistinctUDAFParam != null) { + distinctUDAFParam = constantPropDistinctUDAFParam; + } + } + aggParameters.add(distinctUDAFParam); + } + numDistinctUDFs++; + } else { + aggParameters.add(new ExprNodeColumnDesc(rsColInfoLst.get(udafColStartPosInRS + i))); + } + Mode udafMode = SemanticAnalyzer.groupByDescModeToUDAFMode(gbMode, udafAttr.isDistinctUDAF); + GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo(udafAttr.udafEvaluator, udafMode, + aggParameters); + aggregations.add(new AggregationDesc(udafAttr.udafName.toLowerCase(), + udaf.genericUDAFEvaluator, udaf.convertedParameters, + (gbMode != GroupByDesc.Mode.FINAL && udafAttr.isDistinctUDAF), udafMode)); + + if (finalGB) { + colOutputName = gbInfo.outputColNames.get(udafColStartPosInOriginalGB + i); + } else { + colOutputName = SemanticAnalyzer.getColumnInternalName(gbKeys.size() + aggregations.size() + - 1); + } + + colInfoLst.add(new ColumnInfo(colOutputName, udaf.returnType, "", false)); + outputColNames.add(colOutputName); + } + + // Nothing special needs to be done for grouping sets if + // this is the final group by operator, and multiple rows corresponding to + // the + // grouping sets have been generated upstream. + // However, if an addition MR job has been created to handle grouping sets, + // additional rows corresponding to grouping sets need to be created here. + //TODO: Clean up/refactor assumptions + boolean includeGrpSetInGBDesc = (gbInfo.grpSets.size() > 0) + && !finalGB + && !(gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT); + Operator rsGBOp = OperatorFactory.getAndMakeChild(new GroupByDesc(gbMode, outputColNames, + gbKeys, aggregations, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold, gbInfo.grpSets, + includeGrpSetInGBDesc, groupingSetsColPosition, + gbInfo.containsDistinctAggr), new RowSchema(colInfoLst), rs); + + rsGBOp.setColumnExprMap(colExprMap); + + return new OpAttr("", new HashMap(), rsGBOp); + } + + /** + * RS-GB0 + * + * @param inputOpAf + * @param gbInfo + * @param gbMode + * @return + * @throws SemanticException + */ + private static OpAttr genReduceSideGB1NoMapGB(OpAttr inputOpAf, GBInfo gbInfo, + GroupByDesc.Mode gbMode) throws SemanticException { + ArrayList outputColNames = new ArrayList(); + ArrayList colInfoLst = new ArrayList(); + Map colExprMap = new HashMap(); + String colOutputName = null; + ReduceSinkOperator rs = (ReduceSinkOperator) inputOpAf.inputs.get(0); + List rsColInfoLst = rs.getSchema().getSignature(); + ColumnInfo ci; + boolean useOriginalGBNames = (gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.NO_MAP_SIDE_GB_NO_SKEW); + + // 1. 
Build GB Keys, grouping set starting position + // 1.1 First Add original GB Keys + ArrayList gbKeys = ExprNodeDescUtils.genExprNodeDesc(rs, 0, + gbInfo.gbKeys.size() - 1, true, false); + for (int i = 0; i < gbInfo.gbKeys.size(); i++) { + ci = rsColInfoLst.get(i); + if (useOriginalGBNames) { + colOutputName = gbInfo.outputColNames.get(i); + } else { + colOutputName = SemanticAnalyzer.getColumnInternalName(i); + } + outputColNames.add(colOutputName); + colInfoLst.add(new ColumnInfo(colOutputName, ci.getType(), null, false)); + colExprMap.put(colOutputName, gbKeys.get(i)); + } + + // 2. Walk through UDAF and add them to GB + String lastReduceKeyColName = null; + if (!rs.getConf().getOutputKeyColumnNames().isEmpty()) { + lastReduceKeyColName = rs.getConf().getOutputKeyColumnNames() + .get(rs.getConf().getOutputKeyColumnNames().size() - 1); + } + int numDistinctUDFs = 0; + int distinctStartPosInReduceKeys = gbKeys.size(); + List reduceValues = rs.getConf().getValueCols(); + ArrayList aggregations = new ArrayList(); + int udafColStartPosInOriginalGB = gbInfo.gbKeys.size(); + for (int i = 0; i < gbInfo.udafAttrs.size(); i++) { + UDAFAttrs udafAttr = gbInfo.udafAttrs.get(i); + ArrayList aggParameters = new ArrayList(); + + ColumnInfo rsUDAFParamColInfo; + ExprNodeDesc udafParam; + ExprNodeDesc constantPropDistinctUDAFParam; + for (int j = 0; j < udafAttr.udafParams.size(); j++) { + rsUDAFParamColInfo = rsColInfoLst.get(distinctStartPosInReduceKeys + j); + String rsUDAFParamName = rsUDAFParamColInfo.getInternalName(); + // TODO: verify if this is needed + if (udafAttr.isDistinctUDAF && lastReduceKeyColName != null) { + rsUDAFParamName = Utilities.ReduceField.KEY.name() + "." + lastReduceKeyColName + ":" + + numDistinctUDFs + "." + SemanticAnalyzer.getColumnInternalName(j); + } + udafParam = new ExprNodeColumnDesc(rsUDAFParamColInfo.getType(), rsUDAFParamName, + rsUDAFParamColInfo.getTabAlias(), rsUDAFParamColInfo.getIsVirtualCol()); + constantPropDistinctUDAFParam = SemanticAnalyzer + .isConstantParameterInAggregationParameters(rsUDAFParamColInfo.getInternalName(), + reduceValues); + if (constantPropDistinctUDAFParam != null) { + udafParam = constantPropDistinctUDAFParam; + } + aggParameters.add(udafParam); + } + + if (udafAttr.isDistinctUDAF) { + numDistinctUDFs++; + } + Mode udafMode = SemanticAnalyzer.groupByDescModeToUDAFMode(gbMode, udafAttr.isDistinctUDAF); + GenericUDAFInfo udaf = SemanticAnalyzer.getGenericUDAFInfo(udafAttr.udafEvaluator, udafMode, + aggParameters); + aggregations.add(new AggregationDesc(udafAttr.udafName.toLowerCase(), + udaf.genericUDAFEvaluator, udaf.convertedParameters, udafAttr.isDistinctUDAF, udafMode)); + if (useOriginalGBNames) { + colOutputName = gbInfo.outputColNames.get(udafColStartPosInOriginalGB + i); + } else { + colOutputName = SemanticAnalyzer.getColumnInternalName(gbKeys.size() + aggregations.size() + - 1); + } + + colInfoLst.add(new ColumnInfo(colOutputName, udaf.returnType, "", false)); + outputColNames.add(colOutputName); + } + + Operator rsGB1 = OperatorFactory.getAndMakeChild(new GroupByDesc(gbMode, outputColNames, + gbKeys, aggregations, false, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold, null, + false, -1, numDistinctUDFs > 0), new RowSchema(colInfoLst), rs); + rsGB1.setColumnExprMap(colExprMap); + + return new OpAttr("", new HashMap(), rsGB1); + } + + @SuppressWarnings("unchecked") + private static OpAttr genMapSideGB(OpAttr inputOpAf, GBInfo gbAttrs) throws SemanticException { + ArrayList outputColNames = new ArrayList(); + ArrayList 
colInfoLst = new ArrayList(); + Map colExprMap = new HashMap(); + Set gbKeyColsAsNamesFrmIn = new HashSet(); + String colOutputName = null; + + // 1. Build GB Keys, grouping set starting position + // 1.1 First Add original GB Keys + ArrayList gbKeys = new ArrayList(); + for (int i = 0; i < gbAttrs.gbKeys.size(); i++) { + gbKeys.add(gbAttrs.gbKeys.get(i)); + colOutputName = SemanticAnalyzer.getColumnInternalName(i); + colInfoLst.add(new ColumnInfo(colOutputName, gbAttrs.gbKeyTypes.get(i), "", false)); + outputColNames.add(colOutputName); + gbKeyColsAsNamesFrmIn.add(gbAttrs.gbKeyColNamesInInput.get(i)); + colExprMap.put(colOutputName, gbKeys.get(i)); + } + // 1.2. Adjust GroupingSet Position, GBKeys for GroupingSet Position if + // needed. NOTE: GroupingID is added to map side GB only if we don't GrpSet + // doesn't require additional MR Jobs + int groupingSetsPosition = -1; + boolean inclGrpID = inclGrpSetInMapSide(gbAttrs); + if (inclGrpID) { + groupingSetsPosition = gbKeys.size(); + addGrpSetCol(true, null, false, gbKeys, outputColNames, colInfoLst, colExprMap); + } + // 1.3. Add all distinct params + // NOTE: distinct expr can not be part of of GB key (we assume plan + // gen would have prevented it) + for (int i = 0; i < gbAttrs.distExprNodes.size(); i++) { + if (!gbKeyColsAsNamesFrmIn.contains(gbAttrs.distExprNames.get(i))) { + gbKeys.add(gbAttrs.distExprNodes.get(i)); + colOutputName = SemanticAnalyzer.getColumnInternalName(gbKeys.size() - 1); + colInfoLst.add(new ColumnInfo(colOutputName, gbAttrs.distExprTypes.get(i), "", false)); + outputColNames.add(colOutputName); + gbKeyColsAsNamesFrmIn.add(gbAttrs.distExprNames.get(i)); + colExprMap.put(colOutputName, gbKeys.get(gbKeys.size() - 1)); + } + } + + // 2. Build Aggregations + ArrayList aggregations = new ArrayList(); + for (UDAFAttrs udafAttr : gbAttrs.udafAttrs) { + Mode amode = SemanticAnalyzer.groupByDescModeToUDAFMode(GroupByDesc.Mode.HASH, + udafAttr.isDistinctUDAF); + aggregations.add(new AggregationDesc(udafAttr.udafName.toLowerCase(), udafAttr.udafEvaluator, + udafAttr.udafParams, udafAttr.isDistinctUDAF, amode)); + GenericUDAFInfo udafInfo; + try { + udafInfo = SemanticAnalyzer.getGenericUDAFInfo(udafAttr.udafEvaluator, amode, + udafAttr.udafParams); + } catch (SemanticException e) { + throw new RuntimeException(e); + } + colOutputName = SemanticAnalyzer.getColumnInternalName(gbKeys.size() + aggregations.size() + - 1); + colInfoLst.add(new ColumnInfo(colOutputName, udafInfo.returnType, "", false)); + outputColNames.add(colOutputName); + } + + // 3. Create GB + @SuppressWarnings("rawtypes") + Operator gbOp = OperatorFactory.getAndMakeChild(new GroupByDesc(GroupByDesc.Mode.HASH, + outputColNames, gbKeys, aggregations, false, gbAttrs.groupByMemoryUsage, + gbAttrs.memoryThreshold, gbAttrs.grpSets, inclGrpID, groupingSetsPosition, + gbAttrs.containsDistinctAggr), new RowSchema(colInfoLst), inputOpAf.inputs.get(0)); + + // 5. 
Setup Expr Col Map + // NOTE: UDAF is not included in ExprColMap + gbOp.setColumnExprMap(colExprMap); + + return new OpAttr("", new HashMap(), gbOp); + } + + private static void addGrpSetCol(boolean createConstantExpr, String grpSetIDExprName, + boolean addReducePrefixToColInfoName, List exprLst, + List outputColumnNames, List colInfoLst, + Map colExprMap) throws SemanticException { + String outputColName = null; + ExprNodeDesc grpSetColExpr = null; + + if (createConstantExpr) { + grpSetColExpr = new ExprNodeConstantDesc("0"); + } else { + grpSetColExpr = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, grpSetIDExprName, + null, false); + } + exprLst.add(grpSetColExpr); + + outputColName = SemanticAnalyzer.getColumnInternalName(exprLst.size() - 1); + outputColumnNames.add(outputColName); + String internalColName = outputColName; + if (addReducePrefixToColInfoName) { + internalColName = Utilities.ReduceField.KEY.toString() + "." + outputColName; + } + colInfoLst.add(new ColumnInfo(internalColName, grpSetColExpr.getTypeInfo(), null, true)); + colExprMap.put(internalColName, grpSetColExpr); + } + + /** + * Get Reduce Keys for RS following MapSide GB + * + * @param reduceKeys + * assumed to be deduped list of exprs + * @param outputKeyColumnNames + * @param colExprMap + * @return List of ExprNodeDesc of ReduceKeys + * @throws SemanticException + */ + private static ArrayList getReduceKeysForRS(Operator inOp, int startPos, + int endPos, List outputKeyColumnNames, boolean addOnlyOneKeyColName, + ArrayList colInfoLst, Map colExprMap, + boolean addEmptyTabAlias, boolean setColToNonVirtual) throws SemanticException { + ArrayList reduceKeys = null; + if (endPos < 0) { + reduceKeys = new ArrayList(); + } else { + reduceKeys = ExprNodeDescUtils.genExprNodeDesc(inOp, startPos, endPos, addEmptyTabAlias, + setColToNonVirtual); + int outColNameIndx = startPos; + for (int i = 0; i < reduceKeys.size(); ++i) { + String outputColName = SemanticAnalyzer.getColumnInternalName(outColNameIndx); + outColNameIndx++; + if (!addOnlyOneKeyColName || i == 0) { + outputKeyColumnNames.add(outputColName); + } + + // TODO: Verify if this is needed (Why can't it be always null/empty + String tabAlias = addEmptyTabAlias ? "" : null; + ColumnInfo colInfo = new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + + outputColName, reduceKeys.get(i).getTypeInfo(), tabAlias, false); + colInfoLst.add(colInfo); + colExprMap.put(colInfo.getInternalName(), reduceKeys.get(i)); + } + } + + return reduceKeys; + } + + /** + * Get Value Keys for RS following MapSide GB + * + * @param GroupByOperator + * MapSide GB + * @param outputKeyColumnNames + * @param colExprMap + * @return List of ExprNodeDesc of Values + * @throws SemanticException + */ + private static ArrayList getValueKeysForRS(Operator inOp, int aggStartPos, + List outputKeyColumnNames, ArrayList colInfoLst, + Map colExprMap, boolean addEmptyTabAlias, boolean setColToNonVirtual) + throws SemanticException { + List mapGBColInfoLst = inOp.getSchema().getSignature(); + ArrayList valueKeys = null; + if (aggStartPos >= mapGBColInfoLst.size()) { + valueKeys = new ArrayList(); + } else { + valueKeys = ExprNodeDescUtils.genExprNodeDesc(inOp, aggStartPos, mapGBColInfoLst.size() - 1, + true, setColToNonVirtual); + for (int i = 0; i < valueKeys.size(); ++i) { + String outputColName = SemanticAnalyzer.getColumnInternalName(i); + outputKeyColumnNames.add(outputColName); + // TODO: Verify if this is needed (Why can't it be always null/empty + String tabAlias = addEmptyTabAlias ? 
"" : null; + ColumnInfo colInfo = new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + + outputColName, valueKeys.get(i).getTypeInfo(), tabAlias, false); + colInfoLst.add(colInfo); + colExprMap.put(colInfo.getInternalName(), valueKeys.get(i)); + } + } + + return valueKeys; + } + + private static List> getDistColIndices(GBInfo gbAttrs, int distOffSet) + throws SemanticException { + List> distColIndices = new ArrayList>(); + + for (List udafDistCols : gbAttrs.distColIndices) { + List udfAdjustedDistColIndx = new ArrayList(); + for (Integer distIndx : udafDistCols) { + udfAdjustedDistColIndx.add(distIndx + distOffSet); + } + distColIndices.add(udfAdjustedDistColIndx); + } + + return distColIndices; + } + + // TODO: Implement this + private static ExprNodeDesc propConstDistUDAFParams() { + return null; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index 8babd7a..b721190 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; @@ -105,7 +106,7 @@ NO_SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan1MR SKEW_NO_MAP_SIDE_AGG, // Corresponds to SemAnalyzer genGroupByPlan2MR NO_SKEW_MAP_SIDE_AGG, // Corresponds to SemAnalyzer - // genGroupByPlanMapAggrNoSkew + // genGroupByPlanMapAggrNoSkew SKEW_MAP_SIDE_AGG // Corresponds to SemAnalyzer genGroupByPlanMapAggr2MR }; @@ -114,45 +115,26 @@ private final HiveConf hiveConf; private final UnparseTranslator unparseTranslator; private final Map> topOps; - private final HIVEAGGOPMODE aggMode; private final boolean strictMode; private int reduceSinkTagGenerator; - public static HIVEAGGOPMODE getAggOPMode(HiveConf hc) { - HIVEAGGOPMODE aggOpMode = HIVEAGGOPMODE.NO_SKEW_NO_MAP_SIDE_AGG; - - if (hc.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { - if (!hc.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { - aggOpMode = HIVEAGGOPMODE.NO_SKEW_MAP_SIDE_AGG; - } else { - aggOpMode = HIVEAGGOPMODE.SKEW_MAP_SIDE_AGG; - } - } else if (hc.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { - aggOpMode = HIVEAGGOPMODE.SKEW_NO_MAP_SIDE_AGG; - } - - return aggOpMode; - } - - public HiveOpConverter(SemanticAnalyzer semanticAnalyzer, - HiveConf hiveConf, UnparseTranslator unparseTranslator, - Map> topOps, - HIVEAGGOPMODE aggMode, boolean strictMode) { + public HiveOpConverter(SemanticAnalyzer semanticAnalyzer, HiveConf hiveConf, + UnparseTranslator unparseTranslator, Map> topOps, + boolean strictMode) { this.semanticAnalyzer = semanticAnalyzer; this.hiveConf = hiveConf; this.unparseTranslator = unparseTranslator; this.topOps = topOps; - this.aggMode = aggMode; this.strictMode = strictMode; this.reduceSinkTagGenerator = 0; } - private class OpAttr { - private final String tabAlias; + static class OpAttr { + 
final String tabAlias; ImmutableList inputs; ImmutableMap vcolMap; - private OpAttr(String tabAlias, Map vcolMap, Operator... inputs) { + OpAttr(String tabAlias, Map vcolMap, Operator... inputs) { this.tabAlias = tabAlias; this.vcolMap = ImmutableMap.copyOf(vcolMap); this.inputs = ImmutableList.copyOf(inputs); @@ -178,7 +160,7 @@ OpAttr dispatch(RelNode rn) throws SemanticException { } else if (rn instanceof SemiJoin) { SemiJoin sj = (SemiJoin) rn; HiveJoin hj = HiveJoin.getJoin(sj.getCluster(), sj.getLeft(), sj.getRight(), - sj.getCondition(), sj.getJoinType(), true); + sj.getCondition(), sj.getJoinType(), true); return visit(hj); } else if (rn instanceof HiveFilter) { return visit((HiveFilter) rn); @@ -188,9 +170,11 @@ OpAttr dispatch(RelNode rn) throws SemanticException { return visit((HiveUnion) rn); } else if (rn instanceof LogicalExchange) { return visit((LogicalExchange) rn); + } else if (rn instanceof HiveAggregate) { + return visit((HiveAggregate) rn); } LOG.error(rn.getClass().getCanonicalName() + "operator translation not supported" - + " yet in return path."); + + " yet in return path."); return null; } @@ -203,8 +187,8 @@ OpAttr dispatch(RelNode rn) throws SemanticException { OpAttr visit(HiveTableScan scanRel) { if (LOG.isDebugEnabled()) { - LOG.debug("Translating operator rel#" + scanRel.getId() + ":" + scanRel.getRelTypeName() + - " with row type: [" + scanRel.getRowType() + "]"); + LOG.debug("Translating operator rel#" + scanRel.getId() + ":" + scanRel.getRelTypeName() + + " with row type: [" + scanRel.getRowType() + "]"); } RelOptHiveTable ht = (RelOptHiveTable) scanRel.getTable(); @@ -264,24 +248,23 @@ OpAttr visit(HiveProject projectRel) throws SemanticException { OpAttr inputOpAf = dispatch(projectRel.getInput()); if (LOG.isDebugEnabled()) { - LOG.debug("Translating operator rel#" + projectRel.getId() + ":" + projectRel.getRelTypeName() + - " with row type: [" + projectRel.getRowType() + "]"); + LOG.debug("Translating operator rel#" + projectRel.getId() + ":" + + projectRel.getRelTypeName() + " with row type: [" + projectRel.getRowType() + "]"); } WindowingSpec windowingSpec = new WindowingSpec(); List exprCols = new ArrayList(); - for (int pos=0; pos> children = new ArrayList>( - joinRel.getInputs().size()); - for (int i=0; i> children = new ArrayList>(joinRel.getInputs().size()); + for (int i = 0; i < inputs.length; i++) { inputs[i] = dispatch(joinRel.getInput(i)); children.add(inputs[i].inputs.get(0)); } if (LOG.isDebugEnabled()) { - LOG.debug("Translating operator rel#" + joinRel.getId() + ":" + joinRel.getRelTypeName() + - " with row type: [" + joinRel.getRowType() + "]"); + LOG.debug("Translating operator rel#" + joinRel.getId() + ":" + joinRel.getRelTypeName() + + " with row type: [" + joinRel.getRowType() + "]"); } // 2. Convert join condition @@ -323,15 +305,16 @@ OpAttr visit(HiveJoin joinRel) throws SemanticException { // 4. Generate Join operator JoinOperator joinOp = genJoin(joinRel, joinPredInfo, children, joinKeys); - // 5. TODO: Extract condition for non-equi join elements (if any) and add it + // 5. TODO: Extract condition for non-equi join elements (if any) and + // add it // 6. Virtual columns - Map vcolMap = new HashMap(); + Map vcolMap = new HashMap(); vcolMap.putAll(inputs[0].vcolMap); if (extractJoinType(joinRel) != JoinType.LEFTSEMI) { int shift = inputs[0].inputs.get(0).getSchema().getSignature().size(); - for (int i=1; i inputOp = inputOpAf.inputs.get(0); Operator resultOp = inputOpAf.inputs.get(0); // 1. 
If we need to sort tuples based on the value of some - // of their columns + // of their columns if (sortRel.getCollation() != RelCollations.EMPTY) { - - // In strict mode, in the presence of order by, limit must be specified + + // In strict mode, in the presence of order by, limit must be + // specified if (strictMode && sortRel.fetch == null) { throw new SemanticException(ErrorMsg.NO_LIMIT_WITH_ORDERBY.getMsg()); } - + // 1.a. Extract order for each column from collation - // Generate sortCols and order + // Generate sortCols and order List sortCols = new ArrayList(); StringBuilder order = new StringBuilder(); for (RelCollation collation : sortRel.getCollationList()) { for (RelFieldCollation sortInfo : collation.getFieldCollations()) { int sortColumnPos = sortInfo.getFieldIndex(); - ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature(). - get(sortColumnPos)); + ColumnInfo columnInfo = new ColumnInfo(inputOp.getSchema().getSignature() + .get(sortColumnPos)); ExprNodeColumnDesc sortColumn = new ExprNodeColumnDesc(columnInfo.getType(), - columnInfo.getInternalName(), columnInfo.getTabAlias(), - columnInfo.getIsVirtualCol()); + columnInfo.getInternalName(), columnInfo.getTabAlias(), columnInfo.getIsVirtualCol()); sortCols.add(sortColumn); if (sortInfo.getDirection() == RelFieldCollation.Direction.DESCENDING) { order.append("-"); - } - else { + } else { order.append("+"); } } } // Use only 1 reducer for order by int numReducers = 1; - + // 1.b. Generate reduce sink and project operator - resultOp = genReduceSinkAndBacktrackSelect(resultOp, sortCols.toArray(new ExprNodeDesc[sortCols.size()]), - -1, new ArrayList(), order.toString(), numReducers, - Operation.NOT_ACID, strictMode); + resultOp = genReduceSinkAndBacktrackSelect(resultOp, + sortCols.toArray(new ExprNodeDesc[sortCols.size()]), -1, new ArrayList(), + order.toString(), numReducers, Operation.NOT_ACID, strictMode); } // 2. If we need to generate limit @@ -407,14 +392,14 @@ else if (sortRel.fetch == null) { LimitDesc limitDesc = new LimitDesc(limit); // TODO: Set 'last limit' global property ArrayList cinfoLst = createColInfos(inputOp); - resultOp = (LimitOperator) OperatorFactory.getAndMakeChild( - limitDesc, new RowSchema(cinfoLst), resultOp); - + resultOp = (LimitOperator) OperatorFactory.getAndMakeChild(limitDesc, + new RowSchema(cinfoLst), resultOp); + if (LOG.isDebugEnabled()) { LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]"); } } - + // 3. 
Return result return inputOpAf.clone(resultOp); } @@ -426,12 +411,13 @@ OpAttr visit(HiveFilter filterRel) throws SemanticException { OpAttr inputOpAf = dispatch(filterRel.getInput()); if (LOG.isDebugEnabled()) { - LOG.debug("Translating operator rel#" + filterRel.getId() + ":" + filterRel.getRelTypeName() + - " with row type: [" + filterRel.getRowType() + "]"); + LOG.debug("Translating operator rel#" + filterRel.getId() + ":" + filterRel.getRelTypeName() + + " with row type: [" + filterRel.getRowType() + "]"); } ExprNodeDesc filCondExpr = filterRel.getCondition().accept( - new ExprNodeConverter(inputOpAf.tabAlias, filterRel.getInput().getRowType(), false)); + new ExprNodeConverter(inputOpAf.tabAlias, filterRel.getInput().getRowType(), false, + filterRel.getCluster().getTypeFactory())); FilterDesc filDesc = new FilterDesc(filCondExpr, false); ArrayList cinfoLst = createColInfos(inputOpAf.inputs.get(0)); FilterOperator filOp = (FilterOperator) OperatorFactory.getAndMakeChild(filDesc, new RowSchema( @@ -447,13 +433,13 @@ OpAttr visit(HiveFilter filterRel) throws SemanticException { OpAttr visit(HiveUnion unionRel) throws SemanticException { // 1. Convert inputs OpAttr[] inputs = new OpAttr[unionRel.getInputs().size()]; - for (int i=0; i cinfoLst = createColInfos(inputs[0].inputs.get(0)); Operator[] children = new Operator[inputs.length]; - for (int i=0; i unionOp = OperatorFactory.getAndMakeChild( - unionDesc, new RowSchema(cinfoLst), children); + Operator unionOp = OperatorFactory.getAndMakeChild(unionDesc, + new RowSchema(cinfoLst), children); if (LOG.isDebugEnabled()) { LOG.debug("Generated " + unionOp + " with row schema: [" + unionOp.getSchema() + "]"); @@ -479,8 +465,8 @@ OpAttr visit(LogicalExchange exchangeRel) throws SemanticException { OpAttr inputOpAf = dispatch(exchangeRel.getInput()); if (LOG.isDebugEnabled()) { - LOG.debug("Translating operator rel#" + exchangeRel.getId() + ":" + exchangeRel.getRelTypeName() + - " with row type: [" + exchangeRel.getRowType() + "]"); + LOG.debug("Translating operator rel#" + exchangeRel.getId() + ":" + + exchangeRel.getRelTypeName() + " with row type: [" + exchangeRel.getRowType() + "]"); } RelDistribution distribution = exchangeRel.getDistribution(); @@ -488,23 +474,22 @@ OpAttr visit(LogicalExchange exchangeRel) throws SemanticException { throw new SemanticException("Only hash distribution supported for LogicalExchange"); } ExprNodeDesc[] expressions = new ExprNodeDesc[distribution.getKeys().size()]; - for (int i=0; i input = inputOpAf.inputs.get(0); - + wSpec.validateAndMakeEffective(); WindowingComponentizer groups = new WindowingComponentizer(wSpec); RowResolver rr = new RowResolver(); @@ -512,7 +497,7 @@ private OpAttr genPTF(OpAttr inputOpAf, WindowingSpec wSpec) throws SemanticExce rr.put(ci.getTabAlias(), ci.getInternalName(), ci); } - while(groups.hasNext() ) { + while (groups.hasNext()) { wSpec = groups.next(hiveConf, semanticAnalyzer, unparseTranslator, rr); // 1. Create RS and backtrack Select operator on top @@ -544,17 +529,17 @@ private OpAttr genPTF(OpAttr inputOpAf, WindowingSpec wSpec) throws SemanticExce } SelectOperator selectOp = genReduceSinkAndBacktrackSelect(input, - keyCols.toArray(new ExprNodeDesc[keyCols.size()]), - reduceSinkTagGenerator++, partCols, order.toString(), - -1, Operation.NOT_ACID, strictMode); + keyCols.toArray(new ExprNodeDesc[keyCols.size()]), reduceSinkTagGenerator++, partCols, + order.toString(), -1, Operation.NOT_ACID, strictMode); // 2. 
Finally create PTF PTFTranslator translator = new PTFTranslator(); - PTFDesc ptfDesc = translator.translate(wSpec, semanticAnalyzer, hiveConf, rr, unparseTranslator); + PTFDesc ptfDesc = translator.translate(wSpec, semanticAnalyzer, hiveConf, rr, + unparseTranslator); RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr(); Operator ptfOp = OperatorFactory.getAndMakeChild(ptfDesc, - new RowSchema(ptfOpRR.getColumnInfos()), selectOp); + new RowSchema(ptfOpRR.getColumnInfos()), selectOp); if (LOG.isDebugEnabled()) { LOG.debug("Generated " + ptfOp + " with row schema: [" + ptfOp.getSchema() + "]"); @@ -564,12 +549,11 @@ private OpAttr genPTF(OpAttr inputOpAf, WindowingSpec wSpec) throws SemanticExce rr = ptfOpRR; input = ptfOp; } - + return inputOpAf.clone(input); } - private ExprNodeDesc[][] extractJoinKeys(JoinPredicateInfo joinPredInfo, - List inputs) { + private ExprNodeDesc[][] extractJoinKeys(JoinPredicateInfo joinPredInfo, List inputs) { ExprNodeDesc[][] joinKeys = new ExprNodeDesc[inputs.size()][]; for (int i = 0; i < inputs.size(); i++) { joinKeys[i] = new ExprNodeDesc[joinPredInfo.getEquiJoinPredicateElements().size()]; @@ -583,22 +567,20 @@ private OpAttr genPTF(OpAttr inputOpAf, WindowingSpec wSpec) throws SemanticExce } private static SelectOperator genReduceSinkAndBacktrackSelect(Operator input, - ExprNodeDesc[] keys, int tag, ArrayList partitionCols, - String order, int numReducers, Operation acidOperation, - boolean strictMode) throws SemanticException { + ExprNodeDesc[] keys, int tag, ArrayList partitionCols, String order, + int numReducers, Operation acidOperation, boolean strictMode) throws SemanticException { // 1. Generate RS operator - ReduceSinkOperator rsOp = genReduceSink(input, keys, tag, partitionCols, - order, numReducers, acidOperation, strictMode); + ReduceSinkOperator rsOp = genReduceSink(input, keys, tag, partitionCols, order, numReducers, + acidOperation, strictMode); // 2. 
Generate backtrack Select operator - Map descriptors = buildBacktrackFromReduceSink( - (ReduceSinkOperator) rsOp, input); - SelectDesc selectDesc = new SelectDesc( - new ArrayList(descriptors.values()), - new ArrayList(descriptors.keySet())); + Map descriptors = buildBacktrackFromReduceSink((ReduceSinkOperator) rsOp, + input); + SelectDesc selectDesc = new SelectDesc(new ArrayList(descriptors.values()), + new ArrayList(descriptors.keySet())); ArrayList cinfoLst = createColInfos(input); SelectOperator selectOp = (SelectOperator) OperatorFactory.getAndMakeChild(selectDesc, - new RowSchema(cinfoLst), rsOp); + new RowSchema(cinfoLst), rsOp); selectOp.setColumnExprMap(descriptors); if (LOG.isDebugEnabled()) { @@ -608,19 +590,17 @@ private static SelectOperator genReduceSinkAndBacktrackSelect(Operator input, return selectOp; } - private static ReduceSinkOperator genReduceSink(Operator input, ExprNodeDesc[] keys, - int tag, int numReducers, Operation acidOperation, - boolean strictMode) throws SemanticException { - return genReduceSink(input, keys, tag, new ArrayList(), - "", numReducers, acidOperation, strictMode); + private static ReduceSinkOperator genReduceSink(Operator input, ExprNodeDesc[] keys, int tag, + int numReducers, Operation acidOperation, boolean strictMode) throws SemanticException { + return genReduceSink(input, keys, tag, new ArrayList(), "", numReducers, + acidOperation, strictMode); } - + @SuppressWarnings({ "rawtypes", "unchecked" }) - private static ReduceSinkOperator genReduceSink(Operator input, - ExprNodeDesc[] keys, int tag, ArrayList partitionCols, - String order, int numReducers, Operation acidOperation, - boolean strictMode) throws SemanticException { - Operator dummy = Operator.createDummy(); // dummy for backtracking + private static ReduceSinkOperator genReduceSink(Operator input, ExprNodeDesc[] keys, int tag, + ArrayList partitionCols, String order, int numReducers, + Operation acidOperation, boolean strictMode) throws SemanticException { + Operator dummy = Operator.createDummy(); // dummy for backtracking dummy.setParentOperators(Arrays.asList(input)); ArrayList reduceKeys = new ArrayList(); @@ -690,31 +670,29 @@ private static ReduceSinkOperator genReduceSink(Operator input, ReduceSinkDesc rsDesc; if (order.isEmpty()) { - rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, - outputColumnNames, false, tag, reduceKeys.size(), - numReducers, acidOperation); + rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag, + reduceKeys.size(), numReducers, acidOperation); } else { - rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, - outputColumnNames, false, tag, partitionCols, - order, numReducers, acidOperation); + rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag, + partitionCols, order, numReducers, acidOperation); } ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(rsDesc, - new RowSchema(outputColumns), input); + new RowSchema(outputColumns), input); List keyColNames = rsDesc.getOutputKeyColumnNames(); - for (int i = 0 ; i < keyColNames.size(); i++) { + for (int i = 0; i < keyColNames.size(); i++) { colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), reduceKeys.get(i)); } List valColNames = rsDesc.getOutputValueColumnNames(); - for (int i = 0 ; i < valColNames.size(); i++) { + for (int i = 0; i < valColNames.size(); i++) { colExprMap.put(Utilities.ReduceField.VALUE + "." 
+ valColNames.get(i), reduceValues.get(i)); } rsOp.setValueIndex(index); rsOp.setColumnExprMap(colExprMap); - rsOp.setInputAliases(input.getSchema().getColumnNames().toArray( - new String[input.getSchema().getColumnNames().size()])); + rsOp.setInputAliases(input.getSchema().getColumnNames() + .toArray(new String[input.getSchema().getColumnNames().size()])); if (LOG.isDebugEnabled()) { LOG.debug("Generated " + rsOp + " with row schema: [" + rsOp.getSchema() + "]"); @@ -728,14 +706,14 @@ private static JoinOperator genJoin(HiveJoin hiveJoin, JoinPredicateInfo joinPre // Extract join type JoinType joinType = extractJoinType(hiveJoin); - + // NOTE: Currently binary joins only JoinCondDesc[] joinCondns = new JoinCondDesc[1]; joinCondns[0] = new JoinCondDesc(new JoinCond(0, 1, joinType)); ArrayList outputColumns = new ArrayList(); - ArrayList outputColumnNames = - new ArrayList(hiveJoin.getRowType().getFieldNames()); + ArrayList outputColumnNames = new ArrayList(hiveJoin.getRowType() + .getFieldNames()); Operator[] childOps = new Operator[children.size()]; Map reversedExprs = new HashMap(); @@ -769,7 +747,7 @@ private static JoinOperator genJoin(HiveJoin hiveJoin, JoinPredicateInfo joinPre posToAliasMap.put(pos, new HashSet(inputRS.getSchema().getTableNames())); Map descriptors = buildBacktrackFromReduceSink(outputPos, - outputColumnNames, keyColNames, valColNames, index, parent); + outputColumnNames, keyColNames, valColNames, index, parent); List parentColumns = parent.getSchema().getSignature(); for (int i = 0; i < index.length; i++) { @@ -785,15 +763,13 @@ private static JoinOperator genJoin(HiveJoin hiveJoin, JoinPredicateInfo joinPre childOps[pos] = inputRS; } - boolean noOuterJoin = joinType != JoinType.FULLOUTER - && joinType != JoinType.LEFTOUTER - && joinType != JoinType.RIGHTOUTER; - JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, - joinCondns, joinKeys); + boolean noOuterJoin = joinType != JoinType.FULLOUTER && joinType != JoinType.LEFTOUTER + && joinType != JoinType.RIGHTOUTER; + JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns, joinKeys); desc.setReversedExprs(reversedExprs); - JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(desc, - new RowSchema(outputColumns), childOps); + JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(desc, new RowSchema( + outputColumns), childOps); joinOp.setColumnExprMap(colExprMap); joinOp.setPosToAliasMap(posToAliasMap); @@ -818,32 +794,32 @@ private static JoinType extractJoinType(HiveJoin join) { // OUTER AND INNER JOINS JoinType resultJoinType; switch (join.getJoinType()) { - case FULL: - resultJoinType = JoinType.FULLOUTER; - break; - case LEFT: - resultJoinType = JoinType.LEFTOUTER; - break; - case RIGHT: - resultJoinType = JoinType.RIGHTOUTER; - break; - default: - resultJoinType = JoinType.INNER; - break; + case FULL: + resultJoinType = JoinType.FULLOUTER; + break; + case LEFT: + resultJoinType = JoinType.LEFTOUTER; + break; + case RIGHT: + resultJoinType = JoinType.RIGHTOUTER; + break; + default: + resultJoinType = JoinType.INNER; + break; } return resultJoinType; } - private static Map buildBacktrackFromReduceSink( - ReduceSinkOperator rsOp, Operator inputOp) { - return buildBacktrackFromReduceSink(0, inputOp.getSchema().getColumnNames(), - rsOp.getConf().getOutputKeyColumnNames(), rsOp.getConf().getOutputValueColumnNames(), - rsOp.getValueIndex(), inputOp); + private static Map buildBacktrackFromReduceSink(ReduceSinkOperator rsOp, + Operator inputOp) { 
+ return buildBacktrackFromReduceSink(0, inputOp.getSchema().getColumnNames(), rsOp.getConf() + .getOutputKeyColumnNames(), rsOp.getConf().getOutputValueColumnNames(), + rsOp.getValueIndex(), inputOp); } private static Map buildBacktrackFromReduceSink(int initialPos, - List outputColumnNames, List keyColNames, List valueColNames, - int[] index, Operator inputOp) { + List outputColumnNames, List keyColNames, List valueColNames, + int[] index, Operator inputOp) { Map columnDescriptors = new LinkedHashMap(); for (int i = 0; i < index.length; i++) { ColumnInfo info = new ColumnInfo(inputOp.getSchema().getSignature().get(i)); @@ -853,17 +829,16 @@ private static JoinType extractJoinType(HiveJoin join) { } else { field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1); } - ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), - field, info.getTabAlias(), info.getIsVirtualCol()); - columnDescriptors.put(outputColumnNames.get(initialPos+i), desc); + ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(), field, info.getTabAlias(), + info.getIsVirtualCol()); + columnDescriptors.put(outputColumnNames.get(initialPos + i), desc); } return columnDescriptors; } - private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, - String tabAlias) { - return (ExprNodeDesc) rn.accept(new ExprNodeConverter(tabAlias, - inputRel.getRowType(), false)); + private static ExprNodeDesc convertToExprNode(RexNode rn, RelNode inputRel, String tabAlias) { + return (ExprNodeDesc) rn.accept(new ExprNodeConverter(tabAlias, inputRel.getRowType(), false, + inputRel.getCluster().getTypeFactory())); } private static ArrayList createColInfos(Operator input) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index bd6a984..e0526d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -611,7 +611,7 @@ Operator getOptimizedHiveOPDag() throws SemanticException { RelNode modifiedOptimizedOptiqPlan = introduceProjectIfNeeded(optimizedOptiqPlan); - Operator hiveRoot = new HiveOpConverter(this, conf, unparseTranslator, topOps, HiveOpConverter.getAggOPMode(conf), + Operator hiveRoot = new HiveOpConverter(this, conf, unparseTranslator, topOps, conf.getVar(HiveConf.ConfVars.HIVEMAPREDMODE).equalsIgnoreCase("strict")).convert(modifiedOptimizedOptiqPlan); RowResolver hiveRootRR = genRowResolver(hiveRoot, getQB()); opParseCtx.put(hiveRoot, new OpParseContext(hiveRootRR)); @@ -1836,10 +1836,40 @@ private AggInfo getHiveAggInfo(ASTNode aggAst, int aggFnLstArgIndx, RowResolver private RelNode genGBLogicalPlan(QB qb, RelNode srcRel) throws SemanticException { RelNode gbRel = null; QBParseInfo qbp = getQBParseInfo(qb); - - // 1. Gather GB Expressions (AST) (GB + Aggregations) // NOTE: Multi Insert is not supported String detsClauseName = qbp.getClauseNames().iterator().next(); + List grpByAstExprs = SemanticAnalyzer.getGroupByForClause(qbp, detsClauseName); + HashMap aggregationTrees = qbp.getAggregationExprsForClause(detsClauseName); + // NOTE: Multi Insert is not supported + boolean cubeRollupGrpSetPresent = (!qbp.getDestRollups().isEmpty() + || !qbp.getDestGroupingSets().isEmpty() || !qbp.getDestCubes().isEmpty()); + + // 0. 
Sanity check + if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) + && qbp.getDistinctFuncExprsForClause(detsClauseName).size() > 1) { + throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.getMsg()); + } + if (cubeRollupGrpSetPresent) { + if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) { + throw new SemanticException(ErrorMsg.HIVE_GROUPING_SETS_AGGR_NOMAPAGGR.getMsg()); + } + + if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) { + checkExpressionsForGroupingSet(grpByAstExprs, qb.getParseInfo() + .getDistinctFuncExprsForClause(detsClauseName), aggregationTrees, + this.relToHiveRR.get(srcRel)); + + if (qbp.getDestGroupingSets().size() > conf + .getIntVar(HiveConf.ConfVars.HIVE_NEW_JOB_GROUPING_SET_CARDINALITY)) { + String errorMsg = "The number of rows per input row due to grouping sets is " + + qbp.getDestGroupingSets().size(); + throw new SemanticException( + ErrorMsg.HIVE_GROUPING_SETS_THRESHOLD_NOT_ALLOWED_WITH_SKEW.getMsg(errorMsg)); + } + } + } + + // 1. Gather GB Expressions (AST) (GB + Aggregations) // Check and transform group by *. This will only happen for select distinct *. // Here the "genSelectPlan" is being leveraged. // The main benefits are (1) remove virtual columns that should @@ -1857,8 +1887,6 @@ private RelNode genGBLogicalPlan(QB qb, RelNode srcRel) throws SemanticException qbp.setSelExprForClause(detsClauseName, SemanticAnalyzer.genSelectDIAST(rr)); } } - List grpByAstExprs = SemanticAnalyzer.getGroupByForClause(qbp, detsClauseName); - HashMap aggregationTrees = qbp.getAggregationExprsForClause(detsClauseName); boolean hasGrpByAstExprs = (grpByAstExprs != null && !grpByAstExprs.isEmpty()) ? true : false; boolean hasAggregationTrees = (aggregationTrees != null && !aggregationTrees.isEmpty()) ? true : false; @@ -1890,9 +1918,7 @@ private RelNode genGBLogicalPlan(QB qb, RelNode srcRel) throws SemanticException // 4. GroupingSets, Cube, Rollup int groupingColsSize = gbExprNDescLst.size(); List groupingSets = null; - if (!qbp.getDestRollups().isEmpty() - || !qbp.getDestGroupingSets().isEmpty() - || !qbp.getDestCubes().isEmpty()) { + if (cubeRollupGrpSetPresent) { if (qbp.getDestRollups().contains(detsClauseName)) { groupingSets = getGroupingSetsForRollup(grpByAstExprs.size()); } else if (qbp.getDestCubes().contains(detsClauseName)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index a450df2..67e366e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -3443,7 +3443,7 @@ private boolean checkForNoAggr(List bitmaps) { return ret; } - private int setBit(int bitmap, int bitIdx) { + public static int setBit(int bitmap, int bitIdx) { return bitmap | (1 << bitIdx); } @@ -3979,10 +3979,10 @@ boolean autogenColAliasPrfxIncludeFuncName() { /** * Class to store GenericUDAF related information. */ - static class GenericUDAFInfo { - ArrayList convertedParameters; - GenericUDAFEvaluator genericUDAFEvaluator; - TypeInfo returnType; + public static class GenericUDAFInfo { + public ArrayList convertedParameters; + public GenericUDAFEvaluator genericUDAFEvaluator; + public TypeInfo returnType; } /** @@ -4023,7 +4023,7 @@ boolean autogenColAliasPrfxIncludeFuncName() { * Returns the GenericUDAFEvaluator for the aggregation. This is called once * for each GroupBy aggregation. 
*/ - static GenericUDAFEvaluator getGenericUDAFEvaluator(String aggName, + public static GenericUDAFEvaluator getGenericUDAFEvaluator(String aggName, ArrayList<ExprNodeDesc> aggParameters, ASTNode aggTree, boolean isDistinct, boolean isAllColumns) throws SemanticException { @@ -4053,7 +4053,7 @@ static GenericUDAFEvaluator getGenericUDAFEvaluator(String aggName, * @throws SemanticException * when the UDAF is not found or has problems. */ - static GenericUDAFInfo getGenericUDAFInfo(GenericUDAFEvaluator evaluator, + public static GenericUDAFInfo getGenericUDAFInfo(GenericUDAFEvaluator evaluator, GenericUDAFEvaluator.Mode emode, ArrayList<ExprNodeDesc> aggParameters) throws SemanticException { @@ -4082,7 +4082,7 @@ static GenericUDAFInfo getGenericUDAFInfo(GenericUDAFEvaluator evaluator, return r; } - static GenericUDAFEvaluator.Mode groupByDescModeToUDAFMode( + public static GenericUDAFEvaluator.Mode groupByDescModeToUDAFMode( GroupByDesc.Mode mode, boolean isDistinct) { switch (mode) { case COMPLETE: @@ -4125,7 +4125,7 @@ static GenericUDAFInfo getGenericUDAFInfo(GenericUDAFEvaluator evaluator, * @return the ExprNodeDesc of the constant parameter if the given internalName represents * a constant parameter; otherwise, return null */ - private ExprNodeDesc isConstantParameterInAggregationParameters(String internalName, + public static ExprNodeDesc isConstantParameterInAggregationParameters(String internalName, List<ExprNodeDesc> reduceValues) { // only the pattern of "VALUE._col([0-9]+)" should be handled. @@ -5572,7 +5572,7 @@ static private boolean hasCommonElement(Set<String> set1, Set<String> set2) { return false; } - private void checkExpressionsForGroupingSet(List<ASTNode> grpByExprs, + void checkExpressionsForGroupingSet(List<ASTNode> grpByExprs, List<ASTNode> distinctGrpByExprs, Map<String, ASTNode> aggregationTrees, RowResolver inputRowResolver) throws SemanticException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java index de1f9cd..fb3c4a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; @@ -444,4 +445,42 @@ public static PrimitiveTypeInfo deriveMinArgumentCast( // If the child is also decimal, no cast is needed (we hope - can target type be narrower?). return HiveDecimalUtils.getDecimalTypeForPrimitiveCategory(childTi); } + + /** + * Build ExprNodeColumnDesc for the projections in the input operator from + * startPos to endPos (both included). Operator must have an associated + * colExprMap. + * + * @param inputOp + * Input Hive Operator + * @param startPos + * starting position in the input operator schema; must be >=0 and <= + * endPos + * @param endPos + * end position in the input operator schema; must be >=0. 
+ * @return List of ExprNodeDesc + */ + public static ArrayList<ExprNodeDesc> genExprNodeDesc(Operator inputOp, int startPos, int endPos, + boolean addEmptyTabAlias, boolean setColToNonVirtual) { + ArrayList<ExprNodeDesc> exprColLst = new ArrayList<ExprNodeDesc>(); + List<ColumnInfo> colInfoLst = inputOp.getSchema().getSignature(); + + String tabAlias; + boolean vc; + ColumnInfo ci; + for (int i = startPos; i <= endPos; i++) { + ci = colInfoLst.get(i); + tabAlias = ci.getTabAlias(); + if (addEmptyTabAlias) { + tabAlias = ""; + } + vc = ci.getIsVirtualCol(); + if (setColToNonVirtual) { + vc = false; + } + exprColLst.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(), tabAlias, vc)); + } + + return exprColLst; + } }
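
For reference, the getAggOPMode helper that this patch removes from HiveOpConverter derived one of the four HIVEAGGOPMODE plan shapes from two configuration flags: hive.map.aggr (HIVEMAPSIDEAGGREGATE) and hive.groupby.skewindata (HIVEGROUPBYSKEW). The stand-alone sketch below restates that decision table; the class and method names are illustrative only and are not code from the patch, which presumably derives the mode from HiveConf wherever the group-by conversion now needs it.

// Illustrative recap of the removed getAggOPMode decision table; not patch code.
public class AggModeSketch {

  enum HiveAggOpMode {
    NO_SKEW_NO_MAP_SIDE_AGG, // genGroupByPlan1MR
    SKEW_NO_MAP_SIDE_AGG,    // genGroupByPlan2MR
    NO_SKEW_MAP_SIDE_AGG,    // genGroupByPlanMapAggrNoSkew
    SKEW_MAP_SIDE_AGG        // genGroupByPlanMapAggr2MR
  }

  // mapSideAggr ~ hive.map.aggr, groupBySkew ~ hive.groupby.skewindata
  static HiveAggOpMode aggOpMode(boolean mapSideAggr, boolean groupBySkew) {
    if (mapSideAggr) {
      return groupBySkew ? HiveAggOpMode.SKEW_MAP_SIDE_AGG : HiveAggOpMode.NO_SKEW_MAP_SIDE_AGG;
    }
    return groupBySkew ? HiveAggOpMode.SKEW_NO_MAP_SIDE_AGG : HiveAggOpMode.NO_SKEW_NO_MAP_SIDE_AGG;
  }

  public static void main(String[] args) {
    System.out.println(aggOpMode(true, false)); // NO_SKEW_MAP_SIDE_AGG
    System.out.println(aggOpMode(false, true)); // SKEW_NO_MAP_SIDE_AGG
  }
}

Under the usual defaults (map-side aggregation on, skew handling off) this resolves to NO_SKEW_MAP_SIDE_AGG, i.e. the genGroupByPlanMapAggrNoSkew shape noted in the enum comments above.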
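
The buildBacktrackFromReduceSink variants reorganized in this patch depend on the ReduceSink value-index convention: a non-negative entry in the index array points at an output key column, while a negative entry v points at output value column (-v - 1), and the backtracked expression is named KEY.col or VALUE.col accordingly. The following self-contained sketch demonstrates just that convention with plain strings; it is illustrative only and not the patch's API, which builds ExprNodeColumnDesc objects from the input operator's ColumnInfo instead.

// Illustrative sketch of the ReduceSink key/value index convention; not patch code.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceSinkBacktrackSketch {

  // index[i] >= 0 : output column i backtracks to reduce KEY column index[i]
  // index[i] <  0 : output column i backtracks to reduce VALUE column (-index[i] - 1)
  static List<String> backtrackFieldNames(int[] index, List<String> keyColNames,
      List<String> valueColNames) {
    List<String> fields = new ArrayList<String>();
    for (int i = 0; i < index.length; i++) {
      if (index[i] >= 0) {
        fields.add("KEY." + keyColNames.get(index[i]));
      } else {
        fields.add("VALUE." + valueColNames.get(-index[i] - 1));
      }
    }
    return fields;
  }

  public static void main(String[] args) {
    // Two key columns and one value column; the third output column maps to VALUE._col0.
    int[] index = { 0, 1, -1 };
    System.out.println(backtrackFieldNames(index,
        Arrays.asList("reducesinkkey0", "reducesinkkey1"), Arrays.asList("_col0")));
    // Prints: [KEY.reducesinkkey0, KEY.reducesinkkey1, VALUE._col0]
  }
}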