diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 33b67dd..05ae4e0 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -744,7 +744,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { // CBO related HIVE_CBO_ENABLED("hive.cbo.enable", true, "Flag to control enabling Cost Based Optimizations using Calcite framework."), - HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", false, "Flag to control calcite plan to hive operator conversion"), + HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", true, "Flag to control calcite plan to hive operator conversion"), HIVE_CBO_EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on" + "CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."), HIVE_CBO_COST_MODEL_CPU("hive.cbo.costmodel.cpu", "0.000001", "Default cost of a comparison"), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinToMultiJoinRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinToMultiJoinRule.java index d0a29a7..3eedb9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinToMultiJoinRule.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinToMultiJoinRule.java @@ -249,12 +249,16 @@ private static boolean isCombinablePredicate(Join join, constructJoinPredicateInfo(join, condition); final JoinPredicateInfo otherJoinPredInfo = HiveCalciteUtil.JoinPredicateInfo. constructJoinPredicateInfo(join, otherCondition); - if (joinPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema(). - equals(otherJoinPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema())) { + if (joinPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema().size() == 0 + || otherJoinPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema().size() == 0 + || joinPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema().equals( + otherJoinPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema())) { return false; } - if (joinPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema(). - equals(otherJoinPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema())) { + if (joinPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema().size() == 0 + || otherJoinPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema().size() == 0 + || joinPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema().equals( + otherJoinPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema())) { return false; } return true; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java index 955aa91..b6a79db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ExprNodeConverter.java @@ -44,6 +44,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveVarchar; @@ -223,7 +224,7 @@ public ExprNodeDesc visitLiteral(RexLiteral literal) { return new ExprNodeConstantDesc(TypeInfoFactory.binaryTypeInfo, literal.getValue3()); case DECIMAL: return new ExprNodeConstantDesc(TypeInfoFactory.getDecimalTypeInfo(lType.getPrecision(), - lType.getScale()), literal.getValue3()); + lType.getScale()), HiveDecimal.create((BigDecimal)literal.getValue3())); case VARCHAR: { int varcharLength = lType.getPrecision(); // If we cannot use Varchar due to type length restrictions, we use String diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java index 5f6be9e..5cd3a06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java @@ -169,7 +169,7 @@ private static void convertOpTree(RelNode rel, RelNode parent) { } } - private static RelNode renameTopLevelSelectInResultSchema(final RelNode rootRel, + public static RelNode renameTopLevelSelectInResultSchema(final RelNode rootRel, Pair topSelparentPair, List resultSchema) throws CalciteSemanticException { RelNode parentOforiginalProjRel = topSelparentPair.getKey(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForReturnPath.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForReturnPath.java index 06cf69d..81cc474 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForReturnPath.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForReturnPath.java @@ -28,12 +28,16 @@ public class PlanModifierForReturnPath { - public static RelNode convertOpTree(RelNode rel, List resultSchema) + public static RelNode convertOpTree(RelNode rel, List resultSchema, boolean isCTAS) throws CalciteSemanticException { RelNode newTopNode = rel; Pair topSelparentPair = HiveCalciteUtil.getTopLevelSelect(newTopNode); PlanModifierUtil.fixTopOBSchema(newTopNode, topSelparentPair, resultSchema, false); + if (isCTAS) { + newTopNode = PlanModifierForASTConv.renameTopLevelSelectInResultSchema(newTopNode, + topSelparentPair, resultSchema); + } return newTopNode; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java index 174685b..5b673df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/AbstractCorrelationProcCtx.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE; import java.util.HashSet; import java.util.Set; @@ -39,11 +40,13 @@ // only one reducer if this configuration does not prevents private final int minReducer; private final Set> removedOps; + private final boolean isMapAggr; public AbstractCorrelationProcCtx(ParseContext pctx) { removedOps = new HashSet>(); trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST); minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER); + isMapAggr = pctx.getConf().getBoolVar(HIVEMAPSIDEAGGREGATE); this.pctx = pctx; } @@ -70,4 +73,8 @@ public boolean hasBeenRemoved(Operator rsOp) { public boolean addRemovedOperator(Operator rsOp) { return removedOps.add(rsOp); } + + public boolean isMapAggr() { + return isMapAggr; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java index 64bef21..7bb49be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationUtilities.java @@ -29,6 +29,7 @@ import java.util.Map.Entry; import java.util.Set; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.ForwardOperator; @@ -44,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.Utilities.ReduceField; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.optimizer.correlation.ReduceSinkDeDuplication.ReduceSinkDeduplicateProcCtx; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; @@ -163,10 +165,10 @@ protected static boolean hasGroupingSet(ReduceSinkOperator cRS) throws SemanticE return type.isInstance(parent) ? (T)parent : null; } - protected static Operator getStartForGroupBy(ReduceSinkOperator cRS) + protected static Operator getStartForGroupBy(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException { Operator parent = getSingleParent(cRS); - return parent instanceof GroupByOperator ? parent : cRS; // skip map-aggr GBY + return parent instanceof GroupByOperator && dedupCtx.isMapAggr() ? parent : cRS; // skip map-aggr GBY } @@ -240,6 +242,7 @@ protected static int indexOf(ExprNodeDesc cexpr, ExprNodeDesc[] pexprs, Operator || cursor instanceof FilterOperator || cursor instanceof ForwardOperator || cursor instanceof ScriptOperator + || cursor instanceof GroupByOperator || cursor instanceof ReduceSinkOperator)) { return null; } @@ -395,7 +398,7 @@ protected static void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupBy Operator parent = getSingleParent(cRS); - if (parent instanceof GroupByOperator) { + if ((parent instanceof GroupByOperator) && procCtx.isMapAggr()) { // pRS-cGBYm-cRS-cGBYr (map aggregation) --> pRS-cGBYr(COMPLETE) // copies desc of cGBYm to cGBYr and remove cGBYm and cRS GroupByOperator cGBYm = (GroupByOperator) parent; @@ -440,7 +443,7 @@ protected static void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupBy removeOperator(cRS, cGBYr, parent, context); procCtx.addRemovedOperator(cRS); - if (parent instanceof GroupByOperator) { + if ((parent instanceof GroupByOperator) && procCtx.isMapAggr()) { removeOperator(parent, cGBYr, getSingleParent(parent), context); procCtx.addRemovedOperator(cGBYr); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java index 7b5f9b2..56334ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java @@ -500,7 +500,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException { - Operator start = CorrelationUtilities.getStartForGroupBy(cRS); + Operator start = CorrelationUtilities.getStartForGroupBy(cRS, dedupCtx); GroupByOperator pGBY = CorrelationUtilities.findPossibleParent( start, GroupByOperator.class, dedupCtx.trustScript()); @@ -547,7 +547,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException { - Operator start = CorrelationUtilities.getStartForGroupBy(cRS); + Operator start = CorrelationUtilities.getStartForGroupBy(cRS, dedupCtx); JoinOperator pJoin = CorrelationUtilities.findPossibleParent( start, JoinOperator.class, dedupCtx.trustScript()); @@ -590,7 +590,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ReduceSinkDeduplicateProcCtx dedupCtx) throws SemanticException { - Operator start = CorrelationUtilities.getStartForGroupBy(cRS); + Operator start = CorrelationUtilities.getStartForGroupBy(cRS, dedupCtx); ReduceSinkOperator pRS = CorrelationUtilities.findPossibleParent( start, ReduceSinkOperator.class, dedupCtx.trustScript()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 5b469e3..5cbc428 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -641,7 +641,8 @@ Operator getOptimizedHiveOPDag() throws SemanticException { } RelNode modifiedOptimizedOptiqPlan = PlanModifierForReturnPath.convertOpTree( - introduceProjectIfNeeded(optimizedOptiqPlan), topLevelFieldSchema); + introduceProjectIfNeeded(optimizedOptiqPlan), topLevelFieldSchema, this.getQB() + .getTableDesc() != null); LOG.debug("Translating the following plan:\n" + RelOptUtil.toString(modifiedOptimizedOptiqPlan)); Operator hiveRoot = new HiveOpConverter(this, conf, unparseTranslator, topOps,