diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java index 09f79bb..0e559e0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java @@ -1,13 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.ql.optimizer.calcite; import org.apache.calcite.plan.Context; -import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf; public class HiveConfigContext implements Context { - private HiveConf config; + private HiveAlgorithmsConf config; - public HiveConfigContext(HiveConf config) { + public HiveConfigContext(HiveAlgorithmsConf config) { this.config = config; } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java index 8ede730..0648df8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java @@ -23,8 +23,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveDefaultCostModel; -import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveRelMdCost; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveOnTezCostModel; +import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveRelMdCost; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdCollation; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistinctRowCount; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistribution; @@ -52,12 +52,9 @@ public RelMetadataProvider getMetadataProvider() { final HiveCostModel cm; if (HiveConf.getVar(this.hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") && HiveConf.getBoolVar(this.hiveConf, HiveConf.ConfVars.EXTENDED_COST_MODEL)) { - final Double maxMemory = (double) HiveConf.getLongVar( - this.hiveConf, - HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); - cm = new HiveOnTezCostModel(maxMemory); + cm = HiveOnTezCostModel.INSTANCE; } else { - cm = new HiveDefaultCostModel(); + cm = HiveDefaultCostModel.INSTANCE; } // Get max split size for HiveRelMdParallelism diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsConf.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsConf.java new file mode 
100644 index 0000000..83454ea --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsConf.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.cost; + +public class HiveAlgorithmsConf { + + private Double maxSplitSize; + private Double maxMemory; + + + public HiveAlgorithmsConf(Double maxSplitSize, Double maxMemory) { + this.maxSplitSize = maxSplitSize; + this.maxMemory = maxMemory; + } + + public Double getMaxSplitSize() { + return maxSplitSize; + } + + public Double getMaxMemory() { + return maxMemory; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java new file mode 100644 index 0000000..8b2d3e6 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelCollation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveAlgorithmsUtil {
+
+  private static final double CPU_COST = 1.0;
+  private static final double NET_COST = 150.0 * CPU_COST;
+  private static final double LOCAL_WRITE_COST = 4.0 * NET_COST;
+  private static final double LOCAL_READ_COST = 4.0 * NET_COST;
+  private static final double HDFS_WRITE_COST = 10.0 * LOCAL_WRITE_COST;
+  private static final double HDFS_READ_COST = 1.5 * LOCAL_READ_COST;
+
+  public static RelOptCost computeCardinalityBasedCost(HiveRelNode hr) {
+    return new HiveCost(hr.getRows(), 0, 0);
+  }
+
+  public static HiveCost computeCost(HiveTableScan t) {
+    double cardinality = t.getRows();
+    return new HiveCost(cardinality, 0, HDFS_WRITE_COST * cardinality * 0);
+  }
+
+  public static double computeSortMergeCPUCost(
+          ImmutableList<Double> cardinalities,
+          ImmutableBitSet sorted) {
+    // Sort-merge join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      double cardinality = cardinalities.get(i);
+      if (!sorted.get(i)) {
+        // Sort cost
+        cpuCost += computeSortCPUCost(cardinality);
+      }
+      // Merge cost
+      cpuCost += cardinality * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeSortCPUCost(Double cardinality) {
+    return cardinality * Math.log(cardinality) * CPU_COST;
+  }
+
+  public static double computeSortMergeIOCost(
+          ImmutableList<Pair<Double,Double>> relationInfos) {
+    // Sort-merge join
+    double ioCost = 0.0;
+    for (Pair<Double,Double> relationInfo : relationInfos) {
+      ioCost += computeSortIOCost(relationInfo);
+    }
+    return ioCost;
+  }
+
+  public static double computeSortIOCost(Pair<Double,Double> relationInfo) {
+    // Sort-merge join
+    double ioCost = 0.0;
+    double cardinality = relationInfo.left;
+    double averageTupleSize = relationInfo.right;
+    // Write cost
+    ioCost += cardinality * averageTupleSize * LOCAL_WRITE_COST;
+    // Read cost
+    ioCost += cardinality * averageTupleSize * LOCAL_READ_COST;
+    // Net transfer cost
+    ioCost += cardinality * averageTupleSize * NET_COST;
+    return ioCost;
+  }
+
+  public static double computeMapJoinCPUCost(
+          ImmutableList<Double> cardinalities,
+          ImmutableBitSet streaming) {
+    // Hash-join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      double cardinality = cardinalities.get(i);
+      if (!streaming.get(i)) {
+        // Hash table construction cost for the non-streaming side
+        cpuCost += cardinality;
+      }
+      cpuCost += cardinality * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeMapJoinIOCost(
+          ImmutableList<Pair<Double,Double>> relationInfos,
+          ImmutableBitSet streaming, int parallelism) {
+    // Hash-join
+    double ioCost = 0.0;
+    for (int i=0; i<relationInfos.size(); i++) {
+      double cardinality = relationInfos.get(i).left;
+      double averageTupleSize = relationInfos.get(i).right;
+      if (!streaming.get(i)) {
+        // Non-streaming side is broadcast to each parallel computation
+        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+      }
+    }
+    return ioCost;
+  }
+
+  public static double computeBucketMapJoinCPUCost(
+          ImmutableList<Double> cardinalities,
+          ImmutableBitSet streaming) {
+    // Hash-join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      double cardinality = cardinalities.get(i);
+      if (!streaming.get(i)) {
+        // Hash table construction cost for the non-streaming side
+        cpuCost += cardinality;
+      }
+      cpuCost += cardinality * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeBucketMapJoinIOCost(
+          ImmutableList<Pair<Double,Double>> relationInfos,
+          ImmutableBitSet streaming, int parallelism) {
+    // Hash-join
+    double ioCost = 0.0;
+    for (int i=0; i<relationInfos.size(); i++) {
+      double cardinality = relationInfos.get(i).left;
+      double averageTupleSize = relationInfos.get(i).right;
+      if (!streaming.get(i)) {
+        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+      }
+    }
+    return ioCost;
+  }
+
+  public static double computeSMBMapJoinCPUCost(
+          ImmutableList<Double> cardinalities) {
+    // Hash-join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      cpuCost += cardinalities.get(i) * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeSMBMapJoinIOCost(
+          ImmutableList<Pair<Double,Double>> relationInfos,
+          ImmutableBitSet streaming, int parallelism) {
+    // Hash-join
+    double ioCost = 0.0;
+    for (int i=0; i<relationInfos.size(); i++) {
+      double cardinality = relationInfos.get(i).left;
+      double averageTupleSize = relationInfos.get(i).right;
+      if (!streaming.get(i)) {
+        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+      }
+    }
+    return ioCost;
+  }
+
+  public static boolean isFittingIntoMemory(Double maxSize, RelNode input, int buckets) {
+    Double currentMemory = RelMetadataQuery.cumulativeMemoryWithinPhase(input);
+    if (currentMemory != null) {
+      if(currentMemory / buckets > maxSize) {
+        return false;
+      }
+      return true;
+    }
+    return false;
+  }
+
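For reference, the constants above express relative weights rather than absolute times: CPU work is the unit, a network transfer is 150x CPU, local disk I/O is 4x network, and HDFS writes/reads are 10x/1.5x their local counterparts. A standalone sketch of how the sort-merge CPU estimate behaves under these weights (illustrative only, with a hypothetical driver; not part of this patch):

    // Mirrors the weights and the n*log(n)-sort-plus-linear-merge shape of
    // HiveAlgorithmsUtil.computeSortMergeCPUCost above.
    public class SortMergeCostSketch {
      static final double CPU_COST = 1.0;

      static double sortMergeCpu(double[] cardinalities, boolean[] sorted) {
        double cpuCost = 0.0;
        for (int i = 0; i < cardinalities.length; i++) {
          if (!sorted[i]) {
            // Input i still needs to be sorted: n * log(n)
            cpuCost += cardinalities[i] * Math.log(cardinalities[i]) * CPU_COST;
          }
          // Every input is scanned once during the merge
          cpuCost += cardinalities[i] * CPU_COST;
        }
        return cpuCost;
      }

      public static void main(String[] args) {
        // Two inputs, 1M and 10K rows, neither pre-sorted.
        System.out.println(sortMergeCpu(new double[] { 1e6, 1e4 },
            new boolean[] { false, false }));
      }
    }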
+  public static ImmutableList<RelCollation> getJoinCollation(JoinPredicateInfo joinPredInfo,
+          MapJoinStreamingRelation streamingRelation) {
+    // Compute collations
+    ImmutableList.Builder<RelFieldCollation> collationListBuilder =
+            new ImmutableList.Builder<RelFieldCollation>();
+    ImmutableList.Builder<RelFieldCollation> leftCollationListBuilder =
+            new ImmutableList.Builder<RelFieldCollation>();
+    ImmutableList.Builder<RelFieldCollation> rightCollationListBuilder =
+            new ImmutableList.Builder<RelFieldCollation>();
+    for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
+      JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
+              getEquiJoinPredicateElements().get(i);
+      for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) {
+        final RelFieldCollation leftFieldCollation = new RelFieldCollation(leftPos);
+        collationListBuilder.add(leftFieldCollation);
+        leftCollationListBuilder.add(leftFieldCollation);
+      }
+      for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) {
+        final RelFieldCollation rightFieldCollation = new RelFieldCollation(rightPos);
+        collationListBuilder.add(rightFieldCollation);
+        rightCollationListBuilder.add(rightFieldCollation);
+      }
+    }
+
+    // Return join collations
+    final ImmutableList<RelCollation> collation;
+    switch (streamingRelation) {
+    case LEFT_RELATION:
+      collation = ImmutableList.of(
+              RelCollationTraitDef.INSTANCE.canonize(
+                      new HiveRelCollation(leftCollationListBuilder.build())));
+      break;
+    case RIGHT_RELATION:
+      collation = ImmutableList.of(
+              RelCollationTraitDef.INSTANCE.canonize(
+                      new HiveRelCollation(rightCollationListBuilder.build())));
+      break;
+    default:
+      collation = ImmutableList.of(
+              RelCollationTraitDef.INSTANCE.canonize(
+                      new HiveRelCollation(collationListBuilder.build())));
+      break;
+    }
+    return collation;
+  }
+
+  public static RelDistribution getJoinRedistribution(JoinPredicateInfo joinPredInfo) {
+    // Compute distribution
+    ImmutableList.Builder<Integer> keysListBuilder =
+            new ImmutableList.Builder<Integer>();
+    for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
+      JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
+              getEquiJoinPredicateElements().get(i);
+      for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) {
+        keysListBuilder.add(leftPos);
+      }
+      for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) {
+        keysListBuilder.add(rightPos);
+      }
+    }
+    return new HiveRelDistribution(
+            RelDistribution.Type.HASH_DISTRIBUTED, keysListBuilder.build());
+  }
+
+  public static RelDistribution getJoinDistribution(JoinPredicateInfo joinPredInfo,
+          MapJoinStreamingRelation streamingRelation) {
+    // Compute distribution
+    ImmutableList.Builder<Integer> leftKeysListBuilder =
+            new ImmutableList.Builder<Integer>();
+    ImmutableList.Builder<Integer> rightKeysListBuilder =
+            new ImmutableList.Builder<Integer>();
+    for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
+      JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
+ getEquiJoinPredicateElements().get(i); + for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) { + leftKeysListBuilder.add(leftPos); + } + for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) { + rightKeysListBuilder.add(rightPos); + } + } + + RelDistribution distribution = null; + // Keep buckets from the streaming relation + if (streamingRelation == MapJoinStreamingRelation.LEFT_RELATION) { + distribution = new HiveRelDistribution( + RelDistribution.Type.HASH_DISTRIBUTED, leftKeysListBuilder.build()); + } else if (streamingRelation == MapJoinStreamingRelation.RIGHT_RELATION) { + distribution = new HiveRelDistribution( + RelDistribution.Type.HASH_DISTRIBUTED, rightKeysListBuilder.build()); + } + + return distribution; + } + + public static Double getJoinMemory(HiveJoin join) { + return getJoinMemory(join, join.getStreamingSide()); + } + + public static Double getJoinMemory(HiveJoin join, MapJoinStreamingRelation streamingSide) { + Double memory = 0.0; + if (streamingSide == MapJoinStreamingRelation.NONE || + streamingSide == MapJoinStreamingRelation.RIGHT_RELATION) { + // Left side + final Double leftAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getLeft()); + final Double leftRowCount = RelMetadataQuery.getRowCount(join.getLeft()); + if (leftAvgRowSize == null || leftRowCount == null) { + return null; + } + memory += leftAvgRowSize * leftRowCount; + } + if (streamingSide == MapJoinStreamingRelation.NONE || + streamingSide == MapJoinStreamingRelation.LEFT_RELATION) { + // Right side + final Double rightAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getRight()); + final Double rightRowCount = RelMetadataQuery.getRowCount(join.getRight()); + if (rightAvgRowSize == null || rightRowCount == null) { + return null; + } + memory += rightAvgRowSize * rightRowCount; + } + return memory; + } + + public static Integer getSplitCountWithRepartition(HiveJoin join) { + final Double maxSplitSize = join.getCluster().getPlanner().getContext(). 
+ unwrap(HiveAlgorithmsConf.class).getMaxSplitSize(); + // We repartition: new number of splits + final Double averageRowSize = RelMetadataQuery.getAverageRowSize(join); + final Double rowCount = RelMetadataQuery.getRowCount(join); + if (averageRowSize == null || rowCount == null) { + return null; + } + final Double totalSize = averageRowSize * rowCount; + final Double splitCount = totalSize / maxSplitSize; + return splitCount.intValue(); + } + + public static Integer getSplitCountWithoutRepartition(HiveJoin join) { + RelNode largeInput; + if (join.getStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) { + largeInput = join.getLeft(); + } else if (join.getStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) { + largeInput = join.getRight(); + } else { + return null; + } + return RelMetadataQuery.splitCount(largeInput); + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java index 834922b..2f349ab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java @@ -17,15 +17,19 @@ */ package org.apache.hadoop.hive.ql.optimizer.calcite.cost; -import java.util.EnumSet; +import java.util.Set; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import com.google.common.collect.ImmutableList; + /** * Cost model interface. */ @@ -33,13 +37,11 @@ private static final Log LOG = LogFactory.getLog(HiveCostModel.class); - // NOTE: COMMON_JOIN & SMB_JOIN are Sort Merge Join (in case of COMMON_JOIN - // each parallel computation handles multiple splits where as in case of SMB - // each parallel computation handles one bucket). 
MAP_JOIN and BUCKET_JOIN is - // hash joins where MAP_JOIN keeps the whole data set of non streaming tables - // in memory where as BUCKET_JOIN keeps only the b - public enum JoinAlgorithm { - NONE, COMMON_JOIN, MAP_JOIN, BUCKET_JOIN, SMB_JOIN + private final Set joinAlgorithms; + + + public HiveCostModel(Set joinAlgorithms) { + this.joinAlgorithms = joinAlgorithms; } public abstract RelOptCost getDefaultCost(); @@ -47,17 +49,19 @@ public abstract RelOptCost getAggregateCost(HiveAggregate aggregate); public RelOptCost getJoinCost(HiveJoin join) { - // Retrieve algorithms - EnumSet possibleAlgorithms = getExecutableJoinAlgorithms(join); - // Select algorithm with min cost JoinAlgorithm joinAlgorithm = null; RelOptCost minJoinCost = null; + if (LOG.isDebugEnabled()) { LOG.debug("Join algorithm selection for:\n" + RelOptUtil.toString(join)); } - for (JoinAlgorithm possibleAlgorithm : possibleAlgorithms) { - RelOptCost joinCost = getJoinCost(join, possibleAlgorithm); + + for (JoinAlgorithm possibleAlgorithm : this.joinAlgorithms) { + if (!possibleAlgorithm.isExecutable(join)) { + continue; + } + RelOptCost joinCost = possibleAlgorithm.getCost(join); if (LOG.isDebugEnabled()) { LOG.debug(possibleAlgorithm + " cost: " + joinCost); } @@ -66,31 +70,30 @@ public RelOptCost getJoinCost(HiveJoin join) { minJoinCost = joinCost; } } - join.setJoinAlgorithm(joinAlgorithm); - join.setJoinCost(minJoinCost); + if (LOG.isDebugEnabled()) { LOG.debug(joinAlgorithm + " selected"); } + join.setJoinAlgorithm(joinAlgorithm); + join.setJoinCost(minJoinCost); + return minJoinCost; } /** - * Returns the possible algorithms for a given join operator. - * - * @param join the join operator - * @return a set containing all the possible join algorithms that can be - * executed for this join operator + * Interface for join algorithm. */ - abstract EnumSet getExecutableJoinAlgorithms(HiveJoin join); + public interface JoinAlgorithm { + public String getName(); + public boolean isExecutable(HiveJoin join); + public RelOptCost getCost(HiveJoin join); + public ImmutableList getCollation(HiveJoin join); + public RelDistribution getDistribution(HiveJoin join); + public Double getMemory(HiveJoin join); + public Double getCumulativeMemoryWithinPhaseSplit(HiveJoin join); + public Boolean isPhaseTransition(HiveJoin join); + public Integer getSplitCount(HiveJoin join); + } - /** - * Returns the cost for a given algorithm and execution engine. - * - * @param join the join operator - * @param algorithm the join algorithm - * @return the cost for the given algorithm, or null if the algorithm is not - * defined for this execution engine - */ - abstract RelOptCost getJoinCost(HiveJoin join, JoinAlgorithm algorithm); } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java deleted file mode 100644 index 47ecd47..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.optimizer.calcite.cost; - -import org.apache.calcite.plan.RelOptCost; -import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Pair; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; - -import com.google.common.collect.ImmutableList; - -// Use this once we have Join Algorithm selection -public class HiveCostUtil { - - private static final double CPU_COST = 1.0; - private static final double NET_COST = 150.0 * CPU_COST; - private static final double LOCAL_WRITE_COST = 4.0 * NET_COST; - private static final double LOCAL_READ_COST = 4.0 * NET_COST; - private static final double HDFS_WRITE_COST = 10.0 * LOCAL_WRITE_COST; - private static final double HDFS_READ_COST = 1.5 * LOCAL_READ_COST; - - public static RelOptCost computCardinalityBasedCost(HiveRelNode hr) { - return new HiveCost(hr.getRows(), 0, 0); - } - - public static HiveCost computeCost(HiveTableScan t) { - double cardinality = t.getRows(); - return new HiveCost(cardinality, 0, HDFS_WRITE_COST * cardinality * 0); - } - - public static double computeSortMergeCPUCost( - ImmutableList cardinalities, - ImmutableBitSet sorted) { - // Sort-merge join - double cpuCost = 0.0; - for (int i=0; i> relationInfos) { - // Sort-merge join - double ioCost = 0.0; - for (Pair relationInfo : relationInfos) { - ioCost += computeSortIOCost(relationInfo); - } - return ioCost; - } - - public static double computeSortIOCost(Pair relationInfo) { - // Sort-merge join - double ioCost = 0.0; - double cardinality = relationInfo.left; - double averageTupleSize = relationInfo.right; - // Write cost - ioCost += cardinality * averageTupleSize * LOCAL_WRITE_COST; - // Read cost - ioCost += cardinality * averageTupleSize * LOCAL_READ_COST; - // Net transfer cost - ioCost += cardinality * averageTupleSize * NET_COST; - return ioCost; - } - - public static double computeMapJoinCPUCost( - ImmutableList cardinalities, - ImmutableBitSet streaming) { - // Hash-join - double cpuCost = 0.0; - for (int i=0; i> relationInfos, - ImmutableBitSet streaming, int parallelism) { - // Hash-join - double ioCost = 0.0; - for (int i=0; i cardinalities, - ImmutableBitSet streaming) { - // Hash-join - double cpuCost = 0.0; - for (int i=0; i> relationInfos, - ImmutableBitSet streaming, int parallelism) { - // Hash-join - double ioCost = 0.0; - for (int i=0; i cardinalities) { - // Hash-join - double cpuCost = 0.0; - for (int i=0; i> relationInfos, - ImmutableBitSet streaming, int parallelism) { - // Hash-join - double ioCost = 0.0; - for (int i=0; i getExecutableJoinAlgorithms(HiveJoin join) { - return EnumSet.of(JoinAlgorithm.NONE); - } - @Override - protected RelOptCost getJoinCost(HiveJoin join, JoinAlgorithm algorithm) { - RelOptCost algorithmCost; - switch (algorithm) { - case NONE: - algorithmCost = computeJoinCardinalityCost(join); - break; - default: - algorithmCost = null; + /** + * Default join algorithm. Cost is based on cardinality. 
+ */ + public static class DefaultJoinAlgorithm implements JoinAlgorithm { + + public static final JoinAlgorithm INSTANCE = new DefaultJoinAlgorithm(); + private static final String ALGORITHM_NAME = "None"; + + + @Override + public String getName() { + return ALGORITHM_NAME; + } + + @Override + public boolean isExecutable(HiveJoin join) { + return true; } - return algorithmCost; - } - private static RelOptCost computeJoinCardinalityCost(HiveJoin join) { - double leftRCount = RelMetadataQuery.getRowCount(join.getLeft()); - double rightRCount = RelMetadataQuery.getRowCount(join.getRight()); - return HiveCost.FACTORY.makeCost(leftRCount + rightRCount, 0.0, 0.0); + @Override + public RelOptCost getCost(HiveJoin join) { + double leftRCount = RelMetadataQuery.getRowCount(join.getLeft()); + double rightRCount = RelMetadataQuery.getRowCount(join.getRight()); + return HiveCost.FACTORY.makeCost(leftRCount + rightRCount, 0.0, 0.0); + } + + @Override + public ImmutableList getCollation(HiveJoin join) { + return null; + } + + @Override + public RelDistribution getDistribution(HiveJoin join) { + return null; + } + + @Override + public Double getMemory(HiveJoin join) { + return null; + } + + @Override + public Double getCumulativeMemoryWithinPhaseSplit(HiveJoin join) { + return null; + } + + @Override + public Boolean isPhaseTransition(HiveJoin join) { + return false; + } + + @Override + public Integer getSplitCount(HiveJoin join) { + return null; + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java index 80014f8..3df8955 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java @@ -18,12 +18,10 @@ package org.apache.hadoop.hive.ql.optimizer.calcite.cost; import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelDistribution.Type; import org.apache.calcite.rel.RelNode; @@ -31,24 +29,28 @@ import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; import org.apache.calcite.util.Pair; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; /** * Cost model for Tez execution engine. */ public class HiveOnTezCostModel extends HiveCostModel { - private final Double maxMemory; + public static final HiveOnTezCostModel INSTANCE = + new HiveOnTezCostModel(); - - public HiveOnTezCostModel(Double maxMemory) { - this.maxMemory = maxMemory; + private HiveOnTezCostModel() { + super(Sets.newHashSet( + TezCommonJoinAlgorithm.INSTANCE, + TezMapJoinAlgorithm.INSTANCE, + TezBucketJoinAlgorithm.INSTANCE, + TezSMBJoinAlgorithm.INSTANCE)); } @Override @@ -67,7 +69,7 @@ public RelOptCost getAggregateCost(HiveAggregate aggregate) { return null; } // 2. 
CPU cost = sorting cost - final double cpuCost = HiveCostUtil.computeSortCPUCost(rCount); + final double cpuCost = HiveAlgorithmsUtil.computeSortCPUCost(rCount); // 3. IO cost = cost of writing intermediary results to local FS + // cost of reading from local FS for transferring to GBy + // cost of transferring map outputs to GBy operator @@ -75,35 +77,253 @@ public RelOptCost getAggregateCost(HiveAggregate aggregate) { if (rAverageSize == null) { return null; } - final double ioCost = HiveCostUtil.computeSortIOCost(new Pair(rCount,rAverageSize)); + final double ioCost = HiveAlgorithmsUtil.computeSortIOCost(new Pair(rCount,rAverageSize)); // 4. Result return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); } } - @Override - protected EnumSet getExecutableJoinAlgorithms(HiveJoin join) { - Set possibleAlgorithms = new HashSet(); - - // Check streaming side - RelNode smallInput; - if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) { - smallInput = join.getRight(); - } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) { - smallInput = join.getLeft(); - } else { - smallInput = null; + /** + * COMMON_JOIN is Sort Merge Join. Each parallel computation handles multiple + * splits. + */ + public static class TezCommonJoinAlgorithm implements JoinAlgorithm { + + public static final JoinAlgorithm INSTANCE = new TezCommonJoinAlgorithm(); + private static final String ALGORITHM_NAME = "CommonJoin"; + + + @Override + public String getName() { + return ALGORITHM_NAME; + } + + @Override + public boolean isExecutable(HiveJoin join) { + return true; + } + + @Override + public RelOptCost getCost(HiveJoin join) { + // 1. Sum of input cardinalities + final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft()); + final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight()); + if (leftRCount == null || rightRCount == null) { + return null; + } + final double rCount = leftRCount + rightRCount; + // 2. CPU cost = sorting cost (for each relation) + + // total merge cost + ImmutableList cardinalities = new ImmutableList.Builder(). + add(leftRCount). + add(rightRCount). + build(); + final double cpuCost = HiveAlgorithmsUtil.computeSortMergeCPUCost(cardinalities, join.getSortedInputs()); + // 3. IO cost = cost of writing intermediary results to local FS + + // cost of reading from local FS for transferring to join + + // cost of transferring map outputs to Join operator + final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft()); + final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight()); + if (leftRAverageSize == null || rightRAverageSize == null) { + return null; + } + ImmutableList> relationInfos = new ImmutableList.Builder>(). + add(new Pair(leftRCount,leftRAverageSize)). + add(new Pair(rightRCount,rightRAverageSize)). + build(); + final double ioCost = HiveAlgorithmsUtil.computeSortMergeIOCost(relationInfos); + // 4. Result + return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); } - if (smallInput != null) { - // Requirements: - // - For SMB, sorted by their keys on both sides and bucketed. - // - For Bucket, bucketed by their keys on both sides. / Fitting in memory - // - For Map, no additional requirement. 
/ Fitting in memory

+    @Override
+    public ImmutableList<RelCollation> getCollation(HiveJoin join) {
+      return HiveAlgorithmsUtil.getJoinCollation(join.getJoinPredicateInfo(),
+              MapJoinStreamingRelation.NONE);
+    }
+
+    @Override
+    public RelDistribution getDistribution(HiveJoin join) {
+      return HiveAlgorithmsUtil.getJoinRedistribution(join.getJoinPredicateInfo());
+    }
+
+    @Override
+    public Double getMemory(HiveJoin join) {
+      return HiveAlgorithmsUtil.getJoinMemory(join, MapJoinStreamingRelation.NONE);
+    }
+    @Override
+    public Double getCumulativeMemoryWithinPhaseSplit(HiveJoin join) {
+      final Double memoryWithinPhase =
+              RelMetadataQuery.cumulativeMemoryWithinPhase(join);
+      final Integer splitCount = RelMetadataQuery.splitCount(join);
+      if (memoryWithinPhase == null || splitCount == null) {
+        return null;
+      }
+      return memoryWithinPhase / splitCount;
+    }
+
+    @Override
+    public Boolean isPhaseTransition(HiveJoin join) {
+      return true;
+    }
+
+    @Override
+    public Integer getSplitCount(HiveJoin join) {
+      return HiveAlgorithmsUtil.getSplitCountWithRepartition(join);
+    }
+  }
+
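The TezCommonJoinAlgorithm above, and the algorithm classes that follow, plug into the selection loop this patch adds to HiveCostModel.getJoinCost: each registered algorithm is asked isExecutable(join), and the cheapest executable one wins. A minimal standalone sketch of that pattern, with hypothetical Algo/cost stand-ins for JoinAlgorithm and RelOptCost (not part of the patch):

    import java.util.List;

    interface Algo {
      boolean isExecutable(Object join);
      double cost(Object join);
    }

    public class CheapestAlgoSketch {
      // Same shape as HiveCostModel.getJoinCost: skip algorithms that cannot
      // run for this join, keep the one with the lowest cost.
      static Algo pick(List<Algo> algorithms, Object join) {
        Algo best = null;
        double bestCost = Double.POSITIVE_INFINITY;
        for (Algo algorithm : algorithms) {
          if (!algorithm.isExecutable(join)) {
            continue;
          }
          double cost = algorithm.cost(join);
          if (cost < bestCost) {
            best = algorithm;
            bestCost = cost;
          }
        }
        return best;
      }
    }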
+  /**
+   * MAP_JOIN is a hash join that keeps the whole data set of non streaming tables
+   * in memory.
+   */
+  public static class TezMapJoinAlgorithm implements JoinAlgorithm {
+
+    public static final JoinAlgorithm INSTANCE = new TezMapJoinAlgorithm();
+    private static final String ALGORITHM_NAME = "MapJoin";
+
+
+    @Override
+    public String getName() {
+      return ALGORITHM_NAME;
+    }
+
+    @Override
+    public boolean isExecutable(HiveJoin join) {
+      final Double maxMemory = join.getCluster().getPlanner().getContext().
+              unwrap(HiveAlgorithmsConf.class).getMaxMemory();
+      // Check streaming side
+      RelNode smallInput = join.getStreamingInput();
+      if (smallInput == null) {
+        return false;
+      }
+      return HiveAlgorithmsUtil.isFittingIntoMemory(maxMemory, smallInput, 1);
+    }
+
+    @Override
+    public RelOptCost getCost(HiveJoin join) {
+      // 1. Sum of input cardinalities
+      final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
+      final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
+      if (leftRCount == null || rightRCount == null) {
+        return null;
+      }
+      final double rCount = leftRCount + rightRCount;
+      // 2. CPU cost = HashTable construction cost +
+      //              join cost
+      ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+              add(leftRCount).
+              add(rightRCount).
+              build();
+      ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+      switch (join.getStreamingSide()) {
+      case LEFT_RELATION:
+        streamingBuilder.set(0);
+        break;
+      case RIGHT_RELATION:
+        streamingBuilder.set(1);
+        break;
+      default:
+        return null;
+      }
+      ImmutableBitSet streaming = streamingBuilder.build();
+      final double cpuCost = HiveAlgorithmsUtil.computeMapJoinCPUCost(cardinalities, streaming);
+      // 3. IO cost = cost of transferring small tables to join node *
+      //              degree of parallelism
+      final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+      final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+      if (leftRAverageSize == null || rightRAverageSize == null) {
+        return null;
+      }
+      ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
+              add(new Pair<Double,Double>(leftRCount,leftRAverageSize)).
+              add(new Pair<Double,Double>(rightRCount,rightRAverageSize)).
+              build();
+      final int parallelism = RelMetadataQuery.splitCount(join) == null
+              ? 1 : RelMetadataQuery.splitCount(join);
+      final double ioCost = HiveAlgorithmsUtil.computeMapJoinIOCost(relationInfos, streaming, parallelism);
+      // 4. Result
+      return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+    }
+
+    @Override
+    public ImmutableList<RelCollation> getCollation(HiveJoin join) {
+      if (join.getStreamingSide() != MapJoinStreamingRelation.LEFT_RELATION
+              && join.getStreamingSide() != MapJoinStreamingRelation.RIGHT_RELATION) {
+        return null;
+      }
+      return HiveAlgorithmsUtil.getJoinCollation(join.getJoinPredicateInfo(),
+              join.getStreamingSide());
+    }
+
+    @Override
+    public RelDistribution getDistribution(HiveJoin join) {
+      if (join.getStreamingSide() != MapJoinStreamingRelation.LEFT_RELATION
+              && join.getStreamingSide() != MapJoinStreamingRelation.RIGHT_RELATION) {
+        return null;
+      }
+      return HiveAlgorithmsUtil.getJoinDistribution(join.getJoinPredicateInfo(),
+              join.getStreamingSide());
+    }
+
+    @Override
+    public Double getMemory(HiveJoin join) {
+      return HiveAlgorithmsUtil.getJoinMemory(join);
+    }
+
+    @Override
+    public Double getCumulativeMemoryWithinPhaseSplit(HiveJoin join) {
+      // Check streaming side
+      RelNode inMemoryInput;
+      if (join.getStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+        inMemoryInput = join.getRight();
+      } else if (join.getStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+        inMemoryInput = join.getLeft();
+      } else {
+        return null;
+      }
+      // If simple map join, the whole relation goes in memory
+      return RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput);
+    }
+
+    @Override
+    public Boolean isPhaseTransition(HiveJoin join) {
+      return false;
+    }
+
+    @Override
+    public Integer getSplitCount(HiveJoin join) {
+      return HiveAlgorithmsUtil.getSplitCountWithoutRepartition(join);
+    }
+  }
+
+  /**
+   * BUCKET_JOIN is a hash join where one bucket of the non streaming tables
+   * is kept in memory at a time.
+   */
+  public static class TezBucketJoinAlgorithm implements JoinAlgorithm {
+
+    public static final JoinAlgorithm INSTANCE = new TezBucketJoinAlgorithm();
+    private static final String ALGORITHM_NAME = "BucketJoin";
+
+
+    @Override
+    public String getName() {
+      return ALGORITHM_NAME;
+    }
+
+    @Override
+    public boolean isExecutable(HiveJoin join) {
+      final Double maxMemory = join.getCluster().getPlanner().getContext().
+              unwrap(HiveAlgorithmsConf.class).getMaxMemory();
+      // Check streaming side
+      RelNode smallInput = join.getStreamingInput();
+      if (smallInput == null) {
+        return false;
+      }
       // Get key columns
-      JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.
-          constructJoinPredicateInfo(join);
+      JoinPredicateInfo joinPredInfo = join.getJoinPredicateInfo();
       List<ImmutableIntList> joinKeysInChildren = new ArrayList<ImmutableIntList>();
       joinKeysInChildren.add(
           ImmutableIntList.copyOf(
@@ -112,256 +332,255 @@ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
           ImmutableIntList.copyOf(
               joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema()));
+      // Requirements: for Bucket, bucketed by their keys on both sides and fitting in memory
       // Obtain number of buckets
       Integer buckets = RelMetadataQuery.splitCount(smallInput);
-      // Obtain map algorithms for which smallest input fits in memory
-      boolean bucketFitsMemory = false;
-      boolean inputFitsMemory = false;
-      if (buckets != null) {
-        bucketFitsMemory = isFittingIntoMemory(maxMemory, smallInput, buckets);
+      if (buckets == null) {
+        return false;
+      }
+      if (!HiveAlgorithmsUtil.isFittingIntoMemory(maxMemory, smallInput, buckets)) {
+        return false;
+      }
-      inputFitsMemory = bucketFitsMemory ?
- isFittingIntoMemory(maxMemory, smallInput, 1) : false; - boolean orderedBucketed = true; - boolean bucketed = true; for (int i=0; i cardinalities = new ImmutableList.Builder(). + add(leftRCount). + add(rightRCount). + build(); + ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder(); + switch (join.getStreamingSide()) { + case LEFT_RELATION: + streamingBuilder.set(0); + break; + case RIGHT_RELATION: + streamingBuilder.set(1); + break; + default: + return null; } - if (inputFitsMemory) { - possibleAlgorithms.add(JoinAlgorithm.MAP_JOIN); + ImmutableBitSet streaming = streamingBuilder.build(); + final double cpuCost = HiveAlgorithmsUtil.computeBucketMapJoinCPUCost(cardinalities, streaming); + // 3. IO cost = cost of transferring small tables to join node * + // degree of parallelism + final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft()); + final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight()); + if (leftRAverageSize == null || rightRAverageSize == null) { + return null; } + ImmutableList> relationInfos = new ImmutableList.Builder>(). + add(new Pair(leftRCount,leftRAverageSize)). + add(new Pair(rightRCount,rightRAverageSize)). + build(); + final int parallelism = RelMetadataQuery.splitCount(join) == null + ? 1 : RelMetadataQuery.splitCount(join); + final double ioCost = HiveAlgorithmsUtil.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism); + // 4. Result + return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); } - // A reduce side (common) join does not have special - // requirements. - possibleAlgorithms.add(JoinAlgorithm.COMMON_JOIN); - return EnumSet.copyOf(possibleAlgorithms); - } - private static boolean isFittingIntoMemory(Double maxSize, RelNode input, int buckets) { - Double currentMemory = RelMetadataQuery.cumulativeMemoryWithinPhase(input); - if (currentMemory != null) { - if(currentMemory / buckets > maxSize) { - return false; + @Override + public ImmutableList getCollation(HiveJoin join) { + if (join.getStreamingSide() != MapJoinStreamingRelation.LEFT_RELATION + || join.getStreamingSide() != MapJoinStreamingRelation.RIGHT_RELATION) { + return null; } - return true; + return HiveAlgorithmsUtil.getJoinCollation(join.getJoinPredicateInfo(), + join.getStreamingSide()); } - return false; - } - @Override - protected RelOptCost getJoinCost(HiveJoin join, JoinAlgorithm algorithm) { - RelOptCost algorithmCost; - switch (algorithm) { - case COMMON_JOIN: - algorithmCost = computeCostCommonJoin(join); - break; - case MAP_JOIN: - algorithmCost = computeCostMapJoin(join); - break; - case BUCKET_JOIN: - algorithmCost = computeCostBucketJoin(join); - break; - case SMB_JOIN: - algorithmCost = computeCostSMBJoin(join); - break; - default: - algorithmCost = null; - } - return algorithmCost; - } + @Override + public RelDistribution getDistribution(HiveJoin join) { + return HiveAlgorithmsUtil.getJoinRedistribution(join.getJoinPredicateInfo()); + } - private static RelOptCost computeCostCommonJoin(HiveJoin join) { - // 1. Sum of input cardinalities - final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft()); - final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight()); - if (leftRCount == null || rightRCount == null) { - return null; - } - final double rCount = leftRCount + rightRCount; - // 2. CPU cost = sorting cost (for each relation) + - // total merge cost - ImmutableList cardinalities = new ImmutableList.Builder(). - add(leftRCount). - add(rightRCount). 
- build(); - final double cpuCost = HiveCostUtil.computeSortMergeCPUCost(cardinalities, join.getSortedInputs()); - // 3. IO cost = cost of writing intermediary results to local FS + - // cost of reading from local FS for transferring to join + - // cost of transferring map outputs to Join operator - final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft()); - final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight()); - if (leftRAverageSize == null || rightRAverageSize == null) { - return null; - } - ImmutableList> relationInfos = new ImmutableList.Builder>(). - add(new Pair(leftRCount,leftRAverageSize)). - add(new Pair(rightRCount,rightRAverageSize)). - build(); - final double ioCost = HiveCostUtil.computeSortMergeIOCost(relationInfos); - // 4. Result - return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); - } + @Override + public Double getMemory(HiveJoin join) { + return HiveAlgorithmsUtil.getJoinMemory(join); + } - private static RelOptCost computeCostMapJoin(HiveJoin join) { - // 1. Sum of input cardinalities - final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft()); - final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight()); - if (leftRCount == null || rightRCount == null) { - return null; - } - final double rCount = leftRCount + rightRCount; - // 2. CPU cost = HashTable construction cost + - // join cost - ImmutableList cardinalities = new ImmutableList.Builder(). - add(leftRCount). - add(rightRCount). - build(); - ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder(); - switch (join.getMapJoinStreamingSide()) { - case LEFT_RELATION: - streamingBuilder.set(0); - break; - case RIGHT_RELATION: - streamingBuilder.set(1); - break; - default: + @Override + public Double getCumulativeMemoryWithinPhaseSplit(HiveJoin join) { + // Check streaming side + RelNode inMemoryInput; + if (join.getStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) { + inMemoryInput = join.getRight(); + } else if (join.getStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) { + inMemoryInput = join.getLeft(); + } else { return null; + } + // If bucket map join, only a split goes in memory + final Double memoryInput = + RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput); + final Integer splitCount = RelMetadataQuery.splitCount(inMemoryInput); + if (memoryInput == null || splitCount == null) { + return null; + } + return memoryInput / splitCount; + } + + @Override + public Boolean isPhaseTransition(HiveJoin join) { + return false; + } + + @Override + public Integer getSplitCount(HiveJoin join) { + return HiveAlgorithmsUtil.getSplitCountWithoutRepartition(join); } - ImmutableBitSet streaming = streamingBuilder.build(); - final double cpuCost = HiveCostUtil.computeMapJoinCPUCost(cardinalities, streaming); - // 3. IO cost = cost of transferring small tables to join node * - // degree of parallelism - final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft()); - final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight()); - if (leftRAverageSize == null || rightRAverageSize == null) { - return null; - } - ImmutableList> relationInfos = new ImmutableList.Builder>(). - add(new Pair(leftRCount,leftRAverageSize)). - add(new Pair(rightRCount,rightRAverageSize)). - build(); - final int parallelism = RelMetadataQuery.splitCount(join) == null - ? 
1 : RelMetadataQuery.splitCount(join); - final double ioCost = HiveCostUtil.computeMapJoinIOCost(relationInfos, streaming, parallelism); - // 4. Result - return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); } - private static RelOptCost computeCostBucketJoin(HiveJoin join) { - // 1. Sum of input cardinalities - final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft()); - final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight()); - if (leftRCount == null || rightRCount == null) { - return null; - } - final double rCount = leftRCount + rightRCount; - // 2. CPU cost = HashTable construction cost + - // join cost - ImmutableList cardinalities = new ImmutableList.Builder(). - add(leftRCount). - add(rightRCount). - build(); - ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder(); - switch (join.getMapJoinStreamingSide()) { - case LEFT_RELATION: - streamingBuilder.set(0); - break; - case RIGHT_RELATION: - streamingBuilder.set(1); - break; - default: + /** + * SMB_JOIN is a Sort Merge Join. Each parallel computation handles one bucket. + */ + public static class TezSMBJoinAlgorithm implements JoinAlgorithm { + + public static final JoinAlgorithm INSTANCE = new TezSMBJoinAlgorithm(); + private static final String ALGORITHM_NAME = "SMBJoin"; + + + @Override + public String getName() { + return ALGORITHM_NAME; + } + + @Override + public boolean isExecutable(HiveJoin join) { + // Requirements: for SMB, sorted by their keys on both sides and bucketed. + // Get key columns + JoinPredicateInfo joinPredInfo = join.getJoinPredicateInfo(); + List joinKeysInChildren = new ArrayList(); + joinKeysInChildren.add( + ImmutableIntList.copyOf( + joinPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema())); + joinKeysInChildren.add( + ImmutableIntList.copyOf( + joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema())); + + for (int i=0; i cardinalities = new ImmutableList.Builder(). + add(leftRCount). + add(rightRCount). + build(); + ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder(); + switch (join.getStreamingSide()) { + case LEFT_RELATION: + streamingBuilder.set(0); + break; + case RIGHT_RELATION: + streamingBuilder.set(1); + break; + default: + return null; + } + ImmutableBitSet streaming = streamingBuilder.build(); + final double cpuCost = HiveAlgorithmsUtil.computeSMBMapJoinCPUCost(cardinalities); + // 3. IO cost = cost of transferring small tables to join node * + // degree of parallelism + final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft()); + final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight()); + if (leftRAverageSize == null || rightRAverageSize == null) { + return null; + } + ImmutableList> relationInfos = new ImmutableList.Builder>(). + add(new Pair(leftRCount,leftRAverageSize)). + add(new Pair(rightRCount,rightRAverageSize)). + build(); + final int parallelism = RelMetadataQuery.splitCount(join) == null + ? 1 : RelMetadataQuery.splitCount(join); + final double ioCost = HiveAlgorithmsUtil.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism); + // 4. Result + return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); + } + + @Override + public ImmutableList getCollation(HiveJoin join) { + return HiveAlgorithmsUtil.getJoinCollation(join.getJoinPredicateInfo(), + MapJoinStreamingRelation.NONE); } - ImmutableBitSet streaming = streamingBuilder.build(); - final double cpuCost = HiveCostUtil.computeBucketMapJoinCPUCost(cardinalities, streaming); - // 3. 
IO cost = cost of transferring small tables to join node * - // degree of parallelism - final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft()); - final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight()); - if (leftRAverageSize == null || rightRAverageSize == null) { - return null; - } - ImmutableList> relationInfos = new ImmutableList.Builder>(). - add(new Pair(leftRCount,leftRAverageSize)). - add(new Pair(rightRCount,rightRAverageSize)). - build(); - final int parallelism = RelMetadataQuery.splitCount(join) == null - ? 1 : RelMetadataQuery.splitCount(join); - final double ioCost = HiveCostUtil.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism); - // 4. Result - return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); - } - private static RelOptCost computeCostSMBJoin(HiveJoin join) { - // 1. Sum of input cardinalities - final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft()); - final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight()); - if (leftRCount == null || rightRCount == null) { - return null; - } - final double rCount = leftRCount + rightRCount; - // 2. CPU cost = HashTable construction cost + - // join cost - ImmutableList cardinalities = new ImmutableList.Builder(). - add(leftRCount). - add(rightRCount). - build(); - ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder(); - switch (join.getMapJoinStreamingSide()) { - case LEFT_RELATION: - streamingBuilder.set(0); - break; - case RIGHT_RELATION: - streamingBuilder.set(1); - break; - default: + @Override + public RelDistribution getDistribution(HiveJoin join) { + return HiveAlgorithmsUtil.getJoinRedistribution(join.getJoinPredicateInfo()); + } + + @Override + public Double getMemory(HiveJoin join) { + return 0.0; + } + + @Override + public Double getCumulativeMemoryWithinPhaseSplit(HiveJoin join) { + final Double memoryWithinPhase = + RelMetadataQuery.cumulativeMemoryWithinPhase(join); + final Integer splitCount = RelMetadataQuery.splitCount(join); + if (memoryWithinPhase == null || splitCount == null) { return null; + } + return memoryWithinPhase / splitCount; + } + + @Override + public Boolean isPhaseTransition(HiveJoin join) { + return false; + } + + @Override + public Integer getSplitCount(HiveJoin join) { + return HiveAlgorithmsUtil.getSplitCountWithoutRepartition(join); } - ImmutableBitSet streaming = streamingBuilder.build(); - final double cpuCost = HiveCostUtil.computeSMBMapJoinCPUCost(cardinalities); - // 3. IO cost = cost of transferring small tables to join node * - // degree of parallelism - final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft()); - final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight()); - if (leftRAverageSize == null || rightRAverageSize == null) { - return null; - } - ImmutableList> relationInfos = new ImmutableList.Builder>(). - add(new Pair(leftRCount,leftRAverageSize)). - add(new Pair(rightRCount,rightRAverageSize)). - build(); - final int parallelism = RelMetadataQuery.splitCount(join) == null - ? 1 : RelMetadataQuery.splitCount(join); - final double ioCost = HiveCostUtil.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism); - // 4. 
Result - return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java index b937fde..f28cbc4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java @@ -27,7 +27,9 @@ import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.InvalidRelException; +import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Join; @@ -43,17 +45,21 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm; +import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveDefaultCostModel.DefaultJoinAlgorithm; + +import com.google.common.collect.ImmutableList; //TODO: Should we convert MultiJoin to be a child of HiveJoin public class HiveJoin extends Join implements HiveRelNode { + + public static final JoinFactory HIVE_JOIN_FACTORY = new HiveJoinFactoryImpl(); public enum MapJoinStreamingRelation { NONE, LEFT_RELATION, RIGHT_RELATION } - public static final JoinFactory HIVE_JOIN_FACTORY = new HiveJoinFactoryImpl(); - private final boolean leftSemiJoin; + private final JoinPredicateInfo joinPredInfo; private JoinAlgorithm joinAlgorithm; private RelOptCost joinCost; @@ -63,7 +69,7 @@ public static HiveJoin getJoin(RelOptCluster cluster, RelNode left, RelNode righ try { Set variablesStopped = Collections.emptySet(); HiveJoin join = new HiveJoin(cluster, null, left, right, condition, joinType, variablesStopped, - JoinAlgorithm.NONE, leftSemiJoin); + DefaultJoinAlgorithm.INSTANCE, leftSemiJoin); return join; } catch (InvalidRelException e) { throw new RuntimeException(e); @@ -75,6 +81,7 @@ protected HiveJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelN JoinAlgorithm joinAlgo, boolean leftSemiJoin) throws InvalidRelException { super(cluster, TraitsUtil.getDefaultTraitSet(cluster), left, right, condition, joinType, variablesStopped); + this.joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(this); this.joinAlgorithm = joinAlgo; this.leftSemiJoin = leftSemiJoin; } @@ -97,15 +104,43 @@ public final HiveJoin copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode } } - public JoinAlgorithm getJoinAlgorithm() { - return joinAlgorithm; + public JoinPredicateInfo getJoinPredicateInfo() { + return joinPredInfo; } public void setJoinAlgorithm(JoinAlgorithm joinAlgorithm) { this.joinAlgorithm = joinAlgorithm; } - public MapJoinStreamingRelation getMapJoinStreamingSide() { + public String getJoinAlgorithmName() { + return joinAlgorithm.getName(); + } + + public ImmutableList getCollation() { + return joinAlgorithm.getCollation(this); + } + + public RelDistribution getDistribution() { + return joinAlgorithm.getDistribution(this); + } + + public Double getMemory() { + return joinAlgorithm.getMemory(this); + } + + public Double getCumulativeMemoryWithinPhaseSplit() { + return joinAlgorithm.getCumulativeMemoryWithinPhaseSplit(this); + } + + public Boolean 
isPhaseTransition() { + return joinAlgorithm.isPhaseTransition(this); + } + + public Integer getSplitCount() { + return joinAlgorithm.getSplitCount(this); + } + + public MapJoinStreamingRelation getStreamingSide() { Double leftInputSize = RelMetadataQuery.memory(left); Double rightInputSize = RelMetadataQuery.memory(right); if (leftInputSize == null && rightInputSize == null) { @@ -122,8 +157,17 @@ public MapJoinStreamingRelation getMapJoinStreamingSide() { return MapJoinStreamingRelation.NONE; } - public void setJoinCost(RelOptCost joinCost) { - this.joinCost = joinCost; + public RelNode getStreamingInput() { + MapJoinStreamingRelation mapJoinStreamingSide = getStreamingSide(); + RelNode smallInput; + if (mapJoinStreamingSide == MapJoinStreamingRelation.LEFT_RELATION) { + smallInput = this.getRight(); + } else if (mapJoinStreamingSide == MapJoinStreamingRelation.RIGHT_RELATION) { + smallInput = this.getLeft(); + } else { + smallInput = null; + } + return smallInput; } public ImmutableBitSet getSortedInputs() { @@ -149,6 +193,10 @@ public ImmutableBitSet getSortedInputs() { return sortedInputsBuilder.build(); } + public void setJoinCost(RelOptCost joinCost) { + this.joinCost = joinCost; + } + public boolean isLeftSemiJoin() { return leftSemiJoin; } @@ -164,7 +212,8 @@ public RelOptCost computeSelfCost(RelOptPlanner planner) { @Override public RelWriter explainTerms(RelWriter pw) { return super.explainTerms(pw) - .item("joinAlgorithm", joinAlgorithm.name().toLowerCase()) + .item("joinAlgorithm", joinAlgorithm == null ? + "None" : joinAlgorithm.getName()) .item("cost", joinCost); } @@ -206,4 +255,5 @@ public RelNode createJoin(RelNode left, RelNode right, RexNode condition, JoinRe return getJoin(left.getCluster(), left, right, condition, joinType, false); } } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java index addb3a4..84fa518 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java @@ -25,13 +25,9 @@ import org.apache.calcite.rel.metadata.RelMdCollation; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.util.BuiltInMethod; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelCollation; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; -import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation; import com.google.common.collect.ImmutableList; @@ -65,58 +61,7 @@ private HiveRelMdCollation() {} } public ImmutableList collations(HiveJoin join) { - // Compute collations - ImmutableList.Builder collationListBuilder = - new ImmutableList.Builder(); - ImmutableList.Builder leftCollationListBuilder = - new ImmutableList.Builder(); - ImmutableList.Builder rightCollationListBuilder = - new ImmutableList.Builder(); - JoinPredicateInfo joinPredInfo = - HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join); - for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) { - 
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java
index a195397..b83f240 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java
@@ -23,13 +23,9 @@
 import org.apache.calcite.rel.metadata.RelMdDistribution;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.util.BuiltInMethod;

-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
-import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;

 import com.google.common.collect.ImmutableList;
@@ -54,52 +50,7 @@ public RelDistribution distribution(HiveAggregate aggregate) {
   }

   public RelDistribution distribution(HiveJoin join) {
-    // Compute distribution
-    ImmutableList.Builder<Integer> keysListBuilder =
-        new ImmutableList.Builder<Integer>();
-    ImmutableList.Builder<Integer> leftKeysListBuilder =
-        new ImmutableList.Builder<Integer>();
-    ImmutableList.Builder<Integer> rightKeysListBuilder =
-        new ImmutableList.Builder<Integer>();
-    JoinPredicateInfo joinPredInfo =
-        HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join);
-    for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
-      JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
-          getEquiJoinPredicateElements().get(i);
-      for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) {
-        keysListBuilder.add(leftPos);
-        leftKeysListBuilder.add(leftPos);
-      }
-      for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) {
-        keysListBuilder.add(rightPos);
-        rightKeysListBuilder.add(rightPos);
-      }
-    }
-
-    RelDistribution distribution;
-    switch (join.getJoinAlgorithm()) {
-      case SMB_JOIN:
-      case BUCKET_JOIN:
-      case COMMON_JOIN:
-        distribution = new HiveRelDistribution(
-            RelDistribution.Type.HASH_DISTRIBUTED, keysListBuilder.build());
-        break;
-      case MAP_JOIN:
-        // Keep buckets from the streaming relation
-        if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
-          distribution = new HiveRelDistribution(
-              RelDistribution.Type.HASH_DISTRIBUTED, leftKeysListBuilder.build());
-        } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
-          distribution = new HiveRelDistribution(
-              RelDistribution.Type.HASH_DISTRIBUTED, rightKeysListBuilder.build());
-        } else {
-          distribution = null;
-        }
-        break;
-      default:
-        distribution = null;
-    }
-    return distribution;
+    return join.getDistribution();
   }

 }
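Same refactoring pattern as the collation handler: the hash-distribution computation moves behind JoinAlgorithm.getDistribution(join). The common case, where every equi-join key hash-distributes the output, would read roughly as follows; the class name is illustrative, and the map-join variant would follow the streaming side just as the deleted switch did.

// Sketch: hash distribution over all equi-join key positions, restated
// against the cached predicate info on HiveJoin.
import org.apache.calcite.rel.RelDistribution;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;

import com.google.common.collect.ImmutableList;

final class HashDistributionSketch {
  static RelDistribution getDistribution(HiveJoin join) {
    ImmutableList.Builder<Integer> keys = ImmutableList.builder();
    for (JoinLeafPredicateInfo leaf :
        join.getJoinPredicateInfo().getEquiJoinPredicateElements()) {
      for (int leftPos : leaf.getProjsFromLeftPartOfJoinKeysInJoinSchema()) {
        keys.add(leftPos);
      }
      for (int rightPos : leaf.getProjsFromRightPartOfJoinKeysInJoinSchema()) {
        keys.add(rightPos);
      }
    }
    return new HiveRelDistribution(
        RelDistribution.Type.HASH_DISTRIBUTED, keys.build());
  }
}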
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java
index 94c22e3..1a2e6d1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdMemory.java
@@ -18,17 +18,14 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.stats;

 import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdMemory;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.BuiltInMethod;

-import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveLimit;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
@@ -69,73 +66,11 @@ public Double memory(HiveFilter filter) {
   }

   public Double memory(HiveJoin join) {
-    Double memory = 0.0;
-    if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
-      // Left side
-      final Double leftAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
-      final Double leftRowCount = RelMetadataQuery.getRowCount(join.getLeft());
-      if (leftAvgRowSize == null || leftRowCount == null) {
-        return null;
-      }
-      memory += leftAvgRowSize * leftRowCount;
-      // Right side
-      final Double rightAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getRight());
-      final Double rightRowCount = RelMetadataQuery.getRowCount(join.getRight());
-      if (rightAvgRowSize == null || rightRowCount == null) {
-        return null;
-      }
-      memory += rightAvgRowSize * rightRowCount;
-    } else if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
-        join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
-      RelNode inMemoryInput;
-      if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
-        inMemoryInput = join.getRight();
-      } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
-        inMemoryInput = join.getLeft();
-      } else {
-        return null;
-      }
-      // Result
-      final Double avgRowSize = RelMetadataQuery.getAverageRowSize(inMemoryInput);
-      final Double rowCount = RelMetadataQuery.getRowCount(inMemoryInput);
-      if (avgRowSize == null || rowCount == null) {
-        return null;
-      }
-      memory = avgRowSize * rowCount;
-    }
-    return memory;
+    return join.getMemory();
   }

   public Double cumulativeMemoryWithinPhaseSplit(HiveJoin join) {
-    if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
-        join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
-      // Check streaming side
-      RelNode inMemoryInput;
-      if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
-        inMemoryInput = join.getRight();
-      } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
-        inMemoryInput = join.getLeft();
-      } else {
-        return null;
-      }
-
-      if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN) {
-        // If simple map join, the whole relation goes in memory
-        return RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput);
-      }
-      else if (join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
-        // If bucket map join, only a split goes in memory
-        final Double memoryInput =
-            RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput);
-        final Integer splitCount = RelMetadataQuery.splitCount(inMemoryInput);
-        if (memoryInput == null || splitCount == null) {
-          return null;
-        }
-        return memoryInput / splitCount;
-      }
-    }
-    // Else, we fall back to default
-    return super.cumulativeMemoryWithinPhaseSplit(join);
+    return join.getCumulativeMemoryWithinPhaseSplit();
   }

   public Double memory(HiveLimit limit) {
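The deleted memory arithmetic is worth keeping in mind when reading the cost-model classes it moved into: a common join buffers both inputs, a map join buffers only the non-streaming (build) side, and a bucket join further divides the build side by the number of splits, since each task loads only one bucket. A minimal sketch of that arithmetic, with an illustrative worked example:

// Sketch of the per-algorithm memory accounting that moved behind
// JoinAlgorithm; numbers and helper names are illustrative only.
final class JoinMemorySketch {
  // COMMON_JOIN: both sides contribute rowCount * averageRowSize.
  static double commonJoinMemory(double leftRows, double leftAvgRowSize,
      double rightRows, double rightAvgRowSize) {
    return leftRows * leftAvgRowSize + rightRows * rightAvgRowSize;
  }

  // BUCKET_JOIN refinement: each task holds only one split of the build side.
  static double bucketJoinMemoryPerTask(double buildSideMemory, int splitCount) {
    return buildSideMemory / splitCount;
  }

  public static void main(String[] args) {
    // A 10,000-row build side at 200 bytes/row costs ~2 MB for a map join,
    // but only ~125 KB per task for a bucket join over 16 splits.
    double buildSide = 10_000 * 200.0;
    System.out.println(bucketJoinMemoryPerTask(buildSide, 16)); // 125000.0
  }
}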
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java
index 3bc35f7..96ca5ec 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java
@@ -24,9 +24,7 @@
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.BuiltInMethod;

 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
-import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel.JoinAlgorithm;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
@@ -49,12 +47,7 @@ public RelMetadataProvider getMetadataProvider() {
   //~ Methods ----------------------------------------------------------------

   public Boolean isPhaseTransition(HiveJoin join) {
-    // As Exchange operator is introduced later on, we make a
-    // common join operator create a new stage for the moment
-    if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
-      return true;
-    }
-    return false;
+    return join.isPhaseTransition();
   }

   public Boolean isPhaseTransition(HiveSort sort) {
@@ -64,23 +57,7 @@ public Boolean isPhaseTransition(HiveSort sort) {
   }

   public Integer splitCount(HiveJoin join) {
-    if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
-      return splitCountRepartition(join);
-    }
-    else if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
-        join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN ||
-        join.getJoinAlgorithm() == JoinAlgorithm.SMB_JOIN) {
-      RelNode largeInput;
-      if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
-        largeInput = join.getLeft();
-      } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
-        largeInput = join.getRight();
-      } else {
-        return null;
-      }
-      return splitCount(largeInput);
-    }
-    return null;
+    return join.getSplitCount();
   }

   public Integer splitCount(HiveTableScan scan) {
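For orientation, this is how downstream costing code reads these properties back through Calcite's metadata layer. The static RelMetadataQuery.splitCount call matches usage elsewhere in this patch; the sketch assumes this Calcite version exposes RelMetadataQuery.isPhaseTransition the same way. Both handlers now simply forward to the join's algorithm object.

// Sketch: probing the parallelism metadata of a HiveJoin via the
// reflective metadata providers registered above.
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;

final class JoinParallelismProbe {
  static String describe(HiveJoin join) {
    Integer splits = RelMetadataQuery.splitCount(join);          // -> join.getSplitCount()
    Boolean newStage = RelMetadataQuery.isPhaseTransition(join); // -> join.isPhaseTransition()
    return "splits=" + splits + ", startsNewStage=" + newStage;
  }
}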
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 2f6a26f..60d4677 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -118,13 +118,14 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
+import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveConfigContext;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
-import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf;
 import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveVolcanoPlanner;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
@@ -780,7 +781,12 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu
       /*
        * recreate cluster, so that it picks up the additional traitDef
        */
-      HiveConfigContext confContext = new HiveConfigContext(conf);
+      final Double maxSplitSize = (double) HiveConf.getLongVar(
+          conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
+      final Double maxMemory = (double) HiveConf.getLongVar(
+          conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+      HiveAlgorithmsConf algorithmsConf = new HiveAlgorithmsConf(maxSplitSize, maxMemory);
+      HiveConfigContext confContext = new HiveConfigContext(algorithmsConf);
       RelOptPlanner planner = HiveVolcanoPlanner.createPlanner(confContext);
       final RelOptQuery query = new RelOptQuery(planner);
       final RexBuilder rexBuilder = cluster.getRexBuilder();