diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7adb383..abd5f0d 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -703,6 +703,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { // CBO related HIVE_CBO_ENABLED("hive.cbo.enable", true, "Flag to control enabling Cost Based Optimizations using Calcite framework."), HIVE_CBO_RETPATH_HIVEOP("hive.cbo.returnpath.hiveop", false, "Flag to control calcite plan to hive operator conversion"), + EXTENDED_COST_MODEL("hive.cbo.costmodel.extended", false, "Flag to control enabling the extended cost model based on" + + "CPU, IO and cardinality. Otherwise, the cost model is based on cardinality."), // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row, // need to remove by hive .13. Also, do not change default (see SMB operator) diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java index 977313a..2a9474a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java @@ -21,6 +21,10 @@ import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostModel; +import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveDefaultCostModel; +import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveRelMdCost; +import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveOnTezCostModel; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdCollation; import 
org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistinctRowCount; import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistribution; @@ -44,13 +48,27 @@ public HiveDefaultRelMetadataProvider(HiveConf hiveConf) { public RelMetadataProvider getMetadataProvider() { + // Create cost metadata provider + final HiveCostModel cm; + if (HiveConf.getVar(this.hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") + && HiveConf.getBoolVar(this.hiveConf, HiveConf.ConfVars.EXTENDED_COST_MODEL)) { + final Double maxMemory = (double) HiveConf.getLongVar( + this.hiveConf, + HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); + cm = new HiveOnTezCostModel(maxMemory); + } else { + cm = new HiveDefaultCostModel(); + } + // Get max split size for HiveRelMdParallelism final Double maxSplitSize = (double) HiveConf.getLongVar( - this.hiveConf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE); + this.hiveConf, + HiveConf.ConfVars.MAPREDMAXSPLITSIZE); // Return MD provider return ChainedRelMetadataProvider.of(ImmutableList - .of(HiveRelMdDistinctRowCount.SOURCE, + .of(new HiveRelMdCost(cm).getMetadataProvider(), + HiveRelMdDistinctRowCount.SOURCE, HiveRelMdSelectivity.SOURCE, HiveRelMdRowCount.SOURCE, HiveRelMdUniqueKeys.SOURCE, diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java index 41604cd..0755943 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java @@ -123,8 +123,8 @@ public RelOptCost minus(RelOptCost other) { return this; } - return new HiveCost(this.rowCount - other.getRows(), this.cpu - other.getCpu(), this.io - - other.getIo()); + return new HiveCost(this.rowCount - other.getRows(), this.cpu - other.getCpu(), + this.io - other.getIo()); } public RelOptCost multiplyBy(double factor) { diff --git 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.optimizer.calcite.cost;

import java.util.EnumSet;

import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.JoinAlgorithm;

import com.google.common.collect.ImmutableList;

/**
 * Cost model interface.
 *
 * <p>Subclasses decide which join algorithms are executable on a given engine
 * and provide operator costs; this base class implements the algorithm
 * selection loop shared by all engines.
 */
public abstract class HiveCostModel {

  private static final Log LOG = LogFactory.getLog(HiveCostModel.class);

  /** Cost assigned to operators for which no specific cost method exists. */
  public abstract RelOptCost getDefaultCost();

  /** Cost of the given aggregate operator. */
  public abstract RelOptCost getAggregateCost(HiveAggregate aggregate);

  /**
   * Cost of the given join: evaluates every executable algorithm, picks the
   * cheapest, and records the choice on the join operator.
   */
  public RelOptCost getJoinCost(HiveJoin join) {
    return getJoinAlgorithmCost(join, getExecutableJoinAlgorithms(join));
  }

  /** Join algorithms that the target execution engine can run for this join. */
  abstract EnumSet<JoinAlgorithm> getExecutableJoinAlgorithms(HiveJoin join);

  /**
   * Selects the cheapest algorithm among {@code possibleAlgorithms}, stores the
   * selection and its cost on {@code join}, and returns that cost (possibly
   * null when no algorithm cost could be computed, e.g. missing stats).
   */
  protected RelOptCost getJoinAlgorithmCost(HiveJoin join,
      EnumSet<JoinAlgorithm> possibleAlgorithms) {
    JoinAlgorithm joinAlgorithm = JoinAlgorithm.NONE;
    RelOptCost joinCost = null;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Join algorithm selection for:\n" + RelOptUtil.toString(join));
    }
    for (JoinAlgorithm possibleAlgorithm : possibleAlgorithms) {
      final RelOptCost algorithmCost = computeJoinAlgorithmCost(join, possibleAlgorithm);
      if (LOG.isDebugEnabled()) {
        LOG.debug(possibleAlgorithm + " cost: " + algorithmCost);
      }
      // Cost is null when required stats are unavailable; skip the algorithm
      // instead of calling isLt on a null cost (NPE in the unrolled version).
      if (algorithmCost == null) {
        continue;
      }
      if (joinCost == null || algorithmCost.isLt(joinCost)) {
        joinAlgorithm = possibleAlgorithm;
        joinCost = algorithmCost;
      }
    }
    join.setJoinAlgorithm(joinAlgorithm);
    join.setJoinCost(joinCost);
    if (LOG.isDebugEnabled()) {
      LOG.debug(joinAlgorithm + " selected");
    }
    return joinCost;
  }

  /** Dispatches to the cost computation for the given algorithm. */
  private static RelOptCost computeJoinAlgorithmCost(HiveJoin join, JoinAlgorithm algorithm) {
    switch (algorithm) {
    case NONE:
      return computeJoinCardinalityCost(join);
    case COMMON_JOIN:
      return computeCostCommonJoin(join);
    case MAP_JOIN:
      return computeCostMapJoin(join);
    case BUCKET_JOIN:
      return computeCostBucketJoin(join);
    case SMB_JOIN:
      return computeCostSMBJoin(join);
    default:
      return null;
    }
  }

  /** Pure cardinality-based cost: sum of input row counts, no CPU/IO. */
  private static RelOptCost computeJoinCardinalityCost(HiveJoin join) {
    double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
    double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
    return HiveCost.FACTORY.makeCost(leftRCount + rightRCount, 0.0, 0.0);
  }

  private static RelOptCost computeCostCommonJoin(HiveJoin join) {
    // 1. Sum of input cardinalities
    final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
    final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
    if (leftRCount == null || rightRCount == null) {
      return null;
    }
    final double rCount = leftRCount + rightRCount;
    // 2. CPU cost = sorting cost (for each relation) +
    //    total merge cost
    final double cpuCost = HiveCostUtil.computeSortMergeCPUCost(
        getCardinalities(leftRCount, rightRCount), join.getSortedInputs());
    // 3. IO cost = cost of writing intermediary results to local FS +
    //    cost of reading from local FS for transferring to join +
    //    cost of transferring map outputs to Join operator
    final ImmutableList<Pair<Double, Double>> relationInfos =
        getRelationInfos(join, leftRCount, rightRCount);
    if (relationInfos == null) {
      return null;
    }
    final double ioCost = HiveCostUtil.computeSortMergeIOCost(relationInfos);
    // 4. Result
    return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
  }

  private static RelOptCost computeCostMapJoin(HiveJoin join) {
    // 1. Sum of input cardinalities
    final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
    final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
    if (leftRCount == null || rightRCount == null) {
      return null;
    }
    final double rCount = leftRCount + rightRCount;
    // 2. CPU cost = HashTable construction cost +
    //    join cost
    final ImmutableBitSet streaming = getStreamingBitSet(join);
    if (streaming == null) {
      return null;
    }
    final double cpuCost = HiveCostUtil.computeMapJoinCPUCost(
        getCardinalities(leftRCount, rightRCount), streaming);
    // 3. IO cost = cost of transferring small tables to join node *
    //    degree of parallelism
    final ImmutableList<Pair<Double, Double>> relationInfos =
        getRelationInfos(join, leftRCount, rightRCount);
    if (relationInfos == null) {
      return null;
    }
    final double ioCost = HiveCostUtil.computeMapJoinIOCost(relationInfos, streaming,
        getParallelism(join));
    // 4. Result
    return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
  }

  private static RelOptCost computeCostBucketJoin(HiveJoin join) {
    // 1. Sum of input cardinalities
    final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
    final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
    if (leftRCount == null || rightRCount == null) {
      return null;
    }
    final double rCount = leftRCount + rightRCount;
    // 2. CPU cost = HashTable construction cost +
    //    join cost
    final ImmutableBitSet streaming = getStreamingBitSet(join);
    if (streaming == null) {
      return null;
    }
    final double cpuCost = HiveCostUtil.computeBucketMapJoinCPUCost(
        getCardinalities(leftRCount, rightRCount), streaming);
    // 3. IO cost = cost of transferring small tables to join node *
    //    degree of parallelism
    final ImmutableList<Pair<Double, Double>> relationInfos =
        getRelationInfos(join, leftRCount, rightRCount);
    if (relationInfos == null) {
      return null;
    }
    final double ioCost = HiveCostUtil.computeBucketMapJoinIOCost(relationInfos, streaming,
        getParallelism(join));
    // 4. Result
    return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
  }

  private static RelOptCost computeCostSMBJoin(HiveJoin join) {
    // 1. Sum of input cardinalities
    final Double leftRCount = RelMetadataQuery.getRowCount(join.getLeft());
    final Double rightRCount = RelMetadataQuery.getRowCount(join.getRight());
    if (leftRCount == null || rightRCount == null) {
      return null;
    }
    final double rCount = leftRCount + rightRCount;
    // A streaming side is still required for the IO cost computation below.
    final ImmutableBitSet streaming = getStreamingBitSet(join);
    if (streaming == null) {
      return null;
    }
    // 2. CPU cost = join cost (inputs are already sorted/bucketed)
    final double cpuCost = HiveCostUtil.computeSMBMapJoinCPUCost(
        getCardinalities(leftRCount, rightRCount));
    // 3. IO cost = cost of transferring small tables to join node *
    //    degree of parallelism
    final ImmutableList<Pair<Double, Double>> relationInfos =
        getRelationInfos(join, leftRCount, rightRCount);
    if (relationInfos == null) {
      return null;
    }
    final double ioCost = HiveCostUtil.computeSMBMapJoinIOCost(relationInfos, streaming,
        getParallelism(join));
    // 4. Result
    return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
  }

  /** Input cardinalities as an immutable (left, right) list. */
  private static ImmutableList<Double> getCardinalities(Double leftRCount, Double rightRCount) {
    return new ImmutableList.Builder<Double>().
        add(leftRCount).
        add(rightRCount).
        build();
  }

  /**
   * (cardinality, average row size) pair per input, or null when an average
   * row size is unknown.
   */
  private static ImmutableList<Pair<Double, Double>> getRelationInfos(HiveJoin join,
      Double leftRCount, Double rightRCount) {
    final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
    final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
    if (leftRAverageSize == null || rightRAverageSize == null) {
      return null;
    }
    return new ImmutableList.Builder<Pair<Double, Double>>().
        add(new Pair<Double, Double>(leftRCount, leftRAverageSize)).
        add(new Pair<Double, Double>(rightRCount, rightRAverageSize)).
        build();
  }

  /**
   * Bit set marking the streaming input of a map-style join (bit 0 = left,
   * bit 1 = right), or null when no streaming side has been decided.
   */
  private static ImmutableBitSet getStreamingBitSet(HiveJoin join) {
    ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
    switch (join.getMapJoinStreamingSide()) {
    case LEFT_RELATION:
      streamingBuilder.set(0);
      break;
    case RIGHT_RELATION:
      streamingBuilder.set(1);
      break;
    default:
      return null;
    }
    return streamingBuilder.build();
  }

  /** Degree of parallelism for the join, defaulting to 1 when unknown. */
  private static int getParallelism(HiveJoin join) {
    final Integer splitCount = RelMetadataQuery.splitCount(join);
    return splitCount == null ? 1 : splitCount;
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.ql.optimizer.calcite.cost;

import java.util.EnumSet;

import org.apache.calcite.plan.RelOptCost;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.JoinAlgorithm;

/**
 * Default implementation of the cost model.
 * Currently used by MR and Spark execution engines.
 *
 * <p>All operators report zero cost, so plan comparison effectively falls
 * back to cardinality; only the {@link JoinAlgorithm#NONE} pseudo-algorithm
 * (pure cardinality cost) is offered for joins.
 */
public class HiveDefaultCostModel extends HiveCostModel {

  /** Default (non-aggregate, non-join) operators are free in this model. */
  @Override
  public RelOptCost getDefaultCost() {
    return HiveCost.FACTORY.makeZeroCost();
  }

  /** Aggregates are free in this model. */
  @Override
  public RelOptCost getAggregateCost(HiveAggregate aggregate) {
    return HiveCost.FACTORY.makeZeroCost();
  }

  /** Only cardinality-based costing; no physical join algorithm is selected. */
  @Override
  protected EnumSet<JoinAlgorithm> getExecutableJoinAlgorithms(HiveJoin join) {
    return EnumSet.of(JoinAlgorithm.NONE);
  }

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.optimizer.calcite.cost; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelDistribution.Type; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Pair; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; +import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.JoinAlgorithm; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation; + +/** + * Cost model for Tez execution engine. + */ +public class HiveOnTezCostModel extends HiveCostModel { + + private final Double maxMemory; + + + public HiveOnTezCostModel(Double maxMemory) { + this.maxMemory = maxMemory; + } + + @Override + public RelOptCost getDefaultCost() { + return HiveCost.FACTORY.makeZeroCost(); + } + + @Override + public RelOptCost getAggregateCost(HiveAggregate aggregate) { + if (aggregate.isBucketedInput()) { + return HiveCost.FACTORY.makeZeroCost(); + } else { + // 1. Sum of input cardinalities + final Double rCount = RelMetadataQuery.getRowCount(aggregate.getInput()); + if (rCount == null) { + return null; + } + // 2. CPU cost = sorting cost + final double cpuCost = HiveCostUtil.computeSortCPUCost(rCount); + // 3. 
IO cost = cost of writing intermediary results to local FS + + // cost of reading from local FS for transferring to GBy + + // cost of transferring map outputs to GBy operator + final Double rAverageSize = RelMetadataQuery.getAverageRowSize(aggregate.getInput()); + if (rAverageSize == null) { + return null; + } + final double ioCost = HiveCostUtil.computeSortIOCost(new Pair(rCount,rAverageSize)); + // 4. Result + return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); + } + } + + @Override + protected EnumSet getExecutableJoinAlgorithms(HiveJoin join) { + Set possibleAlgorithms = new HashSet(); + + // Check streaming side + RelNode smallInput; + if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) { + smallInput = join.getRight(); + } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) { + smallInput = join.getLeft(); + } else { + smallInput = null; + } + + if (smallInput != null) { + // Requirements: + // - For SMB, sorted by their keys on both sides and bucketed. + // - For Bucket, bucketed by their keys on both sides. / Fitting in memory + // - For Map, no additional requirement. / Fitting in memory + + // Get key columns + JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo. + constructJoinPredicateInfo(join); + List joinKeysInChildren = new ArrayList(); + joinKeysInChildren.add( + ImmutableIntList.copyOf( + joinPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema())); + joinKeysInChildren.add( + ImmutableIntList.copyOf( + joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema())); + + // Obtain number of buckets + Integer buckets = RelMetadataQuery.splitCount(smallInput); + // Obtain map algorithms for which smallest input fits in memory + boolean bucketFitsMemory = false; + boolean inputFitsMemory = false; + if (buckets != null) { + bucketFitsMemory = isFittingIntoMemory(maxMemory, smallInput, buckets); + } + inputFitsMemory = bucketFitsMemory ? 
+ isFittingIntoMemory(maxMemory, smallInput, 1) : false; + boolean orderedBucketed = true; + boolean bucketed = true; + for (int i=0; i maxSize) { + return false; + } + return true; + } + return false; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java new file mode 100644 index 0000000..fa2ec58 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.optimizer.calcite.cost; + +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider; +import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows; +import org.apache.calcite.rel.metadata.RelMetadataProvider; +import org.apache.calcite.util.BuiltInMethod; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; + +import com.google.common.collect.ImmutableList; + +/** + * HiveRelMdCost supplies the implementation of cost model. + */ +public class HiveRelMdCost { + + private final HiveCostModel hiveCostModel; + + public HiveRelMdCost(HiveCostModel hiveCostModel) { + this.hiveCostModel = hiveCostModel; + } + + public RelMetadataProvider getMetadataProvider() { + return ChainedRelMetadataProvider.of( + ImmutableList.of( + ReflectiveRelMetadataProvider.reflectiveSource(this, + BuiltInMethod.NON_CUMULATIVE_COST.method), + RelMdPercentageOriginalRows.SOURCE)); + } + + public RelOptCost getNonCumulativeCost(HiveAggregate aggregate) { + return hiveCostModel.getAggregateCost(aggregate); + } + + public RelOptCost getNonCumulativeCost(HiveJoin join) { + return hiveCostModel.getJoinCost(join); + } + + // Default case + public RelOptCost getNonCumulativeCost(RelNode rel) { + return hiveCostModel.getDefaultCost(); + } + +} + +// End HiveRelMdCost.java diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java index 9a8a5da..8e7ab3c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java @@ -30,10 +30,7 @@ import 
org.apache.calcite.rel.core.RelFactories.AggregateFactory; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.ImmutableBitSet; -import org.apache.calcite.util.Pair; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost; -import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostUtil; import com.google.common.collect.ImmutableList; @@ -49,7 +46,7 @@ public HiveAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, List aggCalls) throws InvalidRelException { super(cluster, TraitsUtil.getDefaultTraitSet(cluster), child, indicator, groupSet, groupSets, aggCalls); - this.bucketedInput = false; + this.bucketedInput = checkInputCorrectBucketing(child, groupSet); } @Override @@ -72,34 +69,12 @@ public void implement(Implementor implementor) { @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - // Check whether input is in correct order - checkInputCorrectBucketing(); - if (this.bucketedInput) { - return HiveCost.FACTORY.makeZeroCost(); - } else { - // 1. Sum of input cardinalities - final Double rCount = RelMetadataQuery.getRowCount(this.getInput()); - if (rCount == null) { - return null; - } - // 2. CPU cost = sorting cost - final double cpuCost = HiveCostUtil.computeSortCPUCost(rCount); - // 3. IO cost = cost of writing intermediary results to local FS + - // cost of reading from local FS for transferring to GBy + - // cost of transferring map outputs to GBy operator - final Double rAverageSize = RelMetadataQuery.getAverageRowSize(this.getInput()); - if (rAverageSize == null) { - return null; - } - final double ioCost = HiveCostUtil.computeSortIOCost(new Pair(rCount,rAverageSize)); - // 4. 
Result - return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); - } + return RelMetadataQuery.getNonCumulativeCost(this); } - private void checkInputCorrectBucketing() { - this.bucketedInput = RelMetadataQuery.distribution(this.getInput()).getKeys(). - containsAll(this.getGroupSet().asList()); + private static boolean checkInputCorrectBucketing(RelNode child, ImmutableBitSet groupSet) { + return RelMetadataQuery.distribution(child).getKeys(). + containsAll(groupSet.asList()); } @Override @@ -108,6 +83,10 @@ public double getRows() { .makeLiteral(true)); } + public boolean isBucketedInput() { + return this.bucketedInput; + } + private static class HiveAggRelFactory implements AggregateFactory { @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFilter.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFilter.java index 3e45a3f..de61e48 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFilter.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveFilter.java @@ -24,9 +24,9 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.RelFactories.FilterFactory; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost; public class HiveFilter extends Filter implements HiveRelNode { @@ -48,7 +48,7 @@ public void implement(Implementor implementor) { @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - return HiveCost.FACTORY.makeZeroCost(); + return RelMetadataQuery.getNonCumulativeCost(this); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java index f411d90..b78cb01 
100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java @@ -19,7 +19,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -30,8 +29,6 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelCollations; -import org.apache.calcite.rel.RelDistribution; -import org.apache.calcite.rel.RelDistribution.Type; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Join; @@ -43,21 +40,12 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; -import org.apache.calcite.util.Pair; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost; -import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostUtil; - -import com.google.common.collect.ImmutableList; //TODO: Should we convert MultiJoin to be a child of HiveJoin public class HiveJoin extends Join implements HiveRelNode { - private static final Log LOG = LogFactory.getLog(HiveJoin.class); // NOTE: COMMON_JOIN & SMB_JOIN are Sort Merge Join (in case of COMMON_JOIN // each parallel computation handles multiple splits where as in case of SMB @@ -74,7 +62,6 @@ public static final JoinFactory HIVE_JOIN_FACTORY = new HiveJoinFactoryImpl(); - private final Double maxMemory; private final boolean leftSemiJoin; private JoinAlgorithm joinAlgorithm; private 
MapJoinStreamingRelation mapJoinStreamingSide; @@ -86,8 +73,10 @@ public static HiveJoin getJoin(RelOptCluster cluster, RelNode left, RelNode righ RexNode condition, JoinRelType joinType, boolean leftSemiJoin) { try { Set variablesStopped = Collections.emptySet(); - return new HiveJoin(cluster, null, left, right, condition, joinType, variablesStopped, - JoinAlgorithm.NONE, MapJoinStreamingRelation.NONE, ImmutableBitSet.of(), leftSemiJoin); + HiveJoin join = new HiveJoin(cluster, null, left, right, condition, joinType, variablesStopped, + JoinAlgorithm.NONE, chooseStreamingSide(left,right), null, leftSemiJoin); + join.sortedInputs = checkInputsCorrectOrder(join); + return join; } catch (InvalidRelException e) { throw new RuntimeException(e); } @@ -103,9 +92,6 @@ protected HiveJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelN this.mapJoinStreamingSide = streamingSideForMapJoin; this.sortedInputs = sortedInputs; this.leftSemiJoin = leftSemiJoin; - this.maxMemory = (double) HiveConf.getLongVar( - cluster.getPlanner().getContext().unwrap(HiveConf.class), - HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); } @Override @@ -130,10 +116,22 @@ public JoinAlgorithm getJoinAlgorithm() { return joinAlgorithm; } + public void setJoinAlgorithm(JoinAlgorithm joinAlgorithm) { + this.joinAlgorithm = joinAlgorithm; + } + public MapJoinStreamingRelation getMapJoinStreamingSide() { return mapJoinStreamingSide; } + public void setJoinCost(RelOptCost joinCost) { + this.joinCost = joinCost; + } + + public ImmutableBitSet getSortedInputs() { + return sortedInputs; + } + public boolean isLeftSemiJoin() { return leftSemiJoin; } @@ -143,93 +141,31 @@ public boolean isLeftSemiJoin() { */ @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - this.joinCost = chooseJoinAlgorithmAndGetCost(); - return this.joinCost; - } - - private RelOptCost chooseJoinAlgorithmAndGetCost() { - // 1. Choose streaming side - chooseStreamingSide(); - // 2. 
Store order inputs - checkInputsCorrectOrder(); - // 3. Get possible algorithms - Set possibleAlgorithms = obtainJoinAlgorithms(); - // 4. For each possible algorithm, calculate cost, and select best - RelOptCost selfCost = null; - if (LOG.isDebugEnabled()) { - LOG.debug("Join algorithm selection for:\n" + RelOptUtil.toString(this)); - } - for (JoinAlgorithm possibleAlgorithm : possibleAlgorithms) { - switch (possibleAlgorithm) { - case COMMON_JOIN: - RelOptCost commonJoinCost = computeSelfCostCommonJoin(); - if (LOG.isDebugEnabled()) { - LOG.debug(JoinAlgorithm.COMMON_JOIN + " cost: " + commonJoinCost); - } - if (selfCost == null || commonJoinCost.isLt(selfCost) ) { - this.joinAlgorithm = JoinAlgorithm.COMMON_JOIN; - selfCost = commonJoinCost; - } - break; - case MAP_JOIN: - RelOptCost mapJoinCost = computeSelfCostMapJoin(); - if (LOG.isDebugEnabled()) { - LOG.debug(JoinAlgorithm.MAP_JOIN + " cost: " + mapJoinCost); - } - if (selfCost == null || mapJoinCost.isLt(selfCost) ) { - this.joinAlgorithm = JoinAlgorithm.MAP_JOIN; - selfCost = mapJoinCost; - } - break; - case BUCKET_JOIN: - RelOptCost bucketJoinCost = computeSelfCostBucketJoin(); - if (LOG.isDebugEnabled()) { - LOG.debug(JoinAlgorithm.BUCKET_JOIN + " cost: " + bucketJoinCost); - } - if (selfCost == null || bucketJoinCost.isLt(selfCost) ) { - this.joinAlgorithm = JoinAlgorithm.BUCKET_JOIN; - selfCost = bucketJoinCost; - } - break; - case SMB_JOIN: - RelOptCost smbJoinCost = computeSelfCostSMBJoin(); - if (LOG.isDebugEnabled()) { - LOG.debug(JoinAlgorithm.SMB_JOIN + " cost: " + smbJoinCost); - } - if (selfCost == null || smbJoinCost.isLt(selfCost) ) { - this.joinAlgorithm = JoinAlgorithm.SMB_JOIN; - selfCost = smbJoinCost; - } - break; - default: - //TODO: Exception - } - } - if (LOG.isDebugEnabled()) { - LOG.debug(this.joinAlgorithm + " selected"); - } - return selfCost; + return RelMetadataQuery.getNonCumulativeCost(this); } - private void chooseStreamingSide() { - Double leftInputSize = 
RelMetadataQuery.memory(this.getLeft()); - Double rightInputSize = RelMetadataQuery.memory(this.getRight()); + private static MapJoinStreamingRelation chooseStreamingSide(RelNode left, + RelNode right) { + Double leftInputSize = RelMetadataQuery.memory(left); + Double rightInputSize = RelMetadataQuery.memory(right); if (leftInputSize == null && rightInputSize == null) { - this.mapJoinStreamingSide = MapJoinStreamingRelation.NONE; + return MapJoinStreamingRelation.NONE; } else if (leftInputSize != null && (rightInputSize == null || (leftInputSize < rightInputSize))) { - this.mapJoinStreamingSide = MapJoinStreamingRelation.RIGHT_RELATION; + return MapJoinStreamingRelation.RIGHT_RELATION; } else if (rightInputSize != null && (leftInputSize == null || (rightInputSize <= leftInputSize))) { - this.mapJoinStreamingSide = MapJoinStreamingRelation.LEFT_RELATION; + return MapJoinStreamingRelation.LEFT_RELATION; } + return MapJoinStreamingRelation.NONE; } - private void checkInputsCorrectOrder() { + private static ImmutableBitSet checkInputsCorrectOrder(HiveJoin join) { + ImmutableBitSet.Builder sortedInputs = new ImmutableBitSet.Builder(); JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo. 
- constructJoinPredicateInfo(this); + constructJoinPredicateInfo(join); List joinKeysInChildren = new ArrayList(); joinKeysInChildren.add( ImmutableIntList.copyOf( @@ -238,274 +174,15 @@ private void checkInputsCorrectOrder() { ImmutableIntList.copyOf( joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema())); - for (int i=0; i obtainJoinAlgorithms() { - Set possibleAlgorithms = new HashSet(); - - // Check streaming side - RelNode smallInput; - if (this.mapJoinStreamingSide == MapJoinStreamingRelation.LEFT_RELATION) { - smallInput = this.getRight(); - } else if (this.mapJoinStreamingSide == MapJoinStreamingRelation.RIGHT_RELATION) { - smallInput = this.getLeft(); - } else { - smallInput = null; - } - - if (smallInput != null) { - // Requirements: - // - For SMB, sorted by their keys on both sides and bucketed. - // - For Bucket, bucketed by their keys on both sides. / Fitting in memory - // - For Map, no additional requirement. / Fitting in memory - - // Get key columns - JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo. - constructJoinPredicateInfo(this); - List joinKeysInChildren = new ArrayList(); - joinKeysInChildren.add( - ImmutableIntList.copyOf( - joinPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema())); - joinKeysInChildren.add( - ImmutableIntList.copyOf( - joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema())); - - // Obtain number of buckets - Integer buckets = RelMetadataQuery.splitCount(smallInput); - // Obtain map algorithms for which smallest input fits in memory - boolean bucketFitsMemory = false; - boolean inputFitsMemory = false; - if (buckets != null) { - bucketFitsMemory = isFittingIntoMemory(this.maxMemory, smallInput, buckets); - } - inputFitsMemory = bucketFitsMemory ? 
- isFittingIntoMemory(this.maxMemory, smallInput, 1) : false; - boolean orderedBucketed = true; - boolean bucketed = true; - for (int i=0; i maxSize) { - return false; - } - return true; - } - return false; - } - - private RelOptCost computeSelfCostCommonJoin() { - // 1. Sum of input cardinalities - final Double leftRCount = RelMetadataQuery.getRowCount(getLeft()); - final Double rightRCount = RelMetadataQuery.getRowCount(getRight()); - if (leftRCount == null || rightRCount == null) { - return null; - } - final double rCount = leftRCount + rightRCount; - // 2. CPU cost = sorting cost (for each relation) + - // total merge cost - ImmutableList cardinalities = new ImmutableList.Builder(). - add(leftRCount). - add(rightRCount). - build(); - final double cpuCost = HiveCostUtil.computeSortMergeCPUCost(cardinalities, sortedInputs); - // 3. IO cost = cost of writing intermediary results to local FS + - // cost of reading from local FS for transferring to join + - // cost of transferring map outputs to Join operator - final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(getLeft()); - final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(getRight()); - if (leftRAverageSize == null || rightRAverageSize == null) { - return null; - } - ImmutableList> relationInfos = new ImmutableList.Builder>(). - add(new Pair(leftRCount,leftRAverageSize)). - add(new Pair(rightRCount,rightRAverageSize)). - build(); - final double ioCost = HiveCostUtil.computeSortMergeIOCost(relationInfos); - // 4. Result - return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); - } - - private RelOptCost computeSelfCostMapJoin() { - // 1. Sum of input cardinalities - final Double leftRCount = RelMetadataQuery.getRowCount(getLeft()); - final Double rightRCount = RelMetadataQuery.getRowCount(getRight()); - if (leftRCount == null || rightRCount == null) { - return null; - } - final double rCount = leftRCount + rightRCount; - // 2. 
CPU cost = HashTable construction cost + - // join cost - ImmutableList cardinalities = new ImmutableList.Builder(). - add(leftRCount). - add(rightRCount). - build(); - ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder(); - switch (mapJoinStreamingSide) { - case LEFT_RELATION: - streamingBuilder.set(0); - break; - case RIGHT_RELATION: - streamingBuilder.set(1); - break; - default: - return null; - } - ImmutableBitSet streaming = streamingBuilder.build(); - final double cpuCost = HiveCostUtil.computeMapJoinCPUCost(cardinalities, streaming); - // 3. IO cost = cost of transferring small tables to join node * - // degree of parallelism - final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(getLeft()); - final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(getRight()); - if (leftRAverageSize == null || rightRAverageSize == null) { - return null; - } - ImmutableList> relationInfos = new ImmutableList.Builder>(). - add(new Pair(leftRCount,leftRAverageSize)). - add(new Pair(rightRCount,rightRAverageSize)). - build(); - final int parallelism = RelMetadataQuery.splitCount(this) == null - ? 1 : RelMetadataQuery.splitCount(this); - final double ioCost = HiveCostUtil.computeMapJoinIOCost(relationInfos, streaming, parallelism); - // 4. Result - return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); - } - - private RelOptCost computeSelfCostBucketJoin() { - // 1. Sum of input cardinalities - final Double leftRCount = RelMetadataQuery.getRowCount(getLeft()); - final Double rightRCount = RelMetadataQuery.getRowCount(getRight()); - if (leftRCount == null || rightRCount == null) { - return null; - } - final double rCount = leftRCount + rightRCount; - // 2. CPU cost = HashTable construction cost + - // join cost - ImmutableList cardinalities = new ImmutableList.Builder(). - add(leftRCount). - add(rightRCount). 
- build(); - ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder(); - switch (mapJoinStreamingSide) { - case LEFT_RELATION: - streamingBuilder.set(0); - break; - case RIGHT_RELATION: - streamingBuilder.set(1); - break; - default: - return null; - } - ImmutableBitSet streaming = streamingBuilder.build(); - final double cpuCost = HiveCostUtil.computeBucketMapJoinCPUCost(cardinalities, streaming); - // 3. IO cost = cost of transferring small tables to join node * - // degree of parallelism - final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(getLeft()); - final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(getRight()); - if (leftRAverageSize == null || rightRAverageSize == null) { - return null; - } - ImmutableList> relationInfos = new ImmutableList.Builder>(). - add(new Pair(leftRCount,leftRAverageSize)). - add(new Pair(rightRCount,rightRAverageSize)). - build(); - final int parallelism = RelMetadataQuery.splitCount(this) == null - ? 1 : RelMetadataQuery.splitCount(this); - final double ioCost = HiveCostUtil.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism); - // 4. Result - return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); - } - - private RelOptCost computeSelfCostSMBJoin() { - // 1. Sum of input cardinalities - final Double leftRCount = RelMetadataQuery.getRowCount(getLeft()); - final Double rightRCount = RelMetadataQuery.getRowCount(getRight()); - if (leftRCount == null || rightRCount == null) { - return null; - } - final double rCount = leftRCount + rightRCount; - // 2. CPU cost = HashTable construction cost + - // join cost - ImmutableList cardinalities = new ImmutableList.Builder(). - add(leftRCount). - add(rightRCount). 
- build(); - ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder(); - switch (mapJoinStreamingSide) { - case LEFT_RELATION: - streamingBuilder.set(0); - break; - case RIGHT_RELATION: - streamingBuilder.set(1); - break; - default: - return null; - } - ImmutableBitSet streaming = streamingBuilder.build(); - final double cpuCost = HiveCostUtil.computeSMBMapJoinCPUCost(cardinalities); - // 3. IO cost = cost of transferring small tables to join node * - // degree of parallelism - final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(getLeft()); - final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(getRight()); - if (leftRAverageSize == null || rightRAverageSize == null) { - return null; - } - ImmutableList> relationInfos = new ImmutableList.Builder>(). - add(new Pair(leftRCount,leftRAverageSize)). - add(new Pair(rightRCount,rightRAverageSize)). - build(); - final int parallelism = RelMetadataQuery.splitCount(this) == null - ? 1 : RelMetadataQuery.splitCount(this); - final double ioCost = HiveCostUtil.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism); - // 4. 
Result - return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); + return sortedInputs.build(); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java index 5fc64f3..74991d6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveLimit.java @@ -25,9 +25,9 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost; public class HiveLimit extends SingleRel implements HiveRelNode { private final RexNode offset; @@ -52,6 +52,6 @@ public void implement(Implementor implementor) { @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - return HiveCost.FACTORY.makeZeroCost(); + return RelMetadataQuery.getNonCumulativeCost(this); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java index 6c215c9..1c0e5e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveProject.java @@ -29,6 +29,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.RelFactories.ProjectFactory; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; @@ -40,7 +41,6 @@ import 
org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost; import com.google.common.collect.ImmutableList; @@ -171,7 +171,7 @@ public Project copy(RelTraitSet traitSet, RelNode input, List exps, @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - return HiveCost.FACTORY.makeZeroCost(); + return RelMetadataQuery.getNonCumulativeCost(this); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java index fcf09a5..b51672d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java @@ -29,13 +29,12 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.util.ImmutableBitSet; -import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil; -import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost; import org.apache.hadoop.hive.ql.plan.ColStatistics; import com.google.common.collect.ImmutableList; @@ -104,7 +103,7 @@ public RelDataType deriveRowType() { @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - return HiveCost.FACTORY.makeZeroCost(); + return RelMetadataQuery.getNonCumulativeCost(this); } @Override diff --git 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java index 4984683..addb3a4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.ql.optimizer.calcite.stats; import org.apache.calcite.rel.RelCollation; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java index f846dd1..a195397 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdDistribution.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.ql.optimizer.calcite.stats; import org.apache.calcite.rel.RelDistribution; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java index dabbe28..6948fb1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdRowCount.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hive.ql.optimizer.calcite.stats; import java.util.ArrayList; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java index 95515b2..7c22c33 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdUniqueKeys.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hive.ql.optimizer.calcite.stats; import java.util.BitSet;