diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b4e89b0..51d2580 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3152,6 +3152,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
             Constants.LLAP_LOGGER_NAME_RFA,
             Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
+    SPARK_USE_TS_STATS("hive.spark.use.ts.stats", true,
+        "If this is set to true, mapjoin optimization in Hive/Spark will use the statistics associated " +
+        "with the TableScan operators at the root of the operator tree, instead of the statistics populated on the join's parent operators."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
         "60s", new TimeValidator(TimeUnit.SECONDS),
         "Timeout for requests from Hive client to remote Spark driver."),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index a8ed74c..074b60a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -330,4 +330,38 @@ public static int countOperatorsUpstream(Operator<?> start, Set<Class<? extends
   }
+
+  /**
+   * findRoots returns all root operators (in roots) that result in operator op.
+   */
+  public static void findRoots(Operator<?> op, Collection<Operator<?>> roots) {
+    List<Operator<?>> parents = op.getParentOperators();
+    if (parents == null || parents.isEmpty()) {
+      roots.add(op);
+      return;
+    }
+    for (Operator<?> p : parents) {
+      findRoots(p, roots);
+    }
+  }
+
+  /**
+   * Remove the branch that contains the specified operator. Do nothing if there's no branching,
+   * i.e. all the upstream operators have only one child.
+   */
+  public static void removeBranch(Operator<?> op) {
+    Operator<?> child = op;
+    Operator<?> curr = op;
+
+    while (curr.getChildOperators().size() <= 1) {
+      child = curr;
+      if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
+        return;
+      }
+      curr = curr.getParentOperators().get(0);
+    }
+
+    curr.removeChild(child);
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
index c41a0c8..26a1088 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
@@ -20,6 +20,7 @@
 
 import java.util.Stack;
 
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -28,7 +29,6 @@
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 
@@ -54,7 +54,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procContext,
 
     if (desc.getStatistics().getDataSize() > context.getConf()
         .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
-      GenSparkUtils.removeBranch(op);
+      OperatorUtils.removeBranch(op);
       // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
LOG.info("Disabling dynamic pruning for: " + desc.getTableScan().getName() diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java index 7faff88..79e04a4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java @@ -19,11 +19,15 @@ package org.apache.hadoop.hive.ql.optimizer.spark; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Stack; +import com.clearspring.analytics.util.Preconditions; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -195,8 +199,22 @@ private int convertJoinBucketMapJoin(JoinOperator joinOp, MapJoinOperator mapJoi boolean bigTableFound = false; for (Operator parentOp : joinOp.getParentOperators()) { + Statistics currInputStat; + if (context.getConf().getBoolean(HiveConf.ConfVars.SPARK_USE_TS_STATS.varname, false)) { + currInputStat = new Statistics(); + Set> roots = new HashSet<>(); + OperatorUtils.findRoots(parentOp, roots); + + // Find all root TS ops and add data size to the new Statistics object created above + // Not adding other stats (e.g., # of rows, col stats) since only data size is used here + for (Operator root : roots) { + Preconditions.checkArgument(root instanceof TableScanOperator); + currInputStat.addToDataSize(root.getStatistics().getDataSize()); + } + } else { + currInputStat = parentOp.getStatistics(); + } - Statistics currInputStat = parentOp.getStatistics(); if (currInputStat == null) { LOG.warn("Couldn't get statistics from: " + parentOp); return new long[]{-1, 0, 0}; diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index 7b2b3c0..36bde30 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; @@ -573,7 +574,7 @@ private static boolean hasGBYOperator(ReduceSinkOperator rs) { */ public BaseWork getEnclosingWork(Operator op, GenSparkProcContext procCtx) { List> ops = new ArrayList>(); - findRoots(op, ops); + OperatorUtils.findRoots(op, ops); for (Operator r : ops) { BaseWork work = procCtx.rootToWorkMap.get(r); if (work != null) { @@ -582,37 +583,4 @@ public BaseWork getEnclosingWork(Operator op, GenSparkProcContext procCtx) { } return null; } - - /* - * findRoots returns all root operators (in ops) that result in operator op - */ - private void findRoots(Operator op, List> ops) { - List> parents = op.getParentOperators(); - if (parents == null || parents.isEmpty()) { - ops.add(op); - return; - } - for (Operator p : parents) { - findRoots(p, ops); - } - } - - /** - * Remove the branch that contains the specified operator. Do nothing if there's no branching, - * i.e. 
-   */
-  public static void removeBranch(Operator<?> op) {
-    Operator<?> child = op;
-    Operator<?> curr = op;
-
-    while (curr.getChildOperators().size() <= 1) {
-      child = curr;
-      if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
-        return;
-      }
-      curr = curr.getParentOperators().get(0);
-    }
-
-    curr.removeChild(child);
-  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 71528e8..c4b1640 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -169,7 +170,7 @@ private void removeDPPOperator(Set<Operator<?>> component, OptimizeSparkProcContext context)
       return;
     }
 
-    GenSparkUtils.removeBranch(toRemove);
+    OperatorUtils.removeBranch(toRemove);
     // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
     LOG.info("Disabling dynamic pruning for: " + toRemove.getConf().getTableScan().toString()
         + ". Needed to break cyclic dependency");
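
Reviewer note: below is a minimal, self-contained Java sketch of what the hive.spark.use.ts.stats=true path computes, for tracing the logic outside of Hive. The Node class, TsStatsSketch, and estimateInputSize are illustrative stand-ins rather than Hive APIs; findRoots mirrors the helper moved into OperatorUtils, and estimateInputSize mirrors the per-join-parent summation SparkMapJoinOptimizer now performs.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy stand-in for Hive's operator DAG: parent edges point toward the table scans.
class Node {
  final String name;
  final boolean isTableScan;
  final long dataSize; // bytes; only meaningful for table scans
  final List<Node> parents = new ArrayList<>();
  final List<Node> children = new ArrayList<>();

  Node(String name, boolean isTableScan, long dataSize) {
    this.name = name;
    this.isTableScan = isTableScan;
    this.dataSize = dataSize;
  }

  static void link(Node parent, Node child) {
    parent.children.add(child);
    child.parents.add(parent);
  }
}

public class TsStatsSketch {
  // Same traversal as OperatorUtils.findRoots: recurse up parent edges
  // and collect every parentless operator.
  static void findRoots(Node op, Collection<Node> roots) {
    if (op.parents.isEmpty()) {
      roots.add(op);
      return;
    }
    for (Node p : op.parents) {
      findRoots(p, roots);
    }
  }

  // The hive.spark.use.ts.stats=true branch in miniature: a join input's size is
  // estimated as the sum of the data sizes of the TableScans at the roots of its subtree.
  static long estimateInputSize(Node joinParent) {
    Set<Node> roots = new HashSet<>();
    findRoots(joinParent, roots);
    long total = 0;
    for (Node root : roots) {
      if (!root.isTableScan) { // the patch asserts this via Preconditions.checkArgument
        throw new IllegalArgumentException("Expected a TableScan at the root, found " + root.name);
      }
      total += root.dataSize;
    }
    return total;
  }

  public static void main(String[] args) {
    // TS_1 -> FIL_2 -> RS_4 <- TS_3: two scans feed the same join input.
    Node ts1 = new Node("TS_1", true, 1_000_000L);
    Node ts3 = new Node("TS_3", true, 250_000L);
    Node fil2 = new Node("FIL_2", false, 0L);
    Node rs4 = new Node("RS_4", false, 0L);
    Node.link(ts1, fil2);
    Node.link(fil2, rs4);
    Node.link(ts3, rs4);
    System.out.println(estimateInputSize(rs4)); // 1250000 = 1,000,000 + 250,000
  }
}

The design trade-off this makes visible: root TableScan sizes are available even when intermediate operator statistics are missing or stale, but any reduction from filters or projections between the scan and the join is ignored, so the estimate is typically on the high side.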
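The relocated removeBranch can be modeled on the same toy Node class (add this method to TsStatsSketch above; it is a sketch of the walk, not the Hive implementation): climb through operators that have at most one child, then detach the climbed chain at the first ancestor that forks.

  // Sketch of OperatorUtils.removeBranch on the toy model: walk up the unary chain
  // starting at op, then cut that chain off at the first operator with multiple children.
  static void removeBranch(Node op) {
    Node child = op;
    Node curr = op;
    while (curr.children.size() <= 1) {
      child = curr;
      if (curr.parents.isEmpty()) {
        return; // no fork above: removing the "branch" would remove the whole plan
      }
      curr = curr.parents.get(0);
    }
    curr.children.remove(child); // Hive does this via Operator.removeChild, which
    child.parents.remove(curr);  // also unlinks the corresponding parent pointer
  }

Applied to the DPP case in SparkRemoveDynamicPruningBySize and SparkCompiler, op is the SparkPartitionPruningSinkOperator, and the fork is the operator feeding both the main query pipeline and the pruning branch.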