diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
index 78e8367..95c2b0b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
@@ -57,9 +57,11 @@
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
+    String SEL = SelectOperator.getOperatorName();
     String FIL = FilterOperator.getOperatorName();
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp("R1", FIL + "%" + FIL + "%"), new FilterDedup());
+    opRules.put(new RuleRegExp("R1", SEL + "%" + SEL + "%"), new SelectDedup(pctx));
+    opRules.put(new RuleRegExp("R2", FIL + "%" + FIL + "%"), new FilterDedup());
 
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -70,12 +72,8 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
     return pctx;
   }
 
-  @Deprecated
   private class SelectDedup implements NodeProcessor {
-    // This is taken care of now by
-    // {@link org.apache.hadoop.hive.ql.optimizer.IdentityProjectRemover}
-
     private ParseContext pctx;
 
     public SelectDedup (ParseContext pctx) {
@@ -244,4 +242,4 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       return null;
     }
   }
-}
+}
\ No newline at end of file
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 3482a47..81cadd1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -138,7 +138,9 @@ public void initialize(HiveConf hiveConf) {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
       transformations.add(new ReduceSinkDeDuplication());
     }
-    transformations.add(new NonBlockingOpDeDupProc());
+    if(!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
+      transformations.add(new NonBlockingOpDeDupProc());
+    }
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEIDENTITYPROJECTREMOVER)) {
       transformations.add(new IdentityProjectRemover());
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java
new file mode 100644
index 0000000..09f79bb
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveConfigContext.java
@@ -0,0 +1,20 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import org.apache.calcite.plan.Context;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+
+public class HiveConfigContext implements Context {
+  private HiveConf config;
+
+  public HiveConfigContext(HiveConf config) {
+    this.config = config;
+  }
+
+  public <T> T unwrap(Class<T> clazz) {
+    if (clazz.isInstance(config)) {
+      return clazz.cast(config);
+    }
+    return null;
+  }
+}
\ No newline at end of file
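The new HiveConfigContext above is the hook that lets code running inside Calcite reach back to HiveConf. A minimal sketch of the round trip, using only calls that appear elsewhere in this patch (variable names are illustrative):

    HiveConf conf = new HiveConf();
    RelOptPlanner planner = HiveVolcanoPlanner.createPlanner(new HiveConfigContext(conf));
    // Later, from any RelNode attached to this planner (HiveJoin does exactly this below):
    HiveConf unwrapped = planner.getContext().unwrap(HiveConf.class);
    double maxMemory = (double) HiveConf.getLongVar(unwrapped,
        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);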
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java
index 837399b..aa4ce17 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveDefaultRelMetadataProvider.java
@@ -19,22 +19,49 @@
 import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
 import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdDistribution;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdCollation;
 import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdDistinctRowCount;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdMemory;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdParallelism;
 import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdRowCount;
 import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdSelectivity;
+import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdSize;
 import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveRelMdUniqueKeys;
 
 import com.google.common.collect.ImmutableList;
 
 public class HiveDefaultRelMetadataProvider {
-  private HiveDefaultRelMetadataProvider() {
+
+  private final HiveConf hiveConf;
+
+  public HiveDefaultRelMetadataProvider(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+  }
+
+  public RelMetadataProvider getMetadataProvider() {
+
+    // Init HiveRelMdParallelism with max split size
+    Double maxSplitSize = (double) HiveConf.getLongVar(
+        this.hiveConf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
+    HiveRelMdParallelism hiveRelMdParallelism =
+        new HiveRelMdParallelism(maxSplitSize);
+
+    // Return MD provider
+    return ChainedRelMetadataProvider.of(ImmutableList
+        .of(HiveRelMdDistinctRowCount.SOURCE,
+            HiveRelMdSelectivity.SOURCE,
+            HiveRelMdRowCount.SOURCE,
+            HiveRelMdUniqueKeys.SOURCE,
+            HiveRelMdSize.SOURCE,
+            HiveRelMdMemory.SOURCE,
+            hiveRelMdParallelism.getMetadataProvider(),
+            RelMdDistribution.SOURCE,
+            HiveRelMdCollation.SOURCE,
+            new DefaultRelMetadataProvider()));
   }
-
-  public static final RelMetadataProvider INSTANCE = ChainedRelMetadataProvider.of(ImmutableList
-      .of(HiveRelMdDistinctRowCount.SOURCE,
-          HiveRelMdSelectivity.SOURCE,
-          HiveRelMdRowCount.SOURCE,
-          HiveRelMdUniqueKeys.SOURCE,
-          new DefaultRelMetadataProvider()));
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelCollation.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelCollation.java
new file mode 100644
index 0000000..a40135f
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelCollation.java
@@ -0,0 +1,16 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite;
+
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelCollation extends RelCollationImpl {
+
+  public HiveRelCollation(ImmutableList<RelFieldCollation> fieldCollations) {
+    super(fieldCollations);
+  }
+
+}
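ChainedRelMetadataProvider resolves each metadata call against the providers in list order, so the Hive-specific sources above shadow DefaultRelMetadataProvider for the node types they handle. A sketch of how a caller attaches and queries the provider (the CachingRelMetadataProvider wrapper is Calcite's usual idiom and an assumption here, since this hunk does not show the call site):

    HiveDefaultRelMetadataProvider mdProvider = new HiveDefaultRelMetadataProvider(hiveConf);
    cluster.setMetadataProvider(
        new CachingRelMetadataProvider(mdProvider.getMetadataProvider(), planner));
    Double rows = RelMetadataQuery.getRowCount(plan);  // served by HiveRelMdRowCount
    Double mem = RelMetadataQuery.memory(plan);        // served by the new HiveRelMdMemory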
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java
index 71b6680..41604cd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCost.java
@@ -90,22 +90,17 @@
   public double getIo() {
     return io;
   }
 
-  // TODO: If two cost is equal, could we do any better than comparing
-  // cardinality (may be some other heuristics to break the tie)
   public boolean isLe(RelOptCost other) {
-    return this == other || this.rowCount <= other.getRows();
-    /*
-     * if (((this.dCpu + this.dIo) < (other.getCpu() + other.getIo())) ||
-     *     ((this.dCpu + this.dIo) == (other.getCpu() + other.getIo()) && this.dRows
-     *     <= other.getRows())) { return true; } else { return false; }
-     */
+    if ((this.cpu + this.io < other.getCpu() + other.getIo()) ||
+        ((this.cpu + this.io == other.getCpu() + other.getIo()) &&
+        (this.rowCount <= other.getRows()))) {
+      return true;
+    }
+    return false;
   }
 
   public boolean isLt(RelOptCost other) {
-    return this.rowCount < other.getRows();
-    /*
-     * return isLe(other) && !equals(other);
-     */
+    return isLe(other) && !equals(other);
   }
 
   public double getRows() {
@@ -113,21 +108,14 @@ public double getRows() {
   }
 
   public boolean equals(RelOptCost other) {
-    return (this == other) || ((this.rowCount) == (other.getRows()));
-
-    /*
-     * //TODO: should we consider cardinality as well? return (this == other) ||
-     * ((this.dCpu + this.dIo) == (other.getCpu() + other.getIo()));
-     */
+    return (this == other) ||
+        ((this.cpu + this.io == other.getCpu() + other.getIo()) &&
+        (this.rowCount == other.getRows()));
   }
 
   public boolean isEqWithEpsilon(RelOptCost other) {
-    return (this == other) || (Math.abs((this.rowCount) - (other.getRows())) < RelOptUtil.EPSILON);
-    // Turn this one once we do the Algorithm selection in CBO
-    /*
-     * return (this == other) || (Math.abs((this.dCpu + this.dIo) -
-     * (other.getCpu() + other.getIo())) < RelOptUtil.EPSILON);
-     */
+    return (this == other) || (Math.abs((this.cpu + this.io) -
+        (other.getCpu() + other.getIo())) < RelOptUtil.EPSILON);
   }
 
   public RelOptCost minus(RelOptCost other) {
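The effect of the rewrite: total resource cost (cpu + io) now dominates the comparison, and cardinality only breaks ties, where previously cardinality decided alone. A tiny worked example with invented numbers, assuming HiveCost's (rowCount, cpu, io) constructor used elsewhere in this patch:

    HiveCost a = new HiveCost(1000d, 60d, 40d); // rows=1000, cpu+io=100
    HiveCost b = new HiveCost(500d, 70d, 30d);  // rows=500,  cpu+io=100
    // cpu+io ties at 100, so row count decides: b is strictly cheaper
    assert b.isLe(a) && !b.equals(a); // hence b.isLt(a)
    assert !a.isLe(b);                // 1000 <= 500 fails the tie-break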
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java
index c7e9217..598fdd0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/cost/HiveCostUtil.java
@@ -18,19 +18,22 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.cost;
 
 import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 
+import com.google.common.collect.ImmutableList;
+
 // Use this once we have Join Algorithm selection
 public class HiveCostUtil {
-  private static final double cpuCostInNanoSec = 1.0;
-  private static final double netCostInNanoSec = 150 * cpuCostInNanoSec;
-  private static final double localFSWriteCostInNanoSec = 4 * netCostInNanoSec;
-  private static final double localFSReadCostInNanoSec = 4 * netCostInNanoSec;
-  private static final double hDFSWriteCostInNanoSec = 10 * localFSWriteCostInNanoSec;
-  @SuppressWarnings("unused")
-  //Use this once we have Join Algorithm selection
-  private static final double hDFSReadCostInNanoSec = 1.5 * localFSReadCostInNanoSec;
+
+  private static final double CPU_COST = 1.0;
+  private static final double NET_COST = 150.0 * CPU_COST;
+  private static final double LOCAL_WRITE_COST = 4.0 * NET_COST;
+  private static final double LOCAL_READ_COST = 4.0 * NET_COST;
+  private static final double HDFS_WRITE_COST = 10.0 * LOCAL_WRITE_COST;
+  private static final double HDFS_READ_COST = 1.5 * LOCAL_READ_COST;
 
   public static RelOptCost computCardinalityBasedCost(HiveRelNode hr) {
     return new HiveCost(hr.getRows(), 0, 0);
@@ -38,6 +41,127 @@
 
   public static HiveCost computeCost(HiveTableScan t) {
     double cardinality = t.getRows();
-    return
-        new HiveCost(cardinality, 0, hDFSWriteCostInNanoSec * cardinality * 0);
+    return new HiveCost(cardinality, 0, HDFS_WRITE_COST * cardinality * 0);
   }
+
+  public static double computeCommonJoinCPUCost(
+      ImmutableList<Double> cardinalities,
+      ImmutableBitSet sorted) {
+    // Sort-merge join
+    assert cardinalities.size() == sorted.length();
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      double cardinality = cardinalities.get(i);
+      if (!sorted.get(i)) {
+        // Sort cost
+        cpuCost += cardinality * Math.log(cardinality) * CPU_COST;
+      }
+      // Merge cost
+      cpuCost += cardinality * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeCommonJoinIOCost(
+      ImmutableList<Pair<Double, Double>> relationInfos) {
+    // Sort-merge join
+    double ioCost = 0.0;
+    for (Pair<Double, Double> relationInfo : relationInfos) {
+      double cardinality = relationInfo.left;
+      double averageTupleSize = relationInfo.right;
+      // Write cost
+      ioCost += cardinality * averageTupleSize * LOCAL_WRITE_COST;
+      // Read cost
+      ioCost += cardinality * averageTupleSize * LOCAL_READ_COST;
+      // Net transfer cost
+      ioCost += cardinality * averageTupleSize * NET_COST;
+    }
+    return ioCost;
+  }
+
+  public static double computeMapJoinCPUCost(
+      ImmutableList<Double> cardinalities,
+      ImmutableBitSet streaming) {
+    // Hash-join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      double cardinality = cardinalities.get(i);
+      if (!streaming.get(i)) {
+        // Hash-table build cost for the small side
+        cpuCost += cardinality * CPU_COST;
+      }
+      // Probe (join) cost
+      cpuCost += cardinality * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeMapJoinIOCost(
+      ImmutableList<Pair<Double, Double>> relationInfos,
+      ImmutableBitSet streaming, int parallelism) {
+    // Hash-join
+    double ioCost = 0.0;
+    for (int i=0; i<relationInfos.size(); i++) {
+      if (!streaming.get(i)) {
+        double cardinality = relationInfos.get(i).left;
+        double averageTupleSize = relationInfos.get(i).right;
+        // The small side is broadcast to every task
+        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+      }
+    }
+    return ioCost;
+  }
+
+  public static double computeBucketMapJoinCPUCost(
+      ImmutableList<Double> cardinalities,
+      ImmutableBitSet streaming) {
+    // Hash-join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      double cardinality = cardinalities.get(i);
+      if (!streaming.get(i)) {
+        // Hash-table build cost per bucket of the small side
+        cpuCost += cardinality * CPU_COST;
+      }
+      // Probe (join) cost
+      cpuCost += cardinality * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeBucketMapJoinIOCost(
+      ImmutableList<Pair<Double, Double>> relationInfos,
+      ImmutableBitSet streaming, int parallelism) {
+    // Hash-join
+    double ioCost = 0.0;
+    for (int i=0; i<relationInfos.size(); i++) {
+      if (!streaming.get(i)) {
+        double cardinality = relationInfos.get(i).left;
+        double averageTupleSize = relationInfos.get(i).right;
+        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+      }
+    }
+    return ioCost;
+  }
+
+  public static double computeSMBMapJoinCPUCost(
+      ImmutableList<Double> cardinalities) {
+    // Hash-join
+    double cpuCost = 0.0;
+    for (int i=0; i<cardinalities.size(); i++) {
+      // Merge cost only: inputs are already sorted and bucketed
+      cpuCost += cardinalities.get(i) * CPU_COST;
+    }
+    return cpuCost;
+  }
+
+  public static double computeSMBMapJoinIOCost(
+      ImmutableList<Pair<Double, Double>> relationInfos,
+      ImmutableBitSet streaming, int parallelism) {
+    // Hash-join
+    double ioCost = 0.0;
+    for (int i=0; i<relationInfos.size(); i++) {
+      if (!streaming.get(i)) {
+        double cardinality = relationInfos.get(i).left;
+        double averageTupleSize = relationInfos.get(i).right;
+        ioCost += cardinality * averageTupleSize * NET_COST * parallelism;
+      }
+    }
+    return ioCost;
+  }
 }
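Under these constants a network transfer costs 150x a CPU comparison, and a local-FS write or read 4x that again, which is what makes broadcasting a small build side attractive. A back-of-envelope call sequence with invented statistics, mirroring how HiveJoin drives these helpers below (note that several loop bodies above were reconstructed from surviving fragments, so treat the exact formulas as a sketch):

    // Two inputs: big side streams (bit 0 set), small side is hashed
    ImmutableList<Double> cards = ImmutableList.of(1000000d, 1000d);
    ImmutableBitSet streaming = ImmutableBitSet.of(0);
    double cpu = HiveCostUtil.computeMapJoinCPUCost(cards, streaming);
    // Per input: (cardinality, average tuple size in bytes)
    ImmutableList<Pair<Double, Double>> infos = ImmutableList.of(
        Pair.of(1000000d, 100d), Pair.of(1000d, 100d));
    double io = HiveCostUtil.computeMapJoinIOCost(infos, streaming, 10);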
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveJoin.java
[... the index line and the first hunk -- new imports plus the joinCost,
maxMemory and mapJoinStreamingSide fields that the hunks below rely on --
were lost in extraction ...]
     try {
       Set<String> variablesStopped = Collections.emptySet();
       return new HiveJoin(cluster, null, left, right, condition, joinType, variablesStopped,
-          JoinAlgorithm.NONE, null, leftSemiJoin);
+          JoinAlgorithm.NONE, MapJoinStreamingRelation.NONE, leftSemiJoin);
     } catch (InvalidRelException e) {
       throw new RuntimeException(e);
     }
@@ -77,7 +98,11 @@ protected HiveJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelN
     super(cluster, TraitsUtil.getDefaultTraitSet(cluster), left, right, condition, joinType,
         variablesStopped);
     this.joinAlgorithm = joinAlgo;
+    this.mapJoinStreamingSide = streamingSideForMapJoin;
     this.leftSemiJoin = leftSemiJoin;
+    this.maxMemory = (double) HiveConf.getLongVar(
+        cluster.getPlanner().getContext().unwrap(HiveConf.class),
+        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
   }
 
   @Override
@@ -90,7 +115,7 @@ public final HiveJoin copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode
     try {
       Set<String> variablesStopped = Collections.emptySet();
       return new HiveJoin(getCluster(), traitSet, left, right, conditionExpr, joinType,
-          variablesStopped, JoinAlgorithm.NONE, null, leftSemiJoin);
+          variablesStopped, joinAlgorithm, mapJoinStreamingSide, leftSemiJoin);
     } catch (InvalidRelException e) {
       // Semantic error not possible. Must be a bug. Convert to
       // internal error.
@@ -102,6 +127,10 @@ public JoinAlgorithm getJoinAlgorithm() {
     return joinAlgorithm;
   }
 
+  public MapJoinStreamingRelation getMapJoinStreamingSide() {
+    return mapJoinStreamingSide;
+  }
+
   public boolean isLeftSemiJoin() {
     return leftSemiJoin;
   }
 
@@ -111,9 +140,355 @@ public boolean isLeftSemiJoin() {
    */
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner) {
-    double leftRCount = RelMetadataQuery.getRowCount(getLeft());
-    double rightRCount = RelMetadataQuery.getRowCount(getRight());
-    return HiveCost.FACTORY.makeCost(leftRCount + rightRCount, 0.0, 0.0);
+    this.joinCost = chooseJoinAlgorithmAndGetCost();
+    return this.joinCost;
+  }
+
+  private RelOptCost chooseJoinAlgorithmAndGetCost() {
+    // 1. Choose streaming side
+    chooseStreamingSide();
+    // 2. Get possible algorithms
+    Set<JoinAlgorithm> possibleAlgorithms = obtainJoinAlgorithms();
+    // 3. For each possible algorithm, calculate cost, and select best
+    RelOptCost selfCost = null;
+    for (JoinAlgorithm possibleAlgorithm : possibleAlgorithms) {
+      switch (possibleAlgorithm) {
+      case COMMON_JOIN:
+        RelOptCost commonJoinCost = computeSelfCostCommonJoin();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("COMMONJOIN possible");
+          LOG.debug("COMMONJOIN cost: " + commonJoinCost);
+        }
+        if (selfCost == null || commonJoinCost.isLt(selfCost)) {
+          this.joinAlgorithm = JoinAlgorithm.COMMON_JOIN;
+          selfCost = commonJoinCost;
+        }
+        break;
+      case MAP_JOIN:
+        RelOptCost mapJoinCost = computeSelfCostMapJoin();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("MAPJOIN possible");
+          LOG.debug("MAPJOIN cost: " + mapJoinCost);
+        }
+        if (selfCost == null || mapJoinCost.isLt(selfCost)) {
+          this.joinAlgorithm = JoinAlgorithm.MAP_JOIN;
+          selfCost = mapJoinCost;
+        }
+        break;
+      case BUCKET_JOIN:
+        RelOptCost bucketJoinCost = computeSelfCostBucketJoin();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("BUCKETJOIN possible");
+          LOG.debug("BUCKETJOIN cost: " + bucketJoinCost);
+        }
+        if (selfCost == null || bucketJoinCost.isLt(selfCost)) {
+          this.joinAlgorithm = JoinAlgorithm.BUCKET_JOIN;
+          selfCost = bucketJoinCost;
+        }
+        break;
+      case SMB_JOIN:
+        RelOptCost smbJoinCost = computeSelfCostSMBJoin();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SMBJOIN possible");
+          LOG.debug("SMBJOIN cost: " + smbJoinCost);
+        }
+        if (selfCost == null || smbJoinCost.isLt(selfCost)) {
+          this.joinAlgorithm = JoinAlgorithm.SMB_JOIN;
+          selfCost = smbJoinCost;
+        }
+        break;
+      default:
+        //TODO: Exception
+      }
+    }
+    return selfCost;
+  }
+
+  private void chooseStreamingSide() {
+    Double leftInputSize = RelMetadataQuery.memory(this.getLeft());
+    Double rightInputSize = RelMetadataQuery.memory(this.getRight());
+    if (leftInputSize == null && rightInputSize == null) {
+      this.mapJoinStreamingSide = MapJoinStreamingRelation.NONE;
+    } else if (leftInputSize != null &&
+            (rightInputSize == null ||
+            (leftInputSize < rightInputSize))) {
+      this.mapJoinStreamingSide = MapJoinStreamingRelation.RIGHT_RELATION;
+    } else if (rightInputSize != null &&
+            (leftInputSize == null ||
+            (rightInputSize <= leftInputSize))) {
+      this.mapJoinStreamingSide = MapJoinStreamingRelation.LEFT_RELATION;
+    }
+  }
+
+  private Set<JoinAlgorithm> obtainJoinAlgorithms() {
+    Set<JoinAlgorithm> possibleAlgorithms = new HashSet<JoinAlgorithm>();
+
+    // Check streaming side
+    RelNode smallInput;
+    if (this.mapJoinStreamingSide == MapJoinStreamingRelation.LEFT_RELATION) {
+      smallInput = this.getRight();
+    } else if (this.mapJoinStreamingSide == MapJoinStreamingRelation.RIGHT_RELATION) {
+      smallInput = this.getLeft();
+    } else {
+      smallInput = null;
+    }
+
+    if (smallInput != null) {
+      // Requirements:
+      // - For SMB, sorted by their keys on both sides and bucketed.
+      // - For Bucket, bucketed by their keys on both sides. / Fitting in memory
+      // - For Map, no additional requirement. / Fitting in memory
+
+      // Get key columns
+      JoinPredicateInfo joinPredInfo = HiveCalciteUtil.JoinPredicateInfo.
+          constructJoinPredicateInfo(this);
+      List<ImmutableIntList> joinKeysInChildren = new ArrayList<ImmutableIntList>();
+      joinKeysInChildren.add(
+          ImmutableIntList.copyOf(
+              joinPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema()));
+      joinKeysInChildren.add(
+          ImmutableIntList.copyOf(
+              joinPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema()));
+
+      // Obtain number of buckets
+      Integer buckets = RelMetadataQuery.splitCount(smallInput);
+      // Obtain map algorithms for which smallest input fits in memory
+      boolean bucketFitsMemory = false;
+      boolean inputFitsMemory = false;
+      if (buckets != null) {
+        bucketFitsMemory = isFittingIntoMemory(this.maxMemory, smallInput, buckets);
+      }
+      inputFitsMemory = bucketFitsMemory ?
+          isFittingIntoMemory(this.maxMemory, smallInput, 1) : false;
+      boolean orderedBucketed = true;
+      boolean bucketed = true;
[... the per-input loop that validates bucketing and sort order against
joinKeysInChildren, and the statements that add MAP_JOIN, BUCKET_JOIN and
SMB_JOIN to possibleAlgorithms based on these flags, were lost in
extraction ...]
+    }
+    possibleAlgorithms.add(JoinAlgorithm.COMMON_JOIN);
+    return possibleAlgorithms;
+  }
+
+  private boolean isFittingIntoMemory(Double maxSize, RelNode input, int buckets) {
+    Double currentMemory = RelMetadataQuery.cumulativeMemoryWithinPhase(input);
+    if (currentMemory != null) {
+      if (currentMemory / buckets > maxSize) {
+        return false;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private RelOptCost computeSelfCostCommonJoin() {
+    // 1. Sum of input cardinalities
+    final Double leftRCount = RelMetadataQuery.getRowCount(getLeft());
+    final Double rightRCount = RelMetadataQuery.getRowCount(getRight());
+    if (leftRCount == null || rightRCount == null) {
+      return null;
+    }
+    final double rCount = leftRCount + rightRCount;
+    // 2. CPU cost = sorting cost (for each relation) +
+    //    total merge cost
+    ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+        add(leftRCount).
+        add(rightRCount).
+        build();
+    // TODO: Check whether inputs are already sorted; currently we assume
+    // we need to sort all of them
+    final double cpuCost = HiveCostUtil.computeCommonJoinCPUCost(cardinalities,
+        ImmutableBitSet.range(2));
+    // 3. IO cost = cost of writing intermediary results to local FS +
+    //    cost of reading from local FS for transferring to join +
+    //    cost of transferring map outputs to Join operator
+    final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(getLeft());
+    final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(getRight());
+    if (leftRAverageSize == null || rightRAverageSize == null) {
+      return null;
+    }
+    ImmutableList<Pair<Double, Double>> relationInfos =
+        new ImmutableList.Builder<Pair<Double, Double>>().
+        add(new Pair<Double, Double>(leftRCount, leftRAverageSize)).
+        add(new Pair<Double, Double>(rightRCount, rightRAverageSize)).
+        build();
+    final double ioCost = HiveCostUtil.computeCommonJoinIOCost(relationInfos);
+    // 4. Result
+    return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+  }
+
+  private RelOptCost computeSelfCostMapJoin() {
+    // 1. Sum of input cardinalities
+    final Double leftRCount = RelMetadataQuery.getRowCount(getLeft());
+    final Double rightRCount = RelMetadataQuery.getRowCount(getRight());
+    if (leftRCount == null || rightRCount == null) {
+      return null;
+    }
+    final double rCount = leftRCount + rightRCount;
+    // 2. CPU cost = HashTable construction cost +
+    //    join cost
+    ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+        add(leftRCount).
+        add(rightRCount).
+        build();
+    ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+    switch (mapJoinStreamingSide) {
+    case LEFT_RELATION:
+      streamingBuilder.set(0);
+      break;
+    case RIGHT_RELATION:
+      streamingBuilder.set(1);
+      break;
+    default:
+      return null;
+    }
+    ImmutableBitSet streaming = streamingBuilder.build();
+    final double cpuCost = HiveCostUtil.computeMapJoinCPUCost(cardinalities, streaming);
+    // 3. IO cost = cost of transferring small tables to join node *
+    //    degree of parallelism
+    final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(getLeft());
+    final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(getRight());
+    if (leftRAverageSize == null || rightRAverageSize == null) {
+      return null;
+    }
+    ImmutableList<Pair<Double, Double>> relationInfos =
+        new ImmutableList.Builder<Pair<Double, Double>>().
+        add(new Pair<Double, Double>(leftRCount, leftRAverageSize)).
+        add(new Pair<Double, Double>(rightRCount, rightRAverageSize)).
+        build();
+    final int parallelism = RelMetadataQuery.splitCount(this) == null
+        ? 1 : RelMetadataQuery.splitCount(this);
+    final double ioCost = HiveCostUtil.computeMapJoinIOCost(relationInfos, streaming, parallelism);
+    // 4. Result
+    return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+  }
+
+  private RelOptCost computeSelfCostBucketJoin() {
+    // 1. Sum of input cardinalities
+    final Double leftRCount = RelMetadataQuery.getRowCount(getLeft());
+    final Double rightRCount = RelMetadataQuery.getRowCount(getRight());
+    if (leftRCount == null || rightRCount == null) {
+      return null;
+    }
+    final double rCount = leftRCount + rightRCount;
+    // 2. CPU cost = HashTable construction cost +
+    //    join cost
+    ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+        add(leftRCount).
+        add(rightRCount).
+        build();
+    ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+    switch (mapJoinStreamingSide) {
+    case LEFT_RELATION:
+      streamingBuilder.set(0);
+      break;
+    case RIGHT_RELATION:
+      streamingBuilder.set(1);
+      break;
+    default:
+      return null;
+    }
+    ImmutableBitSet streaming = streamingBuilder.build();
+    final double cpuCost = HiveCostUtil.computeBucketMapJoinCPUCost(cardinalities, streaming);
+    // 3. IO cost = cost of transferring small tables to join node *
+    //    degree of parallelism
+    final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(getLeft());
+    final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(getRight());
+    if (leftRAverageSize == null || rightRAverageSize == null) {
+      return null;
+    }
+    ImmutableList<Pair<Double, Double>> relationInfos =
+        new ImmutableList.Builder<Pair<Double, Double>>().
+        add(new Pair<Double, Double>(leftRCount, leftRAverageSize)).
+        add(new Pair<Double, Double>(rightRCount, rightRAverageSize)).
+        build();
+    final int parallelism = RelMetadataQuery.splitCount(this) == null
+        ? 1 : RelMetadataQuery.splitCount(this);
+    final double ioCost = HiveCostUtil.computeBucketMapJoinIOCost(relationInfos, streaming, parallelism);
+    // 4. Result
+    return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+  }
+
+  private RelOptCost computeSelfCostSMBJoin() {
+    // 1. Sum of input cardinalities
+    final Double leftRCount = RelMetadataQuery.getRowCount(getLeft());
+    final Double rightRCount = RelMetadataQuery.getRowCount(getRight());
+    if (leftRCount == null || rightRCount == null) {
+      return null;
+    }
+    final double rCount = leftRCount + rightRCount;
+    // 2. CPU cost = merge join cost
+    ImmutableList<Double> cardinalities = new ImmutableList.Builder<Double>().
+        add(leftRCount).
+        add(rightRCount).
+        build();
+    ImmutableBitSet.Builder streamingBuilder = new ImmutableBitSet.Builder();
+    switch (mapJoinStreamingSide) {
+    case LEFT_RELATION:
+      streamingBuilder.set(0);
+      break;
+    case RIGHT_RELATION:
+      streamingBuilder.set(1);
+      break;
+    default:
+      return null;
+    }
+    ImmutableBitSet streaming = streamingBuilder.build();
+    final double cpuCost = HiveCostUtil.computeSMBMapJoinCPUCost(cardinalities);
+    // 3. IO cost = cost of transferring small tables to join node *
+    //    degree of parallelism
+    final Double leftRAverageSize = RelMetadataQuery.getAverageRowSize(getLeft());
+    final Double rightRAverageSize = RelMetadataQuery.getAverageRowSize(getRight());
+    if (leftRAverageSize == null || rightRAverageSize == null) {
+      return null;
+    }
+    ImmutableList<Pair<Double, Double>> relationInfos =
+        new ImmutableList.Builder<Pair<Double, Double>>().
+        add(new Pair<Double, Double>(leftRCount, leftRAverageSize)).
+        add(new Pair<Double, Double>(rightRCount, rightRAverageSize)).
+        build();
+    final int parallelism = RelMetadataQuery.splitCount(this) == null
+        ? 1 : RelMetadataQuery.splitCount(this);
+    final double ioCost = HiveCostUtil.computeSMBMapJoinIOCost(relationInfos, streaming, parallelism);
+    // 4. Result
+    return HiveCost.FACTORY.makeCost(rCount, cpuCost, ioCost);
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw)
+        .item("joinAlgorithm", joinAlgorithm.name().toLowerCase())
+        .item("cost", joinCost);
   }
 
   /**
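Taken together, computeSelfCost now runs a small auction: pick the streaming side from the memory metadata, enumerate the algorithms whose prerequisites hold, cost each candidate, and keep the cheapest. For intuition, with invented sizes:

    // memory(left) = 1 MB, memory(right) = 100 MB
    //   -> left is the smaller hash (build) side, so RIGHT_RELATION streams
    // smallInput = left; since 1 MB fits under
    //   HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD, MAP_JOIN is a candidate
    //   (BUCKET_JOIN if also bucketed, SMB_JOIN if additionally sorted)
    // COMMON_JOIN always remains a candidate; the cheapest HiveCost wins,
    //   and explainTerms reports it, e.g. joinAlgorithm=map_join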
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSortExchange.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSortExchange.java
new file mode 100644
index 0000000..d11f69d
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveSortExchange.java
@@ -0,0 +1,49 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SortExchange;
+
+public class HiveSortExchange extends SortExchange {
+
+  private HiveSortExchange(RelOptCluster cluster, RelTraitSet traitSet,
+      RelNode input, RelDistribution distribution, RelCollation collation) {
+    super(cluster, traitSet, input, distribution, collation);
+  }
+
+  public HiveSortExchange(RelInput input) {
+    super(input);
+  }
+
+  /**
+   * Creates a HiveSortExchange.
+   *
+   * @param input Input relational expression
+   * @param distribution Distribution specification
+   * @param collation Collation specification
+   */
+  public static HiveSortExchange create(RelNode input,
+      RelDistribution distribution, RelCollation collation) {
+    RelOptCluster cluster = input.getCluster();
+    distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
+    RelTraitSet traitSet =
+        input.getTraitSet().replace(Convention.NONE).replace(distribution);
+    collation = RelCollationTraitDef.INSTANCE.canonize(collation);
+    return new HiveSortExchange(cluster, traitSet, input, distribution, collation);
+  }
+
+  @Override
+  public SortExchange copy(RelTraitSet traitSet, RelNode newInput, RelDistribution newDistribution,
+      RelCollation newCollation) {
+    return new HiveSortExchange(getCluster(), traitSet, newInput,
+        newDistribution, newCollation);
+  }
+
+}
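HiveSortExchange bundles the two traits a sort-merge-style join wants from each child. Creating one by hand looks like this (key position 0, default ascending order; names are illustrative):

    RelDistribution dist = new HiveRelDistribution(
        RelDistribution.Type.HASH_DISTRIBUTED, ImmutableList.of(0));
    RelCollation coll = new HiveRelCollation(
        ImmutableList.of(new RelFieldCollation(0)));
    HiveSortExchange exchange = HiveSortExchange.create(child, dist, coll);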
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchange4JoinRule.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchange4JoinRule.java
index cdffb45..b5404a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchange4JoinRule.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveInsertExchange4JoinRule.java
@@ -23,15 +23,20 @@
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.logical.LogicalExchange;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelCollation;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelDistribution;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange;
+
+import com.google.common.collect.ImmutableList;
 
 /** Not an optimization rule.
  * Rule to aid in translation from Calcite tree -> Hive tree.
@@ -43,12 +48,11 @@
  * Join
  */
 public class HiveInsertExchange4JoinRule extends RelOptRule {
-  
+
   protected static transient final Log LOG = LogFactory
       .getLog(HiveInsertExchange4JoinRule.class);
 
   public HiveInsertExchange4JoinRule() {
-    // match join with exactly 2 inputs
     super(RelOptRule.operand(Join.class,
         operand(RelNode.class, any()),
         operand(RelNode.class, any())));
   }
 
@@ -58,9 +62,9 @@ public HiveInsertExchange4JoinRule() {
   @Override
   public void onMatch(RelOptRuleCall call) {
     Join join = call.rel(0);
-    
-    if (call.rel(1) instanceof LogicalExchange &&
-        call.rel(2) instanceof LogicalExchange) {
+
+    if (call.rel(1) instanceof Exchange &&
+        call.rel(2) instanceof Exchange) {
       return;
     }
 
@@ -68,23 +72,38 @@ public void onMatch(RelOptRuleCall call) {
     JoinPredicateInfo joinPredInfo =
        HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join);
 
     // get key columns from inputs. Those are the columns on which we will distribute.
+    // They are also the columns we will sort on.
     List<Integer> joinLeftKeyPositions = new ArrayList<Integer>();
     List<Integer> joinRightKeyPositions = new ArrayList<Integer>();
+    ImmutableList.Builder<RelFieldCollation> leftCollationListBuilder =
+        new ImmutableList.Builder<RelFieldCollation>();
+    ImmutableList.Builder<RelFieldCollation> rightCollationListBuilder =
+        new ImmutableList.Builder<RelFieldCollation>();
     for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
       JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
           getEquiJoinPredicateElements().get(i);
       joinLeftKeyPositions.addAll(joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema());
+      for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInChildSchema()) {
+        leftCollationListBuilder.add(new RelFieldCollation(leftPos));
+      }
       joinRightKeyPositions.addAll(joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema());
+      for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInChildSchema()) {
+        rightCollationListBuilder.add(new RelFieldCollation(rightPos));
+      }
     }
 
-    LogicalExchange left = LogicalExchange.create(join.getLeft(), new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED, joinLeftKeyPositions));
-    LogicalExchange right = LogicalExchange.create(join.getRight(), new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED, joinRightKeyPositions));
+    HiveSortExchange left = HiveSortExchange.create(join.getLeft(),
+        new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED, joinLeftKeyPositions),
+        new HiveRelCollation(leftCollationListBuilder.build()));
+    HiveSortExchange right = HiveSortExchange.create(join.getRight(),
+        new HiveRelDistribution(RelDistribution.Type.HASH_DISTRIBUTED, joinRightKeyPositions),
+        new HiveRelCollation(rightCollationListBuilder.build()));
 
     Join newJoin = join.copy(join.getTraitSet(), join.getCondition(), left, right,
         join.getJoinType(), join.isSemiJoinDone());
 
     call.getPlanner().onCopy(join, newJoin);
-    
+
     call.transformTo(newJoin);
   }
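The rewrite the rule performs, sketched on a two-input join (the instanceof guard above makes it fire at most once per join):

    Join(R1, R2)
      =>
    Join(HiveSortExchange(R1, hash-distribute + sort on left keys),
         HiveSortExchange(R2, hash-distribute + sort on right keys))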
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java
new file mode 100644
index 0000000..f29a91c
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/stats/HiveRelMdCollation.java
@@ -0,0 +1,90 @@
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdCollation;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelCollation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+
+import com.google.common.collect.ImmutableList;
+
+public class HiveRelMdCollation {
+
+  public static final RelMetadataProvider SOURCE =
+      ChainedRelMetadataProvider.of(
+          ImmutableList.of(
+              ReflectiveRelMetadataProvider.reflectiveSource(
+                  BuiltInMethod.COLLATIONS.method, new HiveRelMdCollation()),
+              RelMdCollation.SOURCE));
+
+  //~ Constructors -----------------------------------------------------------
+
+  private HiveRelMdCollation() {}
+
+  //~ Methods ----------------------------------------------------------------
+
+  public ImmutableList<RelCollation> collations(HiveJoin join) {
+    // Compute collations
+    ImmutableList.Builder<RelFieldCollation> collationListBuilder =
+        new ImmutableList.Builder<RelFieldCollation>();
+    ImmutableList.Builder<RelFieldCollation> leftCollationListBuilder =
+        new ImmutableList.Builder<RelFieldCollation>();
+    ImmutableList.Builder<RelFieldCollation> rightCollationListBuilder =
+        new ImmutableList.Builder<RelFieldCollation>();
+    JoinPredicateInfo joinPredInfo =
+        HiveCalciteUtil.JoinPredicateInfo.constructJoinPredicateInfo(join);
+    for (int i = 0; i < joinPredInfo.getEquiJoinPredicateElements().size(); i++) {
+      JoinLeafPredicateInfo joinLeafPredInfo = joinPredInfo.
+          getEquiJoinPredicateElements().get(i);
+      for (int leftPos : joinLeafPredInfo.getProjsFromLeftPartOfJoinKeysInJoinSchema()) {
+        final RelFieldCollation leftFieldCollation = new RelFieldCollation(leftPos);
+        collationListBuilder.add(leftFieldCollation);
+        leftCollationListBuilder.add(leftFieldCollation);
+      }
+      for (int rightPos : joinLeafPredInfo.getProjsFromRightPartOfJoinKeysInJoinSchema()) {
+        final RelFieldCollation rightFieldCollation = new RelFieldCollation(rightPos);
+        collationListBuilder.add(rightFieldCollation);
+        rightCollationListBuilder.add(rightFieldCollation);
+      }
+    }
+
+    // Return join collations
+    final ImmutableList<RelCollation> collation;
+    switch (join.getJoinAlgorithm()) {
+    case SMB_JOIN:
+    case COMMON_JOIN:
+      collation = ImmutableList.of(
+          RelCollationTraitDef.INSTANCE.canonize(
+              new HiveRelCollation(collationListBuilder.build())));
+      break;
+    case BUCKET_JOIN:
+    case MAP_JOIN:
+      // Keep order from the streaming relation
+      if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+        collation = ImmutableList.of(
+            RelCollationTraitDef.INSTANCE.canonize(
+                new HiveRelCollation(leftCollationListBuilder.build())));
+      } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+        collation = ImmutableList.of(
+            RelCollationTraitDef.INSTANCE.canonize(
+                new HiveRelCollation(rightCollationListBuilder.build())));
+      } else {
+        collation = null;
+      }
+      break;
+    default:
+      collation = null;
+    }
+    return collation;
+  }
+
+}
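Downstream consumers read this through the standard metadata call: for a common or sort-merge join the full equi-key collation is reported, while for a map or bucket join only the streamed side's order survives. Sketch, assuming a HiveJoin whose algorithm has already been chosen:

    ImmutableList<RelCollation> collations = RelMetadataQuery.collations(join);
    // SMB_JOIN / COMMON_JOIN  -> collation over all equi-join keys
    // MAP_JOIN / BUCKET_JOIN  -> collation of the streaming input
    // otherwise               -> null (unknown)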
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdMemory;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.JoinAlgorithm;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.MapJoinStreamingRelation;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+
+public class HiveRelMdMemory extends RelMdMemory {
+
+  private static final HiveRelMdMemory INSTANCE = new HiveRelMdMemory();
+
+  public static final RelMetadataProvider SOURCE =
+      ReflectiveRelMetadataProvider.reflectiveSource(INSTANCE,
+          BuiltInMethod.MEMORY.method,
+          BuiltInMethod.CUMULATIVE_MEMORY_WITHIN_PHASE.method,
+          BuiltInMethod.CUMULATIVE_MEMORY_WITHIN_PHASE_SPLIT.method);
+
+  //~ Constructors -----------------------------------------------------------
+
+  private HiveRelMdMemory() {}
+
+  //~ Methods ----------------------------------------------------------------
+
+  public Double memory(HiveTableScan tableScan) {
+    return 0.0d;
+  }
+
+  public Double memory(HiveAggregate aggregate) {
+    final Double avgRowSize = RelMetadataQuery.getAverageRowSize(aggregate.getInput());
+    final Double rowCount = RelMetadataQuery.getRowCount(aggregate.getInput());
+    if (avgRowSize == null || rowCount == null) {
+      return null;
+    }
+    return avgRowSize * rowCount;
+  }
+
+  public Double memory(HiveFilter filter) {
+    return 0.0;
+  }
+
+  public Double memory(HiveJoin join) {
+    Double memory = 0.0;
+    if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
+      // Left side
+      final Double leftAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getLeft());
+      final Double leftRowCount = RelMetadataQuery.getRowCount(join.getLeft());
+      if (leftAvgRowSize == null || leftRowCount == null) {
+        return null;
+      }
+      memory += leftAvgRowSize * leftRowCount;
+      // Right side
+      final Double rightAvgRowSize = RelMetadataQuery.getAverageRowSize(join.getRight());
+      final Double rightRowCount = RelMetadataQuery.getRowCount(join.getRight());
+      if (rightAvgRowSize == null || rightRowCount == null) {
+        return null;
+      }
+      memory += rightAvgRowSize * rightRowCount;
+    } else if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
+            join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
+      RelNode inMemoryInput;
+      if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+        inMemoryInput = join.getRight();
+      } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+        inMemoryInput = join.getLeft();
+      } else {
+        return null;
+      }
+      // Result
+      final Double avgRowSize = RelMetadataQuery.getAverageRowSize(inMemoryInput);
+      final Double rowCount = RelMetadataQuery.getRowCount(inMemoryInput);
+      if (avgRowSize == null || rowCount == null) {
+        return null;
+      }
+      memory = avgRowSize * rowCount;
+    }
+    return memory;
+  }
+
+  public Double cumulativeMemoryWithinPhaseSplit(HiveJoin join) {
+    if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN ||
+            join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
+      // Check streaming side
+      RelNode inMemoryInput;
+      if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.LEFT_RELATION) {
+        inMemoryInput = join.getRight();
+      } else if (join.getMapJoinStreamingSide() == MapJoinStreamingRelation.RIGHT_RELATION) {
+        inMemoryInput = join.getLeft();
+      } else {
+        return null;
+      }
+
+      if (join.getJoinAlgorithm() == JoinAlgorithm.MAP_JOIN) {
+        // If simple map join, the whole relation goes in memory
+        return RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput);
+      }
+      else if (join.getJoinAlgorithm() == JoinAlgorithm.BUCKET_JOIN) {
+        // If bucket map join, only a split goes in memory
+        final Double memoryInput =
+            RelMetadataQuery.cumulativeMemoryWithinPhase(inMemoryInput);
+        final Integer splitCount = RelMetadataQuery.splitCount(inMemoryInput);
+        if (memoryInput == null || splitCount == null) {
+          return null;
+        }
+        return memoryInput / splitCount;
+      }
+    }
+    // Else, we fall back to default
+    return super.cumulativeMemoryWithinPhaseSplit(join);
+  }
+
+  public Double memory(HiveLimit limit) {
+    return 0.0;
+  }
+
+  public Double memory(HiveProject project) {
+    return 0.0;
+  }
+
+  public Double memory(HiveSort sort) {
+    if (sort.getCollation() != RelCollations.EMPTY) {
+      // It sorts
+      final Double avgRowSize = RelMetadataQuery.getAverageRowSize(sort.getInput());
+      final Double rowCount = RelMetadataQuery.getRowCount(sort.getInput());
+      if (avgRowSize == null || rowCount == null) {
+        return null;
+      }
+      return avgRowSize * rowCount;
+    }
+    // It does not sort, memory footprint is zero
+    return 0.0;
+  }
+
+  public Double memory(HiveUnion union) {
+    return 0.0;
+  }
+
+}
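These handlers back the three memory calls that the join costing above relies on (semantics per Calcite's RelMdMemory contract; the join variable is hypothetical):

    Double opMem = RelMetadataQuery.memory(join);
    // bytes this operator holds itself (the hash-table side for map joins)
    Double phaseMem = RelMetadataQuery.cumulativeMemoryWithinPhase(join);
    // this operator plus all inputs within the same execution phase
    Double perSplit = RelMetadataQuery.cumulativeMemoryWithinPhaseSplit(join);
    // the same, divided across splits -- what isFittingIntoMemory compares
    // against the map-join memory threshold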
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdParallelism;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin.JoinAlgorithm;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSort;
+
+public class HiveRelMdParallelism extends RelMdParallelism {
+
+  private final Double maxSplitSize;
+
+  //~ Constructors -----------------------------------------------------------
+
+  public HiveRelMdParallelism(Double maxSplitSize) {
+    this.maxSplitSize = maxSplitSize;
+  }
+
+  public RelMetadataProvider getMetadataProvider() {
+    return ReflectiveRelMetadataProvider.reflectiveSource(this,
+        BuiltInMethod.IS_PHASE_TRANSITION.method,
+        BuiltInMethod.SPLIT_COUNT.method);
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  public Boolean isPhaseTransition(HiveJoin join) {
+    // As Exchange operator is introduced later on, we make a
+    // common join operator create a new stage for the moment
+    if (join.getJoinAlgorithm() == JoinAlgorithm.COMMON_JOIN) {
+      return true;
+    }
+    return false;
+  }
+
+  public Boolean isPhaseTransition(HiveSort sort) {
+    // As Exchange operator is introduced later on, we make a
+    // sort operator create a new stage for the moment
+    return true;
+  }
+
+  public Integer splitCount(RelNode rel) {
+    Boolean newPhase = RelMetadataQuery.isPhaseTransition(rel);
+
+    if (newPhase == null) {
+      return null;
+    }
+
+    if (newPhase) {
+      // We repartition: new number of splits
+      final Double averageRowSize = RelMetadataQuery.getAverageRowSize(rel);
+      final Double rowCount = RelMetadataQuery.getRowCount(rel);
+      if (averageRowSize == null || rowCount == null) {
+        return null;
+      }
+      final Double totalSize = averageRowSize * rowCount;
+      final Double splitCount = totalSize / maxSplitSize;
+      return splitCount.intValue();
+    }
+
+    // We do not repartition: take number of splits from children
+    Integer splitCount = 0;
+    for (RelNode input : rel.getInputs()) {
+      splitCount += RelMetadataQuery.splitCount(input);
+    }
+    return splitCount;
+  }
+
+}
+
+// End HiveRelMdParallelism.java
\ No newline at end of file
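splitCount is what feeds the parallelism factor in the IO cost formulas. Checking the arithmetic with invented statistics:

    double averageRowSize = 100d;              // bytes per row (invented)
    double rowCount = 10000000d;               // 10M rows (invented)
    double maxSplitSize = 256d * 1024 * 1024;  // from MAPREDMAXSPLITSIZE
    int splits = (int) (averageRowSize * rowCount / maxSplitSize);
    // 1 GB / 256 MB = 3.725..., truncated to 3 by Double.intValue()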
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.stats;
+
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMdSize;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class HiveRelMdSize extends RelMdSize {
+
+  protected static final Log LOG = LogFactory.getLog(HiveRelMdSize.class.getName());
+
+  private static final HiveRelMdSize INSTANCE = new HiveRelMdSize();
+
+  public static final RelMetadataProvider SOURCE =
+      ReflectiveRelMetadataProvider.reflectiveSource(INSTANCE,
+          BuiltInMethod.AVERAGE_COLUMN_SIZES.method,
+          BuiltInMethod.AVERAGE_ROW_SIZE.method);
+
+  //~ Constructors -----------------------------------------------------------
+
+  private HiveRelMdSize() {}
+
+  //~ Methods ----------------------------------------------------------------
+
+  public Double averageTypeValueSize(RelDataType type) {
+    switch (type.getSqlTypeName()) {
+    case BOOLEAN:
+    case TINYINT:
+      return 1d;
+    case SMALLINT:
+      return 2d;
+    case INTEGER:
+    case FLOAT:
+    case REAL:
+    case DATE:
+    case TIME:
+      return 4d;
+    case BIGINT:
+    case DOUBLE:
+    case TIMESTAMP:
+    case INTERVAL_DAY_TIME:
+    case INTERVAL_YEAR_MONTH:
+      return 8d;
+    case BINARY:
+      return (double) type.getPrecision();
+    case VARBINARY:
+      return Math.min((double) type.getPrecision(), 100d);
+    case CHAR:
+      return (double) type.getPrecision() * BYTES_PER_CHARACTER;
+    case VARCHAR:
+      // Even in large (say VARCHAR(2000)) columns most strings are small
+      return Math.min((double) type.getPrecision() * BYTES_PER_CHARACTER, 100d);
+    case ROW:
+      Double average = 0.0;
+      for (RelDataTypeField field : type.getFieldList()) {
+        average += averageTypeValueSize(field.getType());
+      }
+      return average;
+    default:
+      return null;
+    }
+  }
+
+}
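Example of the per-type estimates composing for a row: an (INTEGER, VARCHAR(2000)) schema is estimated at 4 + min(2000 * BYTES_PER_CHARACTER, 100) = 4 + 100 = 104 bytes per row; the VARCHAR cap encodes the comment's assumption that even wide string columns mostly hold short values.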
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 5e5a792..986213d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -116,6 +116,7 @@
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveConfigContext;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
@@ -176,7 +177,8 @@ import com.google.common.collect.Lists;
 
 public class CalcitePlanner extends SemanticAnalyzer {
-  private final AtomicInteger noColsMissingStats = new AtomicInteger(0);
+
+  private final AtomicInteger noColsMissingStats = new AtomicInteger(0);
   private List<FieldSchema> topLevelFieldSchema;
   private SemanticException semanticException;
   private boolean runCBO = true;
@@ -731,7 +733,8 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu
       /*
       * recreate cluster, so that it picks up the additional traitDef
       */
-      RelOptPlanner planner = HiveVolcanoPlanner.createPlanner();
+      HiveConfigContext confContext = new HiveConfigContext(conf);
+      RelOptPlanner planner = HiveVolcanoPlanner.createPlanner(confContext);
       final RelOptQuery query = new RelOptQuery(planner);
       final RexBuilder rexBuilder = cluster.getRexBuilder();
       cluster = query.createCluster(rexBuilder.getTypeFactory(), rexBuilder);
@@ -750,13 +753,16 @@ public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlu
         throw new RuntimeException(e);
       }
 
+      // Create MD provider
+      HiveDefaultRelMetadataProvider mdProvider = new HiveDefaultRelMetadataProvider(conf);
+
       // 2. Apply Pre Join Order optimizations
       calcitePreCboPlan = applyPreJoinOrderingTransforms(calciteGenPlan,
-          HiveDefaultRelMetadataProvider.INSTANCE);
+          mdProvider.getMetadataProvider());
 
       // 3. Apply Join Order Optimizations using Hep Planner (MST Algorithm)
       List<RelMetadataProvider> list = Lists.newArrayList();
-      list.add(HiveDefaultRelMetadataProvider.INSTANCE);
+      list.add(mdProvider.getMetadataProvider());
 
       RelTraitSet desiredTraits = cluster
           .traitSetOf(HiveRelNode.CONVENTION, RelCollations.EMPTY);
@@ -832,7 +838,9 @@ private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProv
         SemiJoinFilterTransposeRule.INSTANCE,
         SemiJoinProjectTransposeRule.INSTANCE);
 
     // 2. Add not null filters
-    basePlan = hepPlan(basePlan, true, mdProvider, HiveJoinAddNotNullRule.INSTANCE);
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
+      basePlan = hepPlan(basePlan, true, mdProvider, HiveJoinAddNotNullRule.INSTANCE);
+    }
 
     // 3. PPD
     basePlan = hepPlan(basePlan, true, mdProvider,
@@ -1290,11 +1298,13 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc
       // 4. Build RelOptAbstractTable
       String fullyQualifiedTabName = tabMetaData.getDbName();
       if (fullyQualifiedTabName != null && !fullyQualifiedTabName.isEmpty()) {
-        fullyQualifiedTabName = fullyQualifiedTabName + "." + tabMetaData.getTableName()
-            + "." + tableAlias;
+        fullyQualifiedTabName = fullyQualifiedTabName + "." + tabMetaData.getTableName();
       } else {
-        fullyQualifiedTabName = tabMetaData.getTableName() + "." + tableAlias;
+        fullyQualifiedTabName = tabMetaData.getTableName();
+      }
+      if (conf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
+        fullyQualifiedTabName += "." + tableAlias;
       }
       RelOptHiveTable optTable = new RelOptHiveTable(relOptSchema, fullyQualifiedTabName,
           tableAlias, rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols,
           conf,
@@ -2821,4 +2831,5 @@ private QBParseInfo getQBParseInfo(QB qb) throws CalciteSemanticException {
       return tabAliases;
     }
   }
+
 }
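End to end, the pieces meet in CalcitePlanner.apply: the configuration rides into the planner through the context, and the metadata provider is now built per query around that same HiveConf rather than read from a static INSTANCE (condensed from the hunks above):

    HiveConfigContext confContext = new HiveConfigContext(conf);
    RelOptPlanner planner = HiveVolcanoPlanner.createPlanner(confContext);
    HiveDefaultRelMetadataProvider mdProvider = new HiveDefaultRelMetadataProvider(conf);
    calcitePreCboPlan = applyPreJoinOrderingTransforms(calciteGenPlan,
        mdProvider.getMetadataProvider());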