diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java
index 6840418..f839acf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
 import com.google.common.collect.ImmutableList;
 
@@ -66,7 +65,15 @@ public static RelOptCost computeCardinalityBasedCost(HiveRelNode hr) {
   }
 
   public HiveCost computeScanCost(double cardinality, double avgTupleSize) {
-    return new HiveCost(cardinality, 0, hdfsRead * cardinality * avgTupleSize);
+    return new HiveCost(cardinality, 0, computeHDFSReadIOCost(cardinality, avgTupleSize));
+  }
+
+  public double computeHDFSReadIOCost(double cardinality, double avgTupleSize) {
+    return hdfsRead * cardinality * avgTupleSize;
+  }
+
+  public double computeHDFSWriteIOCost(double cardinality, double avgTupleSize) {
+    return hdfsWrite * cardinality * avgTupleSize;
   }
 
   public double computeSortMergeCPUCost(
@@ -114,6 +121,23 @@ public double computeSortIOCost(Pair relationInfo) {
     return ioCost;
   }
 
+  public double computeSortLimitIOCost(double inputCardinality, double limit,
+          double averageTupleSize) {
+    double ioCost = 0.0;
+    // Write cost
+    ioCost += inputCardinality * averageTupleSize * localFSWrite;
+    // Read cost
+    ioCost += limit * averageTupleSize * localFSRead;
+    // Net transfer cost
+    ioCost += inputCardinality * averageTupleSize * netCost;
+    return ioCost;
+  }
+
+  public double computeLimitIOCost(double limit, double averageTupleSize) {
+    // Read cost
+    return limit * averageTupleSize * localFSRead;
+  }
+
   public static double computeMapJoinCPUCost(
           ImmutableList cardinalities,
           ImmutableBitSet streaming) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java
index 4e3b654..6b8c4c8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
 import com.google.common.collect.ImmutableList;
@@ -49,6 +50,8 @@ public HiveCostModel(Set joinAlgorithms) {
 
   public abstract RelOptCost getAggregateCost(HiveAggregate aggregate);
 
+  public abstract RelOptCost getSortLimitCost(HiveSort sortLimit);
+
   public abstract RelOptCost getScanCost(HiveTableScan ts);
 
   public RelOptCost getJoinCost(HiveJoin join) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java
index 6669d32..187e00d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java
@@ -23,6 +23,7 @@
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
 import com.google.common.collect.ImmutableList;
@@ -63,6 +64,11 @@ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
     return HiveCost.FACTORY.makeZeroCost();
   }
 
+  @Override
+  public RelOptCost getSortLimitCost(HiveSort sortLimit) {
+    return HiveCost.FACTORY.makeZeroCost();
+  }
+
   /**
    * Default join algorithm. Cost is based on cardinality.
    */
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
index e9f1d96..9d3cd74 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
@@ -22,10 +22,12 @@
 
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelDistribution.Type;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Pair;
@@ -36,8 +38,9 @@
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
-import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
@@ -106,6 +109,73 @@ public RelOptCost getAggregateCost(HiveAggregate aggregate) {
     }
   }
 
+  @Override
+  public RelOptCost getSortLimitCost(HiveSort sortLimit) {
+    // 1. Cardinality: for the output, choose the smaller between the row count
+    //    of the child or the limit specified in the operator (if present)
+    final Double inputRCount = RelMetadataQuery.getRowCount(sortLimit.getInput());
+    if (inputRCount == null) {
+      return null;
+    }
+    final double outputRCount;
+    if (sortLimit.fetch != null) {
+      int limit = RexLiteral.intValue(sortLimit.fetch);
+      if (limit < inputRCount) {
+        outputRCount = Double.valueOf(limit);
+      } else {
+        outputRCount = inputRCount;
+      }
+    } else {
+      outputRCount = inputRCount;
+    }
+    // 2. CPU cost = sorting cost (if necessary)
+    final boolean sorted = RelCollations.contains(
+            RelMetadataQuery.collations(sortLimit.getInput()),
+            ImmutableIntList.copyOf(RelCollations.ordinals(sortLimit.collation)));
+    double cpuCost;
+    if (!sorted) {
+      cpuCost = algoUtils.computeSortCPUCost(inputRCount);
+    } else {
+      cpuCost = 0.0;
+    }
+    // 3. IO cost = if sorting is necessary:
+    //               cost of transferring outputs to Sort operator +
+    //               cost of writing intermediary results to local FS +
+    //               cost of reading from local FS +
+    //               cost of writing result to HDFS
+    //             otherwise:
+    //               cost of reading from local FS +
+    //               cost of writing result to HDFS
+    //             in both cases:
+    //               if it is a new phase, add costs of reading from HDFS
+    final Double rAverageSize = RelMetadataQuery.getAverageRowSize(sortLimit.getInput());
+    if (rAverageSize == null) {
+      return null;
+    }
+    // If it is a new phase, we need to add the HDFS read cost
+    final Boolean inputTransition = RelMetadataQuery.isPhaseTransition(sortLimit.getInput());
+    if (inputTransition == null) {
+      return null;
+    }
+    final double hdfsReadCost;
+    if (inputTransition) {
+      hdfsReadCost = algoUtils.computeHDFSReadIOCost(inputRCount, rAverageSize);
+    } else {
+      hdfsReadCost = 0.0;
+    }
+    final double sortLimitIOCost;
+    if (!sorted) {
+      sortLimitIOCost = algoUtils.computeSortLimitIOCost(inputRCount, outputRCount, rAverageSize);
+    } else {
+      sortLimitIOCost = algoUtils.computeLimitIOCost(outputRCount, rAverageSize);
+    }
+    final double hdfsWriteCost = algoUtils.computeHDFSWriteIOCost(outputRCount, rAverageSize);
+    final double ioCost = hdfsReadCost + sortLimitIOCost + hdfsWriteCost;
+
+    // 4. Result
+    return HiveCost.FACTORY.makeCost(outputRCount, cpuCost, ioCost);
+  }
+
   /**
    * COMMON_JOIN is Sort Merge Join. Each parallel computation handles multiple
    * splits.
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java
index 90f6a7e..407080e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java
@@ -26,6 +26,7 @@
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
 import com.google.common.collect.ImmutableList;
@@ -57,6 +58,10 @@ public RelOptCost getNonCumulativeCost(HiveJoin join) {
     return hiveCostModel.getJoinCost(join);
   }
 
+  public RelOptCost getNonCumulativeCost(HiveSort sortLimit) {
+    return hiveCostModel.getSortLimitCost(sortLimit);
+  }
+
   public RelOptCost getNonCumulativeCost(HiveTableScan ts) {
     return hiveCostModel.getScanCost(ts);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSort.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSort.java
index 1df6542..0c050a5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSort.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSort.java
@@ -20,12 +20,15 @@
 import java.util.Map;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
 
@@ -90,6 +93,11 @@ public void setInputRefToCallMap(ImmutableMap refToCall) {
   }
 
   @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    return RelMetadataQuery.getNonCumulativeCost(this);
+  }
+
+  @Override
   public void implement(Implementor implementor) {
   }
 