diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java
index 598fdd0..47ecd47 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java
@@ -44,17 +44,16 @@ public static HiveCost computeCost(HiveTableScan t) {
     return new HiveCost(cardinality, 0, HDFS_WRITE_COST * cardinality * 0);
   }
 
-  public static double computeCommonJoinCPUCost(
+  public static double computeSortMergeCPUCost(
       ImmutableList<Double> cardinalities,
       ImmutableBitSet sorted) {
     // Sort-merge join
-    assert cardinalities.size() == sorted.length();
     double cpuCost = 0.0;
     for (int i=0; i<cardinalities.size(); i++) {
       double cardinality = cardinalities.get(i);
-      if (sorted.get(i)) {
+      if (!sorted.get(i)) {
         // Sort cost
         cpuCost += computeSortCPUCost(cardinality);
       }
     }
     return cpuCost;
@@ -64,23 +63,30 @@ public static double computeSortCPUCost(Double cardinality) {
     return cardinality * Math.log(cardinality) * CPU_COST;
   }
 
-  public static double computeCommonJoinIOCost(
+  public static double computeSortMergeIOCost(
      ImmutableList<Pair<Double,Double>> relationInfos) {
     // Sort-merge join
     double ioCost = 0.0;
     for (Pair<Double,Double> relationInfo : relationInfos) {
-      double cardinality = relationInfo.left;
-      double averageTupleSize = relationInfo.right;
-      // Write cost
-      ioCost += cardinality * averageTupleSize * LOCAL_WRITE_COST;
-      // Read cost
-      ioCost += cardinality * averageTupleSize * LOCAL_READ_COST;
-      // Net transfer cost
-      ioCost += cardinality * averageTupleSize * NET_COST;
+      ioCost += computeSortIOCost(relationInfo);
     }
     return ioCost;
   }
 
+  public static double computeSortIOCost(Pair<Double,Double> relationInfo) {
+    // Sort-merge join
+    double ioCost = 0.0;
+    double cardinality = relationInfo.left;
+    double averageTupleSize = relationInfo.right;
+    // Write cost
+    ioCost += cardinality * averageTupleSize * LOCAL_WRITE_COST;
+    // Read cost
+    ioCost += cardinality * averageTupleSize * LOCAL_READ_COST;
+    // Net transfer cost
+    ioCost += cardinality * averageTupleSize * NET_COST;
+    return ioCost;
+  }
+
   public static double computeMapJoinCPUCost(
       ImmutableList<Double> cardinalities,
       ImmutableBitSet streaming) {
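Aside (not part of the patch): the refactor above pulls the per-relation sort cost out of the join-specific methods so that other operators can reuse it, which is exactly what HiveAggregate does below. The following is a minimal, self-contained Java sketch of what the helpers compute; the constant values, class name, and main driver are illustrative assumptions, not the definitions in HiveCostUtil.

import org.apache.calcite.util.Pair;

public class SortCostSketch {
  // Illustrative placeholder constants; the real values live in HiveCostUtil.
  private static final double CPU_COST = 1.0;         // assumed unit cost per comparison
  private static final double LOCAL_WRITE_COST = 4.0; // assumed local FS write cost per byte
  private static final double LOCAL_READ_COST = 4.0;  // assumed local FS read cost per byte
  private static final double NET_COST = 150.0;       // assumed network transfer cost per byte

  // CPU cost of sorting one relation: O(n log n) comparisons
  static double computeSortCPUCost(double cardinality) {
    return cardinality * Math.log(cardinality) * CPU_COST;
  }

  // IO cost of sorting one relation: spill to local FS, read back, ship over the network
  static double computeSortIOCost(Pair<Double, Double> relationInfo) {
    double bytes = relationInfo.left * relationInfo.right; // rows * average row size
    return bytes * (LOCAL_WRITE_COST + LOCAL_READ_COST + NET_COST);
  }

  public static void main(String[] args) {
    // A hypothetical 1M-row relation with 100-byte rows
    System.out.println(computeSortCPUCost(1_000_000d));
    System.out.println(computeSortIOCost(Pair.of(1_000_000d, 100d)));
  }
}

The split means a single-relation sort, as in the aggregate case, and the two-relation sort-merge case share one formula instead of duplicating the write/read/transfer arithmetic.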
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java
index 21ddc99..9a8a5da 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveAggregate.java
@@ -30,8 +30,10 @@
 import org.apache.calcite.rel.core.RelFactories.AggregateFactory;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
 import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCost;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveCostUtil;
 
 import com.google.common.collect.ImmutableList;
 
@@ -39,11 +41,15 @@
   public static final HiveAggRelFactory HIVE_AGGR_REL_FACTORY =
       new HiveAggRelFactory();
 
+  // Whether input is already bucketed on the group by keys
+  private boolean bucketedInput;
+
   public HiveAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode child,
       boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets,
       List<AggregateCall> aggCalls) throws InvalidRelException {
     super(cluster, TraitsUtil.getDefaultTraitSet(cluster), child, indicator,
         groupSet, groupSets, aggCalls);
+    this.bucketedInput = false;
   }
 
   @Override
@@ -66,7 +72,34 @@ public void implement(Implementor implementor) {
 
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    return HiveCost.FACTORY.makeZeroCost();
+    // Check whether input is in correct order
+    checkInputCorrectBucketing();
+    if (this.bucketedInput) {
+      return HiveCost.FACTORY.makeZeroCost();
+    } else {
+      // 1. Sum of input cardinalities
+      final Double rCount = RelMetadataQuery.getRowCount(this.getInput());
+      if (rCount == null) {
+        return null;
+      }
+      // 2. CPU cost = sorting cost
+      final double cpuCost = HiveCostUtil.computeSortCPUCost(rCount);
+      // 3. IO cost = cost of writing intermediary results to local FS +
+      //    cost of reading from local FS for transferring to GBy +
+      //    cost of transferring map outputs to GBy operator
+      final Double rAverageSize = RelMetadataQuery.getAverageRowSize(this.getInput());
+      if (rAverageSize == null) {
+        return null;
+      }
+      final double ioCost = HiveCostUtil.computeSortIOCost(new Pair<Double,Double>(rCount,rAverageSize));
+      // 4. Result
+      return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+    }
+  }
+
+  private void checkInputCorrectBucketing() {
+    this.bucketedInput = RelMetadataQuery.distribution(this.getInput()).getKeys().
+        containsAll(this.getGroupSet().asList());
   }
 
   @Override
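Aside (not part of the patch): the new computeSelfCost above reduces to a simple rule: if the input is already distributed on a superset of the group-by keys, the aggregate costs nothing extra; otherwise it pays one sort (CPU) plus a local write, a local read, and a network transfer (IO). Below is a hedged sketch of that decision using plain Java collections in place of Calcite's metadata calls; every name and constant in it is hypothetical.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AggregateCostSketch {
  // Returns a single combined number for brevity; HiveCost keeps CPU and IO separate.
  static double aggregateCost(Set<Integer> distributionKeys, List<Integer> groupKeys,
      double rowCount, double avgRowSize) {
    if (distributionKeys.containsAll(groupKeys)) {
      return 0.0; // bucketed input: no extra shuffle or sort needed
    }
    double cpu = rowCount * Math.log(rowCount); // sort CPU, unit cost omitted
    double io = rowCount * avgRowSize * 3;      // write + read + transfer, unit costs omitted
    return cpu + io;
  }

  public static void main(String[] args) {
    Set<Integer> dist = new HashSet<>(Arrays.asList(0, 1));
    System.out.println(aggregateCost(dist, Arrays.asList(0), 1_000_000d, 100d)); // 0.0
    System.out.println(aggregateCost(dist, Arrays.asList(2), 1_000_000d, 100d)); // > 0
  }
}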
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java
index 7f6cb2d..e2b010b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java
@@ -78,13 +78,15 @@
   private JoinAlgorithm joinAlgorithm;
   private MapJoinStreamingRelation mapJoinStreamingSide;
   private RelOptCost joinCost;
+  // Whether inputs are already sorted
+  private ImmutableBitSet sortedInputs;
 
   public static HiveJoin getJoin(RelOptCluster cluster, RelNode left, RelNode right,
       RexNode condition, JoinRelType joinType, boolean leftSemiJoin) {
     try {
       Set<String> variablesStopped = Collections.emptySet();
       return new HiveJoin(cluster, null, left, right, condition, joinType, variablesStopped,
-          JoinAlgorithm.NONE, MapJoinStreamingRelation.NONE, leftSemiJoin);
+          JoinAlgorithm.NONE, MapJoinStreamingRelation.NONE, ImmutableBitSet.of(), leftSemiJoin);
     } catch (InvalidRelException e) {
       throw new RuntimeException(e);
     }
@@ -92,12 +94,13 @@ public static HiveJoin getJoin(RelOptCluster cluster, RelNode left, RelNode righ
 
   protected HiveJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
       RexNode condition, JoinRelType joinType, Set<String> variablesStopped,
-      JoinAlgorithm joinAlgo, MapJoinStreamingRelation streamingSideForMapJoin, boolean leftSemiJoin)
-      throws InvalidRelException {
+      JoinAlgorithm joinAlgo, MapJoinStreamingRelation streamingSideForMapJoin,
+      ImmutableBitSet sortedInputs, boolean leftSemiJoin) throws InvalidRelException {
     super(cluster, TraitsUtil.getDefaultTraitSet(cluster), left, right, condition, joinType,
         variablesStopped);
     this.joinAlgorithm = joinAlgo;
     this.mapJoinStreamingSide = streamingSideForMapJoin;
+    this.sortedInputs = sortedInputs;
     this.leftSemiJoin = leftSemiJoin;
     this.maxMemory = (double) HiveConf.getLongVar(
         cluster.getPlanner().getContext().unwrap(HiveConf.class),
@@ -114,7 +117,7 @@ public final HiveJoin copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode
     try {
       Set<String> variablesStopped = Collections.emptySet();
       return new HiveJoin(getCluster(), traitSet, left, right, conditionExpr, joinType,
-          variablesStopped, joinAlgorithm, mapJoinStreamingSide, leftSemiJoin);
+          variablesStopped, joinAlgorithm, mapJoinStreamingSide, sortedInputs, leftSemiJoin);
     } catch (InvalidRelException e) {
       // Semantic error not possible. Must be a bug. Convert to
       // internal error.
@@ -146,9 +149,11 @@ public RelOptCost computeSelfCost(RelOptPlanner planner) {
   private RelOptCost chooseJoinAlgorithmAndGetCost() {
     // 1. Choose streaming side
     chooseStreamingSide();
-    // 2. Get possible algorithms
+    // 2. Determine whether inputs are already sorted
+    checkInputsCorrectOrder();
+    // 3. Get possible algorithms
     Set<JoinAlgorithm> possibleAlgorithms = obtainJoinAlgorithms();
-    // 3. For each possible algorithm, calculate cost, and select best
+    // 4. For each possible algorithm, calculate cost, and select best
     RelOptCost selfCost = null;
     for (JoinAlgorithm possibleAlgorithm : possibleAlgorithms) {
       switch (possibleAlgorithm) {
@@ -219,6 +224,27 @@ private void chooseStreamingSide() {
     }
   }
 
+  private void checkInputsCorrectOrder() {
+    JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.
+            constructJoinPredicateInfo(this);
+    List<ImmutableIntList> joinKeysInChildren = new ArrayList<ImmutableIntList>();
+    joinKeysInChildren.add(
+            ImmutableIntList.copyOf(
+                    joinPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema()));
+    joinKeysInChildren.add(
+            ImmutableIntList.copyOf(
+                    joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema()));
+
+    for (int i=0; i<this.getInputs().size(); i++) {
+      boolean correctOrderFound = RelCollations.contains(
+              RelMetadataQuery.collations(this.getInputs().get(i)),
+              joinKeysInChildren.get(i));
+      if (correctOrderFound) {
+        this.sortedInputs = this.sortedInputs.union(ImmutableBitSet.of(i));
+      }
+    }
+  }
+
   private Set<JoinAlgorithm> obtainJoinAlgorithms() {
     Set<JoinAlgorithm> possibleAlgorithms = new HashSet<JoinAlgorithm>();
@@ -265,8 +291,7 @@ private void chooseStreamingSide() {
       RelNode input = getInputs().get(i);
       // Is smbJoin possible? We need correct order
       if (orderedBucketed) {
-        boolean orderFound = RelCollations.contains(
-            RelMetadataQuery.collations(input), joinKeysInChildren.get(i));
+        boolean orderFound = sortedInputs.get(i);
         if (!orderFound) {
           orderedBucketed = false;
         }
@@ -278,8 +303,7 @@ private void chooseStreamingSide() {
         orderedBucketed = false;
         bucketed = false;
       }
-      if (!(joinKeysInChildren.get(i).containsAll(distribution.getKeys())
-          && distribution.getKeys().containsAll(joinKeysInChildren.get(i)))) {
+      if (!distribution.getKeys().containsAll(joinKeysInChildren.get(i))) {
         orderedBucketed = false;
         bucketed = false;
       }
@@ -328,9 +352,7 @@ private RelOptCost computeSelfCostCommonJoin() {
         add(leftRCount).
         add(rightRCount).
         build();
-    // TODO: Check whether inputs are already sorted; currently we assume
-    // we need to sort all of them
-    final double cpuCost = HiveCostUtil.computeCommonJoinCPUCost(cardinalities,ImmutableBitSet.range(2));
+    final double cpuCost = HiveCostUtil.computeSortMergeCPUCost(cardinalities, sortedInputs);
     // 3. IO cost = cost of writing intermediary results to local FS +
     //    cost of reading from local FS for transferring to join +
     //    cost of transferring map outputs to Join operator
@@ -343,7 +365,7 @@
         add(new Pair<Double,Double>(leftRCount,leftRAverageSize)).
         add(new Pair<Double,Double>(rightRCount,rightRAverageSize)).
         build();
-    final double ioCost = HiveCostUtil.computeCommonJoinIOCost(relationInfos);
+    final double ioCost = HiveCostUtil.computeSortMergeIOCost(relationInfos);
     // 4. Result
     return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
   }
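Aside (not part of the patch): with sortedInputs tracked on the join operator, the sort-merge CPU cost can skip inputs that already satisfy the required collation instead of charging a sort for both sides, which is what the old ImmutableBitSet.range(2) call effectively did. The sketch below mirrors that shape; the CPU_COST constant and the class name are placeholders, not Hive's values.

import org.apache.calcite.util.ImmutableBitSet;
import com.google.common.collect.ImmutableList;

public class SortMergeCostSketch {
  private static final double CPU_COST = 1.0; // assumed unit cost per comparison

  // Charge an O(n log n) sort only for inputs whose bit is NOT set in `sorted`
  static double computeSortMergeCPUCost(ImmutableList<Double> cardinalities,
      ImmutableBitSet sorted) {
    double cpuCost = 0.0;
    for (int i = 0; i < cardinalities.size(); i++) {
      if (!sorted.get(i)) { // this input still needs sorting
        double c = cardinalities.get(i);
        cpuCost += c * Math.log(c) * CPU_COST;
      }
    }
    return cpuCost;
  }

  public static void main(String[] args) {
    ImmutableList<Double> cards = ImmutableList.of(1_000_000d, 1_000d);
    // Input 0 is already sorted, so only input 1 (the unsorted side) is charged.
    System.out.println(computeSortMergeCPUCost(cards, ImmutableBitSet.of(0)));
  }
}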