diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0e4f1f6..c83d735 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2847,6 +2847,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         true,
         "Allows hive server 2 to send progress bar update information. This is currently available"
             + " only if the execution engine is tez."),
+
+    SPARK_USE_OP_STATS("hive.spark.use.op.stats", true,
+        "Whether to use operator stats to determine reducer parallelism for Hive on Spark. "
+            + "If this is false, Hive will use source table stats to determine the reducer "
+            + "parallelism for all first level reduce tasks, and the maximum reducer parallelism "
+            + "from all parents for all the rest (second level and onward) reducer tasks."),
     SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
         "Updates spark job execution progress in-place in the terminal."),
     TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 7a5b71f..6151677 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Stack;
 
 import org.slf4j.Logger;
@@ -29,7 +31,9 @@
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
@@ -57,6 +61,12 @@
   // Spark memory per task, and total number of cores
   private ObjectPair<Long, Integer> sparkMemoryAndCores;
 
+  private final boolean useOpStats;
+
+  public SetSparkReducerParallelism(HiveConf conf) {
+    sparkMemoryAndCores = null;
+    useOpStats = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_OP_STATS);
+  }
 
   @Override
   public Object process(Node nd, Stack<Node> stack,
@@ -67,16 +77,28 @@ public Object process(Node nd, Stack<Node> stack,
     ReduceSinkOperator sink = (ReduceSinkOperator) nd;
     ReduceSinkDesc desc = sink.getConf();
+    Set<ReduceSinkOperator> parentSinks = null;
     int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
     int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
 
+    if (!useOpStats) {
+      parentSinks = OperatorUtils.findOperatorsUpstream(sink, ReduceSinkOperator.class);
+      parentSinks.remove(sink);
+      if (!context.getVisitedReduceSinks().containsAll(parentSinks)) {
+        // We haven't processed all the parent sinks, and we need
+        // them to be done in order to compute the parallelism for this sink.
+        // In this case, skip. We should visit this again from another path.
+ LOG.debug("Skipping sink " + sink + " for now as we haven't seen all its parents."); + return false; + } + } + if (context.getVisitedReduceSinks().contains(sink)) { // skip walking the children LOG.debug("Already processed reduce sink: " + sink.getName()); return true; } - context.getVisitedReduceSinks().add(sink); if (needSetParallelism(sink, context.getConf())) { @@ -96,18 +118,44 @@ public Object process(Node nd, Stack stack, return false; } } + long numberOfBytes = 0; - // we need to add up all the estimates from the siblings of this reduce sink - for (Operator sibling - : sink.getChildOperators().get(0).getParentOperators()) { - if (sibling.getStatistics() != null) { - numberOfBytes += sibling.getStatistics().getDataSize(); - if (LOG.isDebugEnabled()) { - LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics()); + if (useOpStats) { + // we need to add up all the estimates from the siblings of this reduce sink + for (Operator sibling + : sink.getChildOperators().get(0).getParentOperators()) { + if (sibling.getStatistics() != null) { + numberOfBytes += sibling.getStatistics().getDataSize(); + if (LOG.isDebugEnabled()) { + LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics()); + } + } else { + LOG.warn("No stats available from: " + sibling); } + } + } else { + if (parentSinks.isEmpty()) { + // This is the first sink in the path, meaning that we should use TS stats + // to infer parallelism + Set sources = + OperatorUtils.findOperatorsUpstream(sink, TableScanOperator.class); + for (TableScanOperator source : sources) { + numberOfBytes += source.getStatistics().getDataSize(); + } + LOG.debug("Gathered stats for sink " + sink + ". Total size is " + + numberOfBytes + " bytes."); + } else { - LOG.warn("No stats available from: " + sibling); + // Use the maximum parallelism from all parent reduce sinks + int numberOfReducers = 0; + for (ReduceSinkOperator parent : parentSinks) { + numberOfReducers = Math.max(numberOfReducers, parent.getConf().getNumReducers()); + } + desc.setNumReducers(numberOfReducers); + LOG.debug("Set parallelism for sink " + sink + " to " + numberOfReducers + + " based on its parents"); + return false; } } @@ -134,7 +182,7 @@ public Object process(Node nd, Stack stack, desc.setNumReducers(numReducers); } } else { - LOG.info("Number of reducers determined to be: " + desc.getNumReducers()); + LOG.info("Number of reducers for sink " + sink + " was already determined to be: " + desc.getNumReducers()); } return false; @@ -165,6 +213,9 @@ private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveC } private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws SemanticException { + if (sparkMemoryAndCores != null) { + return; + } if (context.getConf().getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, false)) { // If dynamic allocation is enabled, numbers for memory and cores are meaningless. So, we don't // try to get it. diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index 7b2b3c0..68f3ae4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -101,21 +101,21 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator root reduceWork.setReducer(root); reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork)); - // All parents should be reduce sinks. 
We pick the one we just walked - // to choose the number of reducers. In the join/union case they will - // all be -1. In sort/order case where it matters there will be only - // one parent. - Preconditions.checkArgument(context.parentOfRoot instanceof ReduceSinkOperator, - "AssertionError: expected context.parentOfRoot to be an instance of ReduceSinkOperator, but was " - + context.parentOfRoot.getClass().getName()); - ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot; - - reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers()); + // Pick the maximum # reducers across all parents as the # of reduce tasks. + int maxExecutors = -1; + for (Operator parentOfRoot : root.getParentOperators()) { + Preconditions.checkArgument(parentOfRoot instanceof ReduceSinkOperator, + "AssertionError: expected parentOfRoot to be an " + + "instance of ReduceSinkOperator, but was " + + parentOfRoot.getClass().getName()); + ReduceSinkOperator reduceSink = (ReduceSinkOperator) parentOfRoot; + maxExecutors = Math.max(maxExecutors, reduceSink.getConf().getNumReducers()); + } + reduceWork.setNumReduceTasks(maxExecutors); + ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot; setupReduceSink(context, reduceWork, reduceSink); - sparkWork.add(reduceWork); - SparkEdgeProperty edgeProp = getEdgeProperty(reduceSink, reduceWork); sparkWork.connect(context.preceedingWork, reduceWork, edgeProp); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java index 71528e8..ffde87e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.CompositeProcessor; -import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.ForwardWalker; @@ -55,6 +54,7 @@ import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.PreOrderWalker; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.lib.TypeRule; @@ -270,7 +270,7 @@ private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws Seman Map opRules = new LinkedHashMap(); opRules.put(new RuleRegExp("Set parallelism - ReduceSink", ReduceSinkOperator.getOperatorName() + "%"), - new SetSparkReducerParallelism()); + new SetSparkReducerParallelism(pCtx.getConf())); opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx)); @@ -283,7 +283,7 @@ private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws Seman // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx); - GraphWalker ogw = new DefaultGraphWalker(disp); + GraphWalker ogw = new PreOrderWalker(disp); // Create a list of topop nodes ArrayList topNodes = new ArrayList();
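A minimal, self-contained sketch (not part of the patch above) of the fallback rule the patch introduces for hive.spark.use.op.stats=false: first-level reduce sinks are sized from the table-scan statistics of their sources, and every later reduce sink inherits the maximum numReducers of its parent reduce sinks. The class and method names below are illustrative only and do not exist in Hive.

    import java.util.Arrays;
    import java.util.List;

    public class ParallelismFallbackSketch {
      // Mirrors the Math.max loop added to SetSparkReducerParallelism for
      // second-level-and-onward sinks: take the largest parallelism among parents.
      static int inheritedParallelism(List<Integer> parentReducerCounts) {
        int numberOfReducers = 0;
        for (int parent : parentReducerCounts) {
          numberOfReducers = Math.max(numberOfReducers, parent);
        }
        return numberOfReducers;
      }

      public static void main(String[] args) {
        // Two first-level reduce stages sized from table-scan stats to 12 and 20 reducers;
        // a downstream stage fed by both would be assigned 20 reducers.
        System.out.println(inheritedParallelism(Arrays.asList(12, 20)));  // prints 20
      }
    }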