diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java index 298855a..062ec9e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveCalciteUtil.java @@ -28,6 +28,7 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelOptUtil.InputReferencedVisitor; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.RelFactories.ProjectFactory; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataTypeField; @@ -42,7 +43,6 @@ import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Pair; import org.apache.calcite.util.Util; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject; import org.apache.hadoop.hive.ql.parse.ASTNode; @@ -311,11 +311,11 @@ public JoinPredicateInfo(List nonEquiJoinPredicateElement return this.mapOfProjIndxInJoinSchemaToLeafPInfo; } - public static JoinPredicateInfo constructJoinPredicateInfo(HiveJoin j) { + public static JoinPredicateInfo constructJoinPredicateInfo(Join j) { return constructJoinPredicateInfo(j, j.getCondition()); } - public static JoinPredicateInfo constructJoinPredicateInfo(HiveJoin j, RexNode predicate) { + public static JoinPredicateInfo constructJoinPredicateInfo(Join j, RexNode predicate) { JoinPredicateInfo jpi = null; JoinLeafPredicateInfo jlpi = null; List equiLPIList = new ArrayList(); @@ -453,7 +453,7 @@ public JoinLeafPredicateInfo(SqlKind comparisonType, List joinKeyExprsF return this.projsFromRightPartOfJoinKeysInJoinSchema; } - private static JoinLeafPredicateInfo constructJoinLeafPredicateInfo(HiveJoin j, RexNode pe) { + private static JoinLeafPredicateInfo 
constructJoinLeafPredicateInfo(Join j, RexNode pe) { JoinLeafPredicateInfo jlpi = null; List filterNulls = new ArrayList(); List joinKeyExprsFromLeft = new ArrayList();
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java
new file mode 100644
index 0000000..9db9d1d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveJoinAddNotNullRule.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.RelFactories.FilterFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Planner rule that adds {@code isnotnull} filters on the nullable equi-join
+ * key columns of both inputs of an inner join. When an input is already a
+ * {@link HiveFilter}, its conjuncts are merged into the new filter, which
+ * replaces it.
+ */
+public final class HiveJoinAddNotNullRule extends RelOptRule {
+
+  private static final String NOT_NULL_FUNC_NAME = "isnotnull";
+
+  /** The singleton. */
+  public static final HiveJoinAddNotNullRule INSTANCE =
+      new HiveJoinAddNotNullRule(HiveFilter.DEFAULT_FILTER_FACTORY);
+
+  private final FilterFactory filterFactory;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a HiveJoinAddNotNullRule.
+   *
+   * @param filterFactory factory used to create the filters placed on the join inputs
+   */
+  public HiveJoinAddNotNullRule(FilterFactory filterFactory) {
+    super(operand(Join.class,
+        operand(RelNode.class, any()),
+        operand(RelNode.class, any())));
+    this.filterFactory = filterFactory;
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  /**
+   * Rewrites the matched join so that each input is filtered on its nullable
+   * equi-join keys being not null. Returns without transforming for non-inner
+   * joins, always-true conditions, or when no new condition would be added.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final Join join = call.rel(0);
+    RelNode leftInput = call.rel(1);
+    RelNode rightInput = call.rel(2);
+
+    if (join.getJoinType() != JoinRelType.INNER) {
+      return;
+    }
+
+    if (join.getCondition().isAlwaysTrue()) {
+      return;
+    }
+
+    JoinPredicateInfo joinPredInfo =
+        HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join);
+
+    Set<Integer> joinLeftKeyPositions = new HashSet<Integer>();
+    Set<Integer> joinRightKeyPositions = new HashSet<Integer>();
+    for (JoinLeafPredicateInfo joinLeafPredInfo :
+        joinPredInfo.getEquiJoinPredicateElements()) {
+      joinLeftKeyPositions.addAll(
+          joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema());
+      joinRightKeyPositions.addAll(
+          joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema());
+    }
+
+    // Build not null conditions
+    final RelOptCluster cluster = join.getCluster();
+    final RexBuilder rexBuilder = cluster.getRexBuilder();
+
+    final Map<String, RexNode> newLeftConditions = getNotNullConditions(cluster,
+        rexBuilder, leftInput, joinLeftKeyPositions);
+    final Map<String, RexNode> newRightConditions = getNotNullConditions(cluster,
+        rexBuilder, rightInput, joinRightKeyPositions);
+
+    // Nothing will be added to the expression
+    if (newLeftConditions == null && newRightConditions == null) {
+      return;
+    }
+
+    if (newLeftConditions != null) {
+      if (leftInput instanceof HiveFilter) {
+        // The filter's conjuncts are already in newLeftConditions, so replace
+        // the filter instead of stacking a second one on top of it.
+        leftInput = leftInput.getInput(0);
+      }
+      leftInput = createHiveFilterConjunctiveCondition(filterFactory, rexBuilder,
+          leftInput, newLeftConditions.values());
+    }
+    if (newRightConditions != null) {
+      if (rightInput instanceof HiveFilter) {
+        // Same as for the left input: merge into, rather than stack on, the filter.
+        rightInput = rightInput.getInput(0);
+      }
+      rightInput = createHiveFilterConjunctiveCondition(filterFactory, rexBuilder,
+          rightInput, newRightConditions.values());
+    }
+
+    Join newJoin = join.copy(join.getTraitSet(), join.getCondition(),
+        leftInput, rightInput, join.getJoinType(), join.isSemiJoinDone());
+
+    call.getPlanner().onCopy(join, newJoin);
+
+    call.transformTo(newJoin);
+  }
+
+  /**
+   * Collects the conjuncts for a filter on {@code input}: the conjuncts of the
+   * input's own condition when it is a {@link HiveFilter}, plus an
+   * {@code isnotnull} call for every nullable column in
+   * {@code inputKeyPositions} whose digest is not already present.
+   *
+   * @return the conditions keyed by digest, in insertion order, or
+   *         {@code null} if no not-null condition had to be added
+   */
+  private static Map<String, RexNode> getNotNullConditions(RelOptCluster cluster,
+      RexBuilder rexBuilder, RelNode input, Set<Integer> inputKeyPositions) {
+
+    boolean added = false;
+
+    final RelDataType returnType = cluster.getTypeFactory().
+        createSqlType(SqlTypeName.BOOLEAN);
+
+    final Map<String, RexNode> newConditions;
+    if (input instanceof HiveFilter) {
+      newConditions = splitCondition(((HiveFilter) input).getCondition());
+    } else {
+      // Insertion-ordered so the generated conjunction has a stable operand order.
+      newConditions = new LinkedHashMap<String, RexNode>();
+    }
+    for (int pos : inputKeyPositions) {
+      try {
+        RelDataType keyType = input.getRowType().getFieldList().get(pos).getType();
+        if (!keyType.isNullable()) {
+          continue;
+        }
+        SqlOperator funcCall = SqlFunctionConverter.getCalciteOperator(NOT_NULL_FUNC_NAME,
+            FunctionRegistry.getFunctionInfo(NOT_NULL_FUNC_NAME).getGenericUDF(),
+            ImmutableList.of(keyType), returnType);
+        RexNode cond = rexBuilder.makeCall(funcCall, rexBuilder.makeInputRef(input, pos));
+        String digest = cond.toString();
+        if (!newConditions.containsKey(digest)) {
+          newConditions.put(digest, cond);
+          added = true;
+        }
+      } catch (SemanticException e) {
+        // Keep the original exception as the cause so its stack trace is not lost.
+        AssertionError error = new AssertionError(e.getMessage());
+        error.initCause(e);
+        throw error;
+      }
+    }
+    // Nothing will be added to the expression
+    if (!added) {
+      return null;
+    }
+    return newConditions;
+  }
+
+  /**
+   * Splits {@code condition} into its top-level {@code AND} operands, or keeps
+   * it whole when it is not an {@code AND}. Entries are keyed by digest and
+   * kept in operand order.
+   */
+  private static Map<String, RexNode> splitCondition(RexNode condition) {
+    Map<String, RexNode> newConditions = new LinkedHashMap<String, RexNode>();
+    if (condition.getKind() == SqlKind.AND) {
+      for (RexNode node : ((RexCall) condition).getOperands()) {
+        newConditions.put(node.toString(), node);
+      }
+    } else {
+      newConditions.put(condition.toString(), condition);
+    }
+    return newConditions;
+  }
+
+  /**
+   * Creates a filter over {@code input} whose condition is the conjunction of
+   * {@code conditions}.
+   */
+  private static RelNode createHiveFilterConjunctiveCondition(FilterFactory filterFactory,
+      RexBuilder rexBuilder, RelNode input, Collection<RexNode> conditions) {
+    final RexNode newCondition = RexUtil.composeConjunction(rexBuilder, conditions, false);
+    return filterFactory.createFilter(input, newCondition);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 47a209f..b2fc1e6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -132,6 +132,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransposeRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule; +import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule; import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory; @@ -768,7 +769,10 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv basePlan = hepPlan(basePlan, true, mdProvider, SemiJoinJoinTransposeRule.INSTANCE, SemiJoinFilterTransposeRule.INSTANCE, SemiJoinProjectTransposeRule.INSTANCE); - // 2. PPD + // 2. Add not null filters + basePlan = hepPlan(basePlan, true, mdProvider, HiveJoinAddNotNullRule.INSTANCE); + + // 3. PPD basePlan = hepPlan(basePlan, true, mdProvider, ReduceExpressionsRule.PROJECT_INSTANCE, ReduceExpressionsRule.FILTER_INSTANCE, @@ -781,21 +785,21 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv HiveFilterJoinRule.FILTER_ON_JOIN, new FilterAggregateTransposeRule(Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY, Aggregate.class)); - // 3.
Transitive inference & Partition Pruning + // 4. Transitive inference & Partition Pruning basePlan = hepPlan(basePlan, false, mdProvider, new JoinPushTransitivePredicatesRule( Join.class, HiveFilter.DEFAULT_FILTER_FACTORY), // TODO: Enable it after CALCITE-407 is fixed // RemoveTrivialProjectRule.INSTANCE, new HivePartitionPruneRule(conf)); - // 4. Projection Pruning + // 5. Projection Pruning RelFieldTrimmer fieldTrimmer = new RelFieldTrimmer(null, HiveProject.DEFAULT_PROJECT_FACTORY, HiveFilter.DEFAULT_FILTER_FACTORY, HiveJoin.HIVE_JOIN_FACTORY, RelFactories.DEFAULT_SEMI_JOIN_FACTORY, HiveSort.HIVE_SORT_REL_FACTORY, HiveAggregate.HIVE_AGGR_REL_FACTORY, HiveUnion.UNION_REL_FACTORY, true); basePlan = fieldTrimmer.trim(basePlan); - // 5. Rerun PPD through Project as column pruning would have introduced DT + // 6. Rerun PPD through Project as column pruning would have introduced DT // above scans basePlan = hepPlan(basePlan, true, mdProvider, new FilterProjectTransposeRule(Filter.class, HiveFilter.DEFAULT_FILTER_FACTORY,