diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java index 4825a61..bfa3a31 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java @@ -66,8 +66,6 @@ import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.parse.ParseUtils; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; @@ -83,8 +81,6 @@ public class HiveCalciteUtil { - private static final Logger LOG = LoggerFactory.getLogger(HiveCalciteUtil.class); - /** * Get list of virtual columns from the given list of projections. diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java index c0609d7..0ed1c3a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdCollation; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistinctRowCount; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistribution; +import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDeterministicSelectivityCost; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdMemory; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdParallelism; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdPredicates; @@ -80,4 +81,11 @@ public RelMetadataProvider getMetadataProvider() { new DefaultRelMetadataProvider())); } + public RelMetadataProvider getDummyMetadataProvider() { + return ChainedRelMetadataProvider.of(ImmutableList + .of( + HiveRelMdDeterministicSelectivityCost.SOURCE, + new DefaultRelMetadataProvider())); + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java index 3c5cac2..4baaa9f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java @@ -64,6 +64,13 @@ public String toString() { } }; + public static final HiveCost LARGE = new HiveCost(1000.0, 1000.0, 0.0) { + @Override + public String toString() { + return "{large}"; + } + }; + public static final RelOptCostFactory FACTORY = new Factory(); // ~ Instance fields -------------------------------------------------------- diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java index d15d885..4af1f8d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java @@ -56,8 +56,8 @@ public RelOptCost getJoinCost(HiveJoin join) { JoinAlgorithm joinAlgorithm = null; RelOptCost minJoinCost = null; - if (LOG.isDebugEnabled()) { - LOG.debug("Join algorithm selection for:\n" + RelOptUtil.toString(join)); + if (LOG.isTraceEnabled()) { + LOG.trace("Join algorithm selection for:\n" + RelOptUtil.toString(join)); } for (JoinAlgorithm possibleAlgorithm : this.joinAlgorithms) { @@ -65,8 +65,8 @@ public RelOptCost getJoinCost(HiveJoin join) { continue; } RelOptCost joinCost = possibleAlgorithm.getCost(join); - if (LOG.isDebugEnabled()) { - LOG.debug(possibleAlgorithm + " cost: " + joinCost); + if (LOG.isTraceEnabled()) { + LOG.trace(possibleAlgorithm + " cost: " + joinCost); } if (minJoinCost == null || joinCost.isLt(minJoinCost) ) { joinAlgorithm = possibleAlgorithm; @@ -74,8 +74,8 @@ public RelOptCost getJoinCost(HiveJoin join) { } } - if (LOG.isDebugEnabled()) { - LOG.debug(joinAlgorithm + " selected"); + if (LOG.isTraceEnabled()) { + LOG.trace(joinAlgorithm + " selected"); } join.setJoinAlgorithm(joinAlgorithm); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveUnion.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveUnion.java index 8b57b35..3435e63 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveUnion.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveUnion.java @@ -26,7 +26,7 @@ import org.apache.calcite.rel.core.Union; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode.Implementor; -public class HiveUnion extends Union { +public class HiveUnion extends Union implements HiveRelNode { public HiveUnion(RelOptCluster cluster, RelTraitSet traits, List inputs) { super(cluster, traits, inputs, true); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java index de880ce..c151619 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java @@ -17,9 +17,9 @@ */ package org.apache.hadoop.hive.ql.optimizer.calcite.rules; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -32,21 +32,19 @@ import org.apache.calcite.rel.core.RelFactories.FilterFactory; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; -import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter; -public final class HiveJoinAddNotNullRule extends RelOptRule { +import com.google.common.collect.Lists; + - private static final String NOT_NULL_FUNC_NAME = "isnotnull"; +public final class HiveJoinAddNotNullRule extends RelOptRule { /** The singleton. */ public static final HiveJoinAddNotNullRule INSTANCE = @@ -102,51 +100,42 @@ public void onMatch(RelOptRuleCall call) { final RelOptCluster cluster = join.getCluster(); final RexBuilder rexBuilder = join.getCluster().getRexBuilder(); - final Map newLeftConditions = getNotNullConditions(cluster, + final List newLeftConditions = getNotNullConditions(cluster, rexBuilder, leftInput, joinLeftKeyPositions); - final Map newRightConditions = getNotNullConditions(cluster, + final List newRightConditions = getNotNullConditions(cluster, rexBuilder, rightInput, joinRightKeyPositions); + RexNode newLeftPredicate = RexUtil.composeConjunction(rexBuilder, newLeftConditions, false); + RexNode newRightPredicate = RexUtil.composeConjunction(rexBuilder, newRightConditions, false); // Nothing will be added to the expression - if (newLeftConditions == null && newRightConditions == null) { + if (newLeftPredicate.isAlwaysTrue() && newRightPredicate.isAlwaysTrue()) { return; } - if (newLeftConditions != null) { - if (leftInput instanceof HiveFilter) { - leftInput = leftInput.getInput(0); - } - leftInput = createHiveFilterConjunctiveCondition(filterFactory, rexBuilder, - leftInput, newLeftConditions.values()); + RelNode newLeftInput; + if (!newLeftPredicate.isAlwaysTrue()) { + newLeftInput = filterFactory.createFilter(join.getLeft(), newLeftPredicate); + } else { + newLeftInput = join.getLeft(); } - if (newRightConditions != null) { - if (rightInput instanceof HiveFilter) { - rightInput = rightInput.getInput(0); - } - rightInput = createHiveFilterConjunctiveCondition(filterFactory, rexBuilder, - rightInput, newRightConditions.values()); + RelNode newRightInput; + if (!newRightPredicate.isAlwaysTrue()) { + newRightInput = filterFactory.createFilter(join.getRight(), newRightPredicate); + } else { + newRightInput = join.getRight(); } Join newJoin = join.copy(join.getTraitSet(), join.getCondition(), - leftInput, rightInput, join.getJoinType(), join.isSemiJoinDone()); + newLeftInput, newRightInput, join.getJoinType(), join.isSemiJoinDone()); call.getPlanner().onCopy(join, newJoin); call.transformTo(newJoin); } - private static Map getNotNullConditions(RelOptCluster cluster, + private static List getNotNullConditions(RelOptCluster cluster, RexBuilder rexBuilder, RelNode input, Set inputKeyPositions) { - - boolean added = false; - - final Map newConditions; - if (input instanceof HiveFilter) { - newConditions = splitCondition(((HiveFilter) input).getCondition()); - } - else { - newConditions = new HashMap(); - } + final Map newConditions = new HashMap(); for (int pos : inputKeyPositions) { RelDataType keyType = input.getRowType().getFieldList().get(pos).getType(); // Nothing to do if key cannot be null @@ -155,35 +144,9 @@ public void onMatch(RelOptRuleCall call) { } RexNode cond = rexBuilder.makeCall(SqlStdOperatorTable.IS_NOT_NULL, rexBuilder.makeInputRef(input, pos)); - String digest = cond.toString(); - if (!newConditions.containsKey(digest)) { - newConditions.put(digest,cond); - added = true; - } - } - // Nothing will be added to the expression - if (!added) { - return null; + newConditions.put(cond.toString(),cond); } - return newConditions; + return HiveCalciteUtil.getPredsNotPushedAlready(input, Lists.newArrayList(newConditions.values())); } - private static Map splitCondition(RexNode condition) { - Map newConditions = new HashMap(); - if (condition.getKind() == SqlKind.AND) { - for (RexNode node : ((RexCall) condition).getOperands()) { - newConditions.put(node.toString(), node); - } - } - else { - newConditions.put(condition.toString(), condition); - } - return newConditions; - } - - private static RelNode createHiveFilterConjunctiveCondition(FilterFactory filterFactory, - RexBuilder rexBuilder, RelNode input, Collection conditions) { - final RexNode newCondition = RexUtil.composeConjunction(rexBuilder, conditions, false); - return filterFactory.createFilter(input, newCondition); - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinPushTransitivePredicatesRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinPushTransitivePredicatesRule.java index 703c8c6..fd5e447 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinPushTransitivePredicatesRule.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinPushTransitivePredicatesRule.java @@ -55,6 +55,7 @@ * and applies them appropriately. */ public class HiveJoinPushTransitivePredicatesRule extends RelOptRule { + private final RelFactories.FilterFactory filterFactory; /** The singleton. */ @@ -72,12 +73,6 @@ public HiveJoinPushTransitivePredicatesRule(Class clazz, @Override public void onMatch(RelOptRuleCall call) { Join join = call.rel(0); - // Register that we have visited this operator in this rule - HiveRulesRegistry registry = call.getPlanner().getContext().unwrap(HiveRulesRegistry.class); - if (registry != null) { - registry.registerVisited(this, join); - } - RelOptPredicateList preds = RelMetadataQuery.getPulledUpPredicates(join); RexBuilder rB = join.getCluster().getRexBuilder(); @@ -87,19 +82,21 @@ public HiveJoinPushTransitivePredicatesRule(Class clazz, List leftPreds = getValidPreds(join.getCluster(), lChild, preds.leftInferredPredicates, lChild.getRowType()); List rightPreds = getValidPreds(join.getCluster(), rChild, preds.rightInferredPredicates, rChild.getRowType()); - if (leftPreds.isEmpty() && rightPreds.isEmpty()) { + RexNode newLeftPredicate = RexUtil.composeConjunction(rB, leftPreds, false); + RexNode newRightPredicate = RexUtil.composeConjunction(rB, rightPreds, false); + if (newLeftPredicate.isAlwaysTrue() && newRightPredicate.isAlwaysTrue()) { return; } - if (leftPreds.size() > 0) { + if (!newLeftPredicate.isAlwaysTrue()) { RelNode curr = lChild; - lChild = filterFactory.createFilter(lChild, RexUtil.composeConjunction(rB, leftPreds, false)); + lChild = filterFactory.createFilter(lChild, newLeftPredicate); call.getPlanner().onCopy(curr, lChild); } - if (rightPreds.size() > 0) { + if (!newRightPredicate.isAlwaysTrue()) { RelNode curr = rChild; - rChild = filterFactory.createFilter(rChild, RexUtil.composeConjunction(rB, rightPreds, false)); + rChild = filterFactory.createFilter(rChild, newRightPredicate); call.getPlanner().onCopy(curr, rChild); } @@ -107,11 +104,6 @@ public HiveJoinPushTransitivePredicatesRule(Class clazz, lChild, rChild, join.getJoinType(), join.isSemiJoinDone()); call.getPlanner().onCopy(join, newRel); - // We register new Join rel so we do not fire the rule on them again - if (registry != null) { - registry.registerVisited(this, newRel); - } - call.transformTo(newRel); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java index d37fc0e..44925a5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HivePreFilteringRule.java @@ -169,13 +169,13 @@ public void onMatch(RelOptRuleCall call) { // 3. If the new conjuncts are already present in the plan, we bail out final List newConjuncts = HiveCalciteUtil.getPredsNotPushedAlready(filter.getInput(), operandsToPushDown); - if (newConjuncts.isEmpty()) { + RexNode newPredicate = RexUtil.composeConjunction(rexBuilder, newConjuncts, false); + if (newPredicate.isAlwaysTrue()) { return; } // 4. Otherwise, we create a new condition - final RexNode newChildFilterCondition = RexUtil.pullFactors(rexBuilder, - RexUtil.composeConjunction(rexBuilder, newConjuncts, false)); + final RexNode newChildFilterCondition = RexUtil.pullFactors(rexBuilder, newPredicate); // 5. We create the new filter that might be pushed down RelNode newChildFilter = filterFactory.createFilter(filter.getInput(), newChildFilterCondition); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDeterministicSelectivityCost.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDeterministicSelectivityCost.java new file mode 100644 index 0000000..4fc2f1d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDeterministicSelectivityCost.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.stats; + +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider; +import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.BuiltInMethod; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost; + +import com.google.common.collect.ImmutableList; + + +public class HiveRelMdDeterministicSelectivityCost { + + private static final HiveRelMdDeterministicSelectivityCost INSTANCE = + new HiveRelMdDeterministicSelectivityCost(); + + public static final RelMetadataProvider SOURCE = + ChainedRelMetadataProvider.of( + ImmutableList.of( + ReflectiveRelMetadataProvider.reflectiveSource( + BuiltInMethod.NON_CUMULATIVE_COST.method, INSTANCE), + ReflectiveRelMetadataProvider.reflectiveSource( + BuiltInMethod.DISTINCT_ROW_COUNT.method, INSTANCE), + ReflectiveRelMetadataProvider.reflectiveSource( + BuiltInMethod.ROW_COUNT.method, INSTANCE))); + + //~ Methods ---------------------------------------------------------------- + + private HiveRelMdDeterministicSelectivityCost() {} + + public RelOptCost getNonCumulativeCost(Join join) { + double leftRCount = RelMetadataQuery.getRowCount(join.getLeft()); + double rightRCount = RelMetadataQuery.getRowCount(join.getRight()); + double rowCount = leftRCount * rightRCount * + RelMetadataQuery.getSelectivity(join, join.getCondition()); + return HiveCost.FACTORY.makeCost(rowCount, rowCount, 0.0); + } + + public RelOptCost getNonCumulativeCost(Filter filter) { + double dRows = RelMetadataQuery.getRowCount(filter); + double dCpu = RelMetadataQuery.getRowCount(filter.getInput()); + return HiveCost.FACTORY.makeCost(dRows, dCpu, 0.0); + } + + public RelOptCost getNonCumulativeCost(Project project) { + double dRows = RelMetadataQuery.getRowCount(project.getInput()); + double dCpu = dRows * project.getChildExps().size(); + return HiveCost.FACTORY.makeCost(dRows, dCpu, 0.0); + } + + public RelOptCost getNonCumulativeCost(Aggregate aggregate) { + double rowCount = RelMetadataQuery.getRowCount(aggregate); + // Aggregates with more aggregate functions cost a bit more + float multiplier = 1f + (float) aggregate.getAggCallList().size() * 0.125f; + for (AggregateCall aggCall : aggregate.getAggCallList()) { + if (aggCall.getAggregation().getName().equals("SUM")) { + // Pretend that SUM costs a little bit more than $SUM0, + // to make things deterministic. + multiplier += 0.0125f; + } + } + return HiveCost.FACTORY.makeCost(rowCount * multiplier, 0.0, 0.0); + } + + public RelOptCost getNonCumulativeCost(Sort sort) { + // Higher cost if rows are wider discourages pushing a project through a + // sort. + double rowCount = RelMetadataQuery.getRowCount(sort); + double bytesPerRow = sort.getRowType().getFieldCount() * 4; + return HiveCost.FACTORY.makeCost(Util.nLogN(rowCount) * bytesPerRow, rowCount, 0); + } + + public RelOptCost getNonCumulativeCost(TableScan tableScan) { + return HiveCost.LARGE; + } + + public RelOptCost getNonCumulativeCost(RelNode rel) { + // by default, assume cost is proportional to number of rows + double rowCount = RelMetadataQuery.getRowCount(rel); + return HiveCost.FACTORY.makeCost(rowCount, rowCount, 0); + } + + public Double getDistinctRowCount(TableScan rel, ImmutableBitSet groupKey, + RexNode predicate) { + return HiveCost.LARGE.getRows(); + } + + public Double getRowCount(TableScan rel) { + return HiveCost.LARGE.getRows(); + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdPredicates.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdPredicates.java index b7244fd..ef4fbe2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdPredicates.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdPredicates.java @@ -17,6 +17,15 @@ */ package org.apache.hadoop.hive.ql.optimizer.calcite.stats; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; + import org.apache.calcite.linq4j.Linq4j; import org.apache.calcite.linq4j.Ord; import org.apache.calcite.linq4j.function.Predicate1; @@ -39,6 +48,7 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexPermutationShuttle; import org.apache.calcite.rex.RexPermuteInputsShuttle; import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.rex.RexUtil; @@ -52,6 +62,8 @@ import org.apache.calcite.util.mapping.MappingType; import org.apache.calcite.util.mapping.Mappings; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.collect.HashMultimap; @@ -61,18 +73,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; - //TODO: Move this to calcite public class HiveRelMdPredicates extends RelMdPredicates { + + private static final Logger LOG = LoggerFactory.getLogger(HiveRelMdPredicates.class); + public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource( BuiltInMethod.PREDICATES.method, new HiveRelMdPredicates()); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 21423c1..77e3534 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -41,7 +41,6 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptPlanner.Executor; -import org.apache.calcite.plan.RelOptQuery; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptUtil; @@ -77,14 +76,12 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexExecutorImpl; import org.apache.calcite.rex.RexFieldCollation; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.rex.RexWindowBound; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Schemas; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlExplainLevel; @@ -859,9 +856,10 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu HiveAlgorithmsConf algorithmsConf = new HiveAlgorithmsConf(maxSplitSize, maxMemory); HiveVolcanoPlannerContext confContext = new HiveVolcanoPlannerContext(algorithmsConf); RelOptPlanner planner = HiveVolcanoPlanner.createPlanner(confContext); - final RelOptQuery query = new RelOptQuery(planner); final RexBuilder rexBuilder = cluster.getRexBuilder(); - cluster = query.createCluster(rexBuilder.getTypeFactory(), rexBuilder); + cluster = RelOptCluster.create(planner, rexBuilder); + Executor executorProvider = new HiveRexExecutorImpl(cluster); + planner.setExecutor(executorProvider); this.cluster = cluster; this.relOptSchema = relOptSchema; @@ -884,12 +882,9 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu // Create MD provider HiveDefaultRelMetadataProvider mdProvider = new HiveDefaultRelMetadataProvider(conf); - // Create executor - Executor executorProvider = new HiveRexExecutorImpl(cluster); - // 2. Apply pre-join order optimizations calcitePreCboPlan = applyPreJoinOrderingTransforms(calciteGenPlan, - mdProvider.getMetadataProvider(), executorProvider); + mdProvider.getDummyMetadataProvider()); // 3. Apply join order optimizations: reordering MST algorithm // If join optimizations failed because of missing stats, we continue with @@ -941,7 +936,7 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu // 4. Run other optimizations that do not need stats perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null, + calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), HepMatchOrder.BOTTOM_UP, ProjectRemoveRule.INSTANCE, UnionMergeRule.INSTANCE, new ProjectMergeRule(false, HiveRelFactories.HIVE_PROJECT_FACTORY), @@ -985,7 +980,7 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu // aggregation columns (HIVE-10627) if (profilesCBO.contains(ExtendedCBOProfile.WINDOWING_POSTPROCESSING)) { perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null, + calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), HepMatchOrder.BOTTOM_UP, HiveWindowingFixRule.INSTANCE); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Window fixing rule"); } @@ -994,7 +989,7 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) { perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); // 8.1. Merge join into multijoin operators (if possible) - calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, true, mdProvider.getMetadataProvider(), null, + calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, true, mdProvider.getMetadataProvider(), HepMatchOrder.BOTTOM_UP, HiveJoinProjectTransposeRule.BOTH_PROJECT_INCLUDE_OUTER, HiveJoinProjectTransposeRule.LEFT_PROJECT_INCLUDE_OUTER, HiveJoinProjectTransposeRule.RIGHT_PROJECT_INCLUDE_OUTER, @@ -1004,15 +999,15 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null, HiveRelFactories.HIVE_BUILDER.create(cluster, null)); calciteOptimizedPlan = fieldTrimmer.trim(calciteOptimizedPlan); - calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null, + calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), HepMatchOrder.BOTTOM_UP, ProjectRemoveRule.INSTANCE, new ProjectMergeRule(false, HiveRelFactories.HIVE_PROJECT_FACTORY)); - calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, true, mdProvider.getMetadataProvider(), null, + calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, true, mdProvider.getMetadataProvider(), new HiveFilterProjectTSTransposeRule(Filter.class, HiveRelFactories.HIVE_FILTER_FACTORY, HiveProject.class, HiveRelFactories.HIVE_PROJECT_FACTORY, HiveTableScan.class)); // 8.2. Introduce exchange operators below join/multijoin operators - calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null, + calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), HepMatchOrder.BOTTOM_UP, HiveInsertExchange4JoinRule.EXCHANGE_BELOW_JOIN, HiveInsertExchange4JoinRule.EXCHANGE_BELOW_MULTIJOIN); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Translation from Calcite tree to Hive tree"); @@ -1041,7 +1036,7 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu * executor * @return */ - private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProvider mdProvider, Executor executorProvider) { + private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProvider mdProvider) { // TODO: Decorelation of subquery should be done before attempting // Partition Pruning; otherwise Expression evaluation may try to execute // corelated sub query. @@ -1056,7 +1051,7 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv // Its not clear, if this rewrite is always performant on MR, since extra map phase // introduced for 2nd MR job may offset gains of this multi-stage aggregation. // We need a cost model for MR to enable this on MR. - basePlan = hepPlan(basePlan, true, mdProvider, null, HiveExpandDistinctAggregatesRule.INSTANCE); + basePlan = hepPlan(basePlan, true, mdProvider, HiveExpandDistinctAggregatesRule.INSTANCE); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, Distinct aggregate rewrite"); } @@ -1067,39 +1062,25 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv // Ex: select * from R1 join R2 where ((R1.x=R2.x) and R1.y<10) or // ((R1.x=R2.x) and R1.z=10)) and rand(1) < 0.1 perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, false, mdProvider, null, HepMatchOrder.ARBITRARY, + basePlan = hepPlan(basePlan, false, mdProvider, HepMatchOrder.ARBITRARY, HivePreFilteringRule.INSTANCE); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, factor out common filter elements and separating deterministic vs non-deterministic UDF"); - // 3. PPD for old Join Syntax - // NOTE: PPD needs to run before adding not null filters in order to - // support old style join syntax (so that on-clauses will get filled up). - // TODO: Add in ReduceExpressionrules (Constant folding) to below once - // HIVE-11927 is fixed. + // 3. PPD, join transitive inference, constant folding perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, true, mdProvider, null, HiveFilterProjectTransposeRule.INSTANCE_DETERMINISTIC, + basePlan = volcanoPlan(basePlan, mdProvider, HiveFilterProjectTransposeRule.INSTANCE_DETERMINISTIC, HiveFilterSetOpTransposeRule.INSTANCE, HiveFilterSortTransposeRule.INSTANCE, HiveFilterJoinRule.JOIN, HiveFilterJoinRule.FILTER_ON_JOIN, new HiveFilterAggregateTransposeRule(Filter.class, - HiveRelFactories.HIVE_FILTER_FACTORY, Aggregate.class), new FilterMergeRule( - HiveRelFactories.HIVE_FILTER_FACTORY)); - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, - "Calcite: Prejoin ordering transformation, PPD for old join syntax"); - - - // TODO: Transitive inference, constant prop & Predicate push down has to - // do multiple passes till no more inference is left - // Currently doing so would result in a spin. Just checking for if inferred - // pred is present below may not be sufficient as inferred & pushed pred - // could have been mutated by constant folding/prop - // 4. Transitive inference for join on clauses - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, true, mdProvider, null, new HiveJoinPushTransitivePredicatesRule( - Join.class, HiveRelFactories.HIVE_FILTER_FACTORY)); + HiveRelFactories.HIVE_FILTER_FACTORY, Aggregate.class), + new FilterMergeRule(HiveRelFactories.HIVE_FILTER_FACTORY), + new HiveJoinPushTransitivePredicatesRule(Join.class, HiveRelFactories.HIVE_FILTER_FACTORY), + HiveReduceExpressionsRule.PROJECT_INSTANCE, HiveReduceExpressionsRule.FILTER_INSTANCE, + HiveReduceExpressionsRule.JOIN_INSTANCE, HiveJoinAddNotNullRule.INSTANCE); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, - "Calcite: Prejoin ordering transformation, Transitive inference for join on clauses"); + "Calcite: Prejoin ordering transformation, PPD, join transitive inference, constant folding"); - // 5. Push down limit through outer join + // 4. Push down limit through outer join // NOTE: We run this after PPD to support old style join syntax. // Ex: select * from R1 left outer join R2 where ((R1.x=R2.x) and R1.y<10) or // ((R1.x=R2.x) and R1.z=10)) and rand(1) < 0.1 order by R1.x limit 10 @@ -1111,56 +1092,30 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv HiveConf.ConfVars.HIVE_OPTIMIZE_LIMIT_TRANSPOSE_REDUCTION_PERCENTAGE); final long reductionTuples = HiveConf.getLongVar(conf, HiveConf.ConfVars.HIVE_OPTIMIZE_LIMIT_TRANSPOSE_REDUCTION_TUPLES); - basePlan = hepPlan(basePlan, true, mdProvider, null, HiveSortMergeRule.INSTANCE, + basePlan = hepPlan(basePlan, true, mdProvider, HiveSortMergeRule.INSTANCE, HiveSortProjectTransposeRule.INSTANCE, HiveSortJoinReduceRule.INSTANCE, HiveSortUnionReduceRule.INSTANCE); - basePlan = hepPlan(basePlan, true, mdProvider, null, HepMatchOrder.BOTTOM_UP, + basePlan = hepPlan(basePlan, true, mdProvider, HepMatchOrder.BOTTOM_UP, new HiveSortRemoveRule(reductionProportion, reductionTuples), HiveProjectSortTransposeRule.INSTANCE); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, Push down limit through outer join"); } - // 6. Add not null filters + // 5. Push Down Semi Joins perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, true, mdProvider, null, HiveJoinAddNotNullRule.INSTANCE); - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, - "Calcite: Prejoin ordering transformation, Add not null filters"); - - // 7. Rerun Constant propagation and PPD now that we have added Not NULL filters & did transitive inference - // TODO: Add in ReduceExpressionrules (Constant folding) to below once - // HIVE-11927 is fixed. - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, true, mdProvider, null, HiveFilterProjectTransposeRule.INSTANCE_DETERMINISTIC, - HiveFilterSetOpTransposeRule.INSTANCE, HiveFilterSortTransposeRule.INSTANCE, HiveFilterJoinRule.JOIN, - HiveFilterJoinRule.FILTER_ON_JOIN, new HiveFilterAggregateTransposeRule(Filter.class, - HiveRelFactories.HIVE_FILTER_FACTORY, Aggregate.class), new FilterMergeRule( - HiveRelFactories.HIVE_FILTER_FACTORY)); - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, - "Calcite: Prejoin ordering transformation, Constant propagation and PPD"); - - // 8. Push Down Semi Joins - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, true, mdProvider, null, SemiJoinJoinTransposeRule.INSTANCE, + basePlan = hepPlan(basePlan, true, mdProvider, SemiJoinJoinTransposeRule.INSTANCE, SemiJoinFilterTransposeRule.INSTANCE, SemiJoinProjectTransposeRule.INSTANCE); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, Push Down Semi Joins"); - // 9. Constant folding - perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, true, mdProvider, executorProvider, - HiveReduceExpressionsRule.PROJECT_INSTANCE, HiveReduceExpressionsRule.FILTER_INSTANCE, - HiveReduceExpressionsRule.JOIN_INSTANCE); - perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, - "Calcite: Prejoin ordering transformation, Constant folding"); - - // 10. Apply Partition Pruning + // 6. Apply Partition Pruning perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, false, mdProvider, null, new HivePartitionPruneRule(conf)); + basePlan = hepPlan(basePlan, false, mdProvider, new HivePartitionPruneRule(conf)); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, Partition Pruning"); - // 11. Projection Pruning (this introduces select above TS & hence needs to be run last due to PP) + // 7. Projection Pruning (this introduces select above TS & hence needs to be run last due to PP) perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null, HiveRelFactories.HIVE_BUILDER.create(cluster, null)); @@ -1168,19 +1123,19 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, Projection Pruning"); - // 12. Merge Project-Project if possible + // 8. Merge Project-Project if possible perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, false, mdProvider, null, new ProjectMergeRule(true, + basePlan = hepPlan(basePlan, false, mdProvider, new ProjectMergeRule(true, HiveRelFactories.HIVE_PROJECT_FACTORY)); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Prejoin ordering transformation, Merge Project-Project"); - // 13. Rerun PPD through Project as column pruning would have introduced + // 9. Rerun PPD through Project as column pruning would have introduced // DT above scans; By pushing filter just above TS, Hive can push it into // storage (incase there are filters on non partition cols). This only // matches FIL-PROJ-TS perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER); - basePlan = hepPlan(basePlan, true, mdProvider, null, new HiveFilterProjectTSTransposeRule( + basePlan = hepPlan(basePlan, true, mdProvider, new HiveFilterProjectTSTransposeRule( Filter.class, HiveRelFactories.HIVE_FILTER_FACTORY, HiveProject.class, HiveRelFactories.HIVE_PROJECT_FACTORY, HiveTableScan.class)); perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, @@ -1200,8 +1155,8 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv * @return optimized RelNode */ private RelNode hepPlan(RelNode basePlan, boolean followPlanChanges, - RelMetadataProvider mdProvider, Executor executorProvider, RelOptRule... rules) { - return hepPlan(basePlan, followPlanChanges, mdProvider, executorProvider, + RelMetadataProvider mdProvider, RelOptRule... rules) { + return hepPlan(basePlan, followPlanChanges, mdProvider, HepMatchOrder.TOP_DOWN, rules); } @@ -1217,7 +1172,7 @@ private RelNode hepPlan(RelNode basePlan, boolean followPlanChanges, * @return optimized RelNode */ private RelNode hepPlan(RelNode basePlan, boolean followPlanChanges, - RelMetadataProvider mdProvider, Executor executorProvider, HepMatchOrder order, + RelMetadataProvider mdProvider, HepMatchOrder order, RelOptRule... rules) { RelNode optimizedRelNode = basePlan; @@ -1242,16 +1197,41 @@ private RelNode hepPlan(RelNode basePlan, boolean followPlanChanges, basePlan.getCluster().setMetadataProvider( new CachingRelMetadataProvider(chainedProvider, planner)); - if (executorProvider != null) { - basePlan.getCluster().getPlanner().setExecutor(executorProvider); - } - planner.setRoot(basePlan); optimizedRelNode = planner.findBestExp(); return optimizedRelNode; } + private RelNode volcanoPlan(RelNode basePlan, RelMetadataProvider mdProvider, + RelOptRule... rules) { + + RelNode optimizedRelNode = basePlan; + + // Metadata providers + List list = Lists.newArrayList(); + list.add(mdProvider); + basePlan.getCluster().getPlanner().registerMetadataProviders(list); + RelMetadataProvider chainedProvider = ChainedRelMetadataProvider.of(list); + basePlan.getCluster().setMetadataProvider( + new CachingRelMetadataProvider(chainedProvider, basePlan.getCluster().getPlanner())); + + // Register rules + for (int i = 0; i < rules.length; i++) { + basePlan.getCluster().getPlanner().addRule(rules[i]); + } + + basePlan.getCluster().getPlanner().setRoot(basePlan); + optimizedRelNode = basePlan.getCluster().getPlanner().findBestExp(); + + // Unregister rules + for (int i = 0; i < rules.length; i++) { + basePlan.getCluster().getPlanner().removeRule(rules[i]); + } + + return optimizedRelNode; + } + @SuppressWarnings("nls") private RelNode genUnionLogicalPlan(String unionalias, String leftalias, RelNode leftRel, String rightalias, RelNode rightRel) throws SemanticException {