diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java index 1bf41c3..6840418 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveAlgorithmsUtil.java @@ -65,9 +65,8 @@ public static RelOptCost computeCardinalityBasedCost(HiveRelNode hr) { return new HiveCost(hr.getRows(), 0, 0); } - public HiveCost computeCost(HiveTableScan t) { - double cardinality = t.getRows(); - return new HiveCost(cardinality, 0, hdfsWrite * cardinality * 0); + public HiveCost computeScanCost(double cardinality, double avgTupleSize) { + return new HiveCost(cardinality, 0, hdfsRead * cardinality * avgTupleSize); } public double computeSortMergeCPUCost( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java index 95fb4d7..4e3b654 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostModel.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; import com.google.common.collect.ImmutableList; @@ -48,6 +49,8 @@ public HiveCostModel(Set joinAlgorithms) { public abstract RelOptCost getAggregateCost(HiveAggregate aggregate); + public abstract RelOptCost getScanCost(HiveTableScan ts); + public RelOptCost getJoinCost(HiveJoin join) { // Select algorithm with min cost JoinAlgorithm joinAlgorithm = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java index 27fba04..6669d32 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveDefaultCostModel.java @@ -23,6 +23,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; @@ -53,10 +54,14 @@ public RelOptCost getDefaultCost() { } @Override - public RelOptCost getAggregateCost(HiveAggregate aggregate) { + public RelOptCost getScanCost(HiveTableScan ts) { return HiveCost.FACTORY.makeZeroCost(); } + @Override + public RelOptCost getAggregateCost(HiveAggregate aggregate) { + return HiveCost.FACTORY.makeZeroCost(); + } /** * Default join algorithm. Cost is based on cardinality. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java index d8caef4..fb67309 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveOnTezCostModel.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation; import com.google.common.collect.ImmutableList; @@ -71,6 +72,11 @@ public RelOptCost getDefaultCost() { } @Override + public RelOptCost getScanCost(HiveTableScan ts) { + return algoUtils.computeScanCost(ts.getRows(), RelMetadataQuery.getAverageRowSize(ts)); + } + + @Override public RelOptCost getAggregateCost(HiveAggregate aggregate) { if (aggregate.isBucketedInput()) { return HiveCost.FACTORY.makeZeroCost(); @@ -166,12 +172,18 @@ public Double getMemory(HiveJoin join) { @Override public Double getCumulativeMemoryWithinPhaseSplit(HiveJoin join) { + JoinAlgorithm oldAlgo = join.getJoinAlgorithm(); + join.setJoinAlgorithm(TezCommonJoinAlgorithm.INSTANCE); + final Double memoryWithinPhase = RelMetadataQuery.cumulativeMemoryWithinPhase(join); final Integer splitCount = RelMetadataQuery.splitCount(join); + join.setJoinAlgorithm(oldAlgo); + if (memoryWithinPhase == null || splitCount == null) { return null; } + return memoryWithinPhase / splitCount; } @@ -252,8 +264,11 @@ public RelOptCost getCost(HiveJoin join) { add(new Pair(leftRCount,leftRAverageSize)). add(new Pair(rightRCount,rightRAverageSize)). build(); + JoinAlgorithm oldAlgo = join.getJoinAlgorithm(); + join.setJoinAlgorithm(TezMapJoinAlgorithm.INSTANCE); final int parallelism = RelMetadataQuery.splitCount(join) == null ? 1 : RelMetadataQuery.splitCount(join); + join.setJoinAlgorithm(oldAlgo); final double ioCost = algoUtils.computeMapJoinIOCost(relationInfos, streaming, parallelism); // 4. Result return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); @@ -346,7 +361,13 @@ public boolean isExecutable(HiveJoin join) { // Requirements: for Bucket, bucketed by their keys on both sides and fitting in memory // Obtain number of buckets + //TODO: Incase of non bucketed splits would be computed based on data size/max part size + // What we need is a way to get buckets not splits + JoinAlgorithm oldAlgo = join.getJoinAlgorithm(); + join.setJoinAlgorithm(TezBucketJoinAlgorithm.INSTANCE); Integer buckets = RelMetadataQuery.splitCount(smallInput); + join.setJoinAlgorithm(oldAlgo); + if (buckets == null) { return false; } @@ -406,8 +427,13 @@ public RelOptCost getCost(HiveJoin join) { add(new Pair(leftRCount,leftRAverageSize)). add(new Pair(rightRCount,rightRAverageSize)). build(); + //TODO: No Of buckets is not same as no of splits + JoinAlgorithm oldAlgo = join.getJoinAlgorithm(); + join.setJoinAlgorithm(TezBucketJoinAlgorithm.INSTANCE); final int parallelism = RelMetadataQuery.splitCount(join) == null ? 1 : RelMetadataQuery.splitCount(join); + join.setJoinAlgorithm(oldAlgo); + final double ioCost = algoUtils.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism); // 4. Result return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); @@ -550,8 +576,14 @@ public RelOptCost getCost(HiveJoin join) { add(new Pair(leftRCount,leftRAverageSize)). add(new Pair(rightRCount,rightRAverageSize)). build(); - final int parallelism = RelMetadataQuery.splitCount(join) == null - ? 1 : RelMetadataQuery.splitCount(join); + + // TODO: Split count is not the same as no of buckets + JoinAlgorithm oldAlgo = join.getJoinAlgorithm(); + join.setJoinAlgorithm(TezSMBJoinAlgorithm.INSTANCE); + final int parallelism = RelMetadataQuery.splitCount(join) == null ? 1 : RelMetadataQuery + .splitCount(join); + join.setJoinAlgorithm(oldAlgo); + final double ioCost = algoUtils.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism); // 4. Result return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost); @@ -575,9 +607,14 @@ public Double getMemory(HiveJoin join) { @Override public Double getCumulativeMemoryWithinPhaseSplit(HiveJoin join) { - final Double memoryWithinPhase = - RelMetadataQuery.cumulativeMemoryWithinPhase(join); + // TODO: Split count is not same as no of buckets + JoinAlgorithm oldAlgo = join.getJoinAlgorithm(); + join.setJoinAlgorithm(TezSMBJoinAlgorithm.INSTANCE); + + final Double memoryWithinPhase = RelMetadataQuery.cumulativeMemoryWithinPhase(join); final Integer splitCount = RelMetadataQuery.splitCount(join); + join.setJoinAlgorithm(oldAlgo); + if (memoryWithinPhase == null || splitCount == null) { return null; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java index fa2ec58..90f6a7e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveRelMdCost.java @@ -26,6 +26,7 @@ import org.apache.calcite.util.BuiltInMethod; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin; +import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; import com.google.common.collect.ImmutableList; @@ -56,6 +57,10 @@ public RelOptCost getNonCumulativeCost(HiveJoin join) { return hiveCostModel.getJoinCost(join); } + public RelOptCost getNonCumulativeCost(HiveTableScan ts) { + return hiveCostModel.getScanCost(ts); + } + // Default case public RelOptCost getNonCumulativeCost(RelNode rel) { return hiveCostModel.getDefaultCost(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java index 375d47c..668960e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java @@ -112,6 +112,10 @@ public void setJoinAlgorithm(JoinAlgorithm joinAlgorithm) { this.joinAlgorithm = joinAlgorithm; } + public JoinAlgorithm getJoinAlgorithm() { + return this.joinAlgorithm; + } + public ImmutableList getCollation() { return joinAlgorithm.getCollation(this); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java index 96ca5ec..c7afea9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdParallelism.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.optimizer.calcite.stats; +import java.util.List; + import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider; import org.apache.calcite.rel.metadata.RelMdParallelism; @@ -61,8 +63,21 @@ public Integer splitCount(HiveJoin join) { } public Integer splitCount(HiveTableScan scan) { + Integer splitCount; + RelOptHiveTable table = (RelOptHiveTable) scan.getTable(); - return table.getHiveTableMD().getNumBuckets(); + List bucketCols = table.getHiveTableMD().getBucketCols(); + if (bucketCols != null && !bucketCols.isEmpty()) { + splitCount = table.getHiveTableMD().getNumBuckets(); + } else { + splitCount = splitCountRepartition(scan); + if (splitCount == null) { + throw new RuntimeException("Could not get split count for table: " + + scan.getTable().getQualifiedName()); + } + } + + return splitCount; } public Integer splitCount(RelNode rel) {