diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
index 8c9ff76..37fad0a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java
@@ -86,27 +86,50 @@ public RelOptCost getScanCost(HiveTableScan ts) {
   @Override
   public RelOptCost getAggregateCost(HiveAggregate aggregate) {
+    // 1. Input cardinality
+    final Double inputRCount = RelMetadataQuery.getRowCount(aggregate.getInput());
+    final Double outputRCount = RelMetadataQuery.getRowCount(aggregate);
+    if (inputRCount == null || outputRCount == null) {
+      return null;
+    }
+    // 2. CPU cost = sorting cost
+    final double cpuCost;
     if (aggregate.isBucketedInput()) {
-      return HiveCost.FACTORY.makeZeroCost();
+      cpuCost = 0.0;
     } else {
-      // 1. Sum of input cardinalities
-      final Double rCount = RelMetadataQuery.getRowCount(aggregate.getInput());
-      if (rCount == null) {
-        return null;
-      }
-      // 2. CPU cost = sorting cost
-      final double cpuCost = algoUtils.computeSortCPUCost(rCount);
-      // 3. IO cost = cost of writing intermediary results to local FS +
-      //    cost of reading from local FS for transferring to GBy +
-      //    cost of transferring map outputs to GBy operator
-      final Double rAverageSize = RelMetadataQuery.getAverageRowSize(aggregate.getInput());
-      if (rAverageSize == null) {
-        return null;
-      }
-      final double ioCost = algoUtils.computeSortIOCost(new Pair(rCount,rAverageSize));
-      // 4. Result
-      return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+      cpuCost = algoUtils.computeSortCPUCost(inputRCount);
+    }
+    // 3. IO cost = cost of writing intermediary results to local FS +
+    //    cost of reading from local FS for transferring to GBy +
+    //    cost of transferring map outputs to GBy operator +
+    //    cost of writing result to HDFS
+    //    if it is a new phase, add costs of reading from HDFS
+    final Double inputRAverageSize = RelMetadataQuery.getAverageRowSize(aggregate.getInput());
+    final Double outputRAverageSize = RelMetadataQuery.getAverageRowSize(aggregate);
+    if (inputRAverageSize == null || outputRAverageSize == null) {
+      return null;
     }
+    // If it is a new phase, we need to add the HDFS read cost
+    final Boolean inputTransition = RelMetadataQuery.isPhaseTransition(aggregate.getInput());
+    if (inputTransition == null) {
+      return null;
+    }
+    final double hdfsReadCost;
+    if (inputTransition) {
+      hdfsReadCost = algoUtils.computeHDFSReadIOCost(inputRCount, inputRAverageSize);
+    } else {
+      hdfsReadCost = 0.0;
+    }
+    final double sortIOCost;
+    if (aggregate.isBucketedInput()) {
+      sortIOCost = 0.0;
+    } else {
+      sortIOCost = algoUtils.computeSortIOCost(new Pair(inputRCount,inputRAverageSize));
+    }
+    final double hdfsWriteCost = algoUtils.computeHDFSWriteIOCost(outputRCount, outputRAverageSize);
+    final double ioCost = hdfsReadCost + sortIOCost + hdfsWriteCost;
+    // 4. Result
+    return HiveCost.FACTORY.makeCost(outputRCount, cpuCost, ioCost);
   }
 
   @Override
@@ -215,16 +238,45 @@ public RelOptCost getCost(HiveJoin join) {
       // 3. IO cost = cost of writing intermediary results to local FS +
       //    cost of reading from local FS for transferring to join +
       //    cost of transferring map outputs to Join operator
+      //    If any of the inputs is a new phase, add costs of reading from HDFS
       final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
       final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(join.getRight());
       if (leftRAverageSize == null || rightRAverageSize == null) {
         return null;
       }
+      // If it is a new phase, we need to add the HDFS read cost
+      final Boolean leftTransition = RelMetadataQuery.isPhaseTransition(join.getLeft());
+      if (leftTransition == null) {
+        return null;
+      }
+      final double leftHDFSReadCost;
+      if (leftTransition) {
+        leftHDFSReadCost = algoUtils.computeHDFSReadIOCost(leftRCount, leftRAverageSize);
+      } else {
+        leftHDFSReadCost = 0.0;
+      }
+      final Boolean rightTransition = RelMetadataQuery.isPhaseTransition(join.getRight());
+      if (rightTransition == null) {
+        return null;
+      }
+      final double rightHDFSReadCost;
+      if (rightTransition) {
+        rightHDFSReadCost = algoUtils.computeHDFSReadIOCost(rightRCount, rightRAverageSize);
+      } else {
+        rightHDFSReadCost = 0.0;
+      }
       ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
               add(new Pair(leftRCount,leftRAverageSize)).
               add(new Pair(rightRCount,rightRAverageSize)).
               build();
-      final double ioCost = algoUtils.computeSortMergeIOCost(relationInfos);
+      final double sortMergeIOCost = algoUtils.computeSortMergeIOCost(relationInfos);
+      final Double outputRCount = RelMetadataQuery.getRowCount(join);
+      final Double outputRAverageSize = RelMetadataQuery.getAverageRowSize(join);
+      if (outputRCount == null || outputRAverageSize == null) {
+        return null;
+      }
+      final double joinHDFSWriteCost = algoUtils.computeHDFSWriteIOCost(outputRCount, outputRAverageSize);
+      final double ioCost = leftHDFSReadCost + rightHDFSReadCost + sortMergeIOCost + joinHDFSWriteCost;
       // 4. Result
       return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
     }
@@ -335,6 +387,27 @@ public RelOptCost getCost(HiveJoin join) {
       if (leftRAverageSize == null || rightRAverageSize == null) {
         return null;
       }
+      // If it is a new phase, we need to add the HDFS read cost
+      final Boolean leftTransition = RelMetadataQuery.isPhaseTransition(join.getLeft());
+      if (leftTransition == null) {
+        return null;
+      }
+      final double leftHDFSReadCost;
+      if (leftTransition) {
+        leftHDFSReadCost = algoUtils.computeHDFSReadIOCost(leftRCount, leftRAverageSize);
+      } else {
+        leftHDFSReadCost = 0.0;
+      }
+      final Boolean rightTransition = RelMetadataQuery.isPhaseTransition(join.getRight());
+      if (rightTransition == null) {
+        return null;
+      }
+      final double rightHDFSReadCost;
+      if (rightTransition) {
+        rightHDFSReadCost = algoUtils.computeHDFSReadIOCost(rightRCount, rightRAverageSize);
+      } else {
+        rightHDFSReadCost = 0.0;
+      }
       ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
               add(new Pair(leftRCount,leftRAverageSize)).
               add(new Pair(rightRCount,rightRAverageSize)).
@@ -344,7 +417,8 @@ public RelOptCost getCost(HiveJoin join) {
       final int parallelism = RelMetadataQuery.splitCount(join) == null ? 1 : RelMetadataQuery.splitCount(join);
       join.setJoinAlgorithm(oldAlgo);
 
-      final double ioCost = algoUtils.computeMapJoinIOCost(relationInfos, streaming, parallelism);
+      final double mapJoinIOCost = algoUtils.computeMapJoinIOCost(relationInfos, streaming, parallelism);
+      final double ioCost = leftHDFSReadCost + rightHDFSReadCost + mapJoinIOCost;
       // 4. Result
       return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
     }
@@ -498,6 +572,27 @@ public RelOptCost getCost(HiveJoin join) {
       if (leftRAverageSize == null || rightRAverageSize == null) {
         return null;
       }
+      // If it is a new phase, we need to add the HDFS read cost
+      final Boolean leftTransition = RelMetadataQuery.isPhaseTransition(join.getLeft());
+      if (leftTransition == null) {
+        return null;
+      }
+      final double leftHDFSReadCost;
+      if (leftTransition) {
+        leftHDFSReadCost = algoUtils.computeHDFSReadIOCost(leftRCount, leftRAverageSize);
+      } else {
+        leftHDFSReadCost = 0.0;
+      }
+      final Boolean rightTransition = RelMetadataQuery.isPhaseTransition(join.getRight());
+      if (rightTransition == null) {
+        return null;
+      }
+      final double rightHDFSReadCost;
+      if (rightTransition) {
+        rightHDFSReadCost = algoUtils.computeHDFSReadIOCost(rightRCount, rightRAverageSize);
+      } else {
+        rightHDFSReadCost = 0.0;
+      }
       ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
               add(new Pair(leftRCount,leftRAverageSize)).
               add(new Pair(rightRCount,rightRAverageSize)).
@@ -508,8 +603,9 @@ public RelOptCost getCost(HiveJoin join) {
       final int parallelism = RelMetadataQuery.splitCount(join) == null ? 1 : RelMetadataQuery.splitCount(join);
       join.setJoinAlgorithm(oldAlgo);
-
-      final double ioCost = algoUtils.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism);
+      final double bucketMapJoinIOCost = algoUtils.computeBucketMapJoinIOCost(
+              relationInfos, streaming, parallelism);
+      final double ioCost = leftHDFSReadCost + rightHDFSReadCost + bucketMapJoinIOCost;
       // 4. Result
       return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
     }
@@ -653,6 +749,27 @@ public RelOptCost getCost(HiveJoin join) {
       if (leftRAverageSize == null || rightRAverageSize == null) {
         return null;
       }
+      // If it is a new phase, we need to add the HDFS read cost
+      final Boolean leftTransition = RelMetadataQuery.isPhaseTransition(join.getLeft());
+      if (leftTransition == null) {
+        return null;
+      }
+      final double leftHDFSReadCost;
+      if (leftTransition) {
+        leftHDFSReadCost = algoUtils.computeHDFSReadIOCost(leftRCount, leftRAverageSize);
+      } else {
+        leftHDFSReadCost = 0.0;
+      }
+      final Boolean rightTransition = RelMetadataQuery.isPhaseTransition(join.getRight());
+      if (rightTransition == null) {
+        return null;
+      }
+      final double rightHDFSReadCost;
+      if (rightTransition) {
+        rightHDFSReadCost = algoUtils.computeHDFSReadIOCost(rightRCount, rightRAverageSize);
+      } else {
+        rightHDFSReadCost = 0.0;
+      }
       ImmutableList<Pair<Double,Double>> relationInfos = new ImmutableList.Builder<Pair<Double,Double>>().
               add(new Pair(leftRCount,leftRAverageSize)).
               add(new Pair(rightRCount,rightRAverageSize)).
@@ -664,8 +781,9 @@ public RelOptCost getCost(HiveJoin join) {
       final int parallelism = RelMetadataQuery.splitCount(join) == null ? 1 : RelMetadataQuery
          .splitCount(join);
       join.setJoinAlgorithm(oldAlgo);
-
-      final double ioCost = algoUtils.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism);
+      final double smbMapJoinIOCost = algoUtils.computeSMBMapJoinIOCost(
+              relationInfos, streaming, parallelism);
+      final double ioCost = leftHDFSReadCost + rightHDFSReadCost + smbMapJoinIOCost;
       // 4. Result
       return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java
index c7afea9..08d005b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java
@@ -26,6 +26,7 @@
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
@@ -52,6 +53,10 @@ public Boolean isPhaseTransition(HiveJoin join) {
     return join.isPhaseTransition();
   }
 
+  public Boolean isPhaseTransition(HiveAggregate aggregate) {
+    return true;
+  }
+
   public Boolean isPhaseTransition(HiveSort sort) {
     // As Exchange operator is introduced later on, we make a
     // sort operator create a new stage for the moment
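
Every hunk above applies the same three-part IO cost composition: charge an HDFS read for each input that starts a new phase (stage), add the join or sort algorithm's own intermediate IO cost, and, where the operator materializes its result, charge an HDFS write of the output. The standalone sketch below restates that composition for reference. It is illustrative only: the class name, the per-byte cost constants, and the helper bodies are assumptions made for exposition (in Hive the real per-byte costs come from the CBO cost model configuration), not the actual computeHDFSReadIOCost/computeHDFSWriteIOCost implementations in HiveAlgorithmsUtil.

    // IoCostSketch.java -- illustrative only; all names and constants here are
    // hypothetical and not taken from the Hive code base.
    public final class IoCostSketch {

      // Hypothetical per-byte costs; Hive reads the real values from its
      // CBO cost model configuration rather than hard-coding them.
      private static final double HDFS_READ_COST_PER_BYTE = 1.5;
      private static final double HDFS_WRITE_COST_PER_BYTE = 10.0;

      // HDFS read cost of a relation: bytes scanned times per-byte read cost.
      static double hdfsReadIOCost(double rowCount, double avgRowSize) {
        return rowCount * avgRowSize * HDFS_READ_COST_PER_BYTE;
      }

      // HDFS write cost of a relation: bytes written times per-byte write cost.
      static double hdfsWriteIOCost(double rowCount, double avgRowSize) {
        return rowCount * avgRowSize * HDFS_WRITE_COST_PER_BYTE;
      }

      // The composition used by the patch: HDFS reads are charged only for
      // inputs that cross a phase boundary, the algorithm's own IO cost is
      // always charged, and the output write is charged where the operator
      // materializes its result.
      static double operatorIOCost(boolean leftIsNewPhase, double leftRows, double leftAvgSize,
          boolean rightIsNewPhase, double rightRows, double rightAvgSize,
          double algorithmIOCost, double outRows, double outAvgSize) {
        double io = algorithmIOCost;
        if (leftIsNewPhase) {
          io += hdfsReadIOCost(leftRows, leftAvgSize);
        }
        if (rightIsNewPhase) {
          io += hdfsReadIOCost(rightRows, rightAvgSize);
        }
        return io + hdfsWriteIOCost(outRows, outAvgSize);
      }

      public static void main(String[] args) {
        // Example: a sort-merge join whose left input begins a new stage.
        double io = operatorIOCost(true, 1_000_000, 100, false, 50_000, 40,
            2_500_000, 200_000, 120);
        System.out.println("estimated IO cost: " + io);
      }
    }

Note how the variants differ: the sort-merge join and aggregate hunks charge the output HDFS write, while the map join, bucket map join, and SMB map join hunks add only the two read terms. Because isPhaseTransition(HiveAggregate) always returns true, every aggregate output is treated as the start of a new phase, so any operator consuming an aggregate picks up the HDFS read charge.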