diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 53b9b0c..d1acd41 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2886,6 +2886,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "hive.tez.exec.inplace.progress", true,
         "Updates tez job execution progress in-place in the terminal."),
+
+    SPARK_USE_OP_STATS("hive.spark.use.op.stats", false,
+        "Whether to use operator stats for Hive on Spark. If this is false, Hive will only use " +
+        "source table stats to determine map-join and reducer parallelism (similar to MR)"),
     SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
         "Updates spark job execution progress in-place in the terminal."),
     TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 7a5b71f..71cb4de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Stack;
 
 import org.slf4j.Logger;
@@ -29,7 +31,9 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
@@ -57,6 +61,12 @@
   // Spark memory per task, and total number of cores
   private ObjectPair<Long, Integer> sparkMemoryAndCores;
 
+  private final boolean useOpStats;
+
+  public SetSparkReducerParallelism(HiveConf conf) {
+    sparkMemoryAndCores = null;
+    useOpStats = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_OP_STATS);
+  }
 
   @Override
   public Object process(Node nd, Stack<Node> stack,
@@ -67,16 +77,28 @@ public Object process(Node nd, Stack<Node> stack,
     ReduceSinkOperator sink = (ReduceSinkOperator) nd;
     ReduceSinkDesc desc = sink.getConf();
+    Set<ReduceSinkOperator> parentSinks = null;
 
     int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
     int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
 
+    if (!useOpStats) {
+      parentSinks = OperatorUtils.findOperatorsUpstream(sink, ReduceSinkOperator.class);
+      parentSinks.remove(sink);
+      if (!context.getVisitedReduceSinks().containsAll(parentSinks)) {
+        // We haven't processed all the parent sinks, and we need
+        // them to be done in order to compute the parallelism for this sink.
+        // In this case, skip. We should visit this again from another path.
+ LOG.debug("Skipping sink " + sink + " for now as we haven't seen all its parents."); + return false; + } + } + if (context.getVisitedReduceSinks().contains(sink)) { // skip walking the children LOG.debug("Already processed reduce sink: " + sink.getName()); return true; } - context.getVisitedReduceSinks().add(sink); if (needSetParallelism(sink, context.getConf())) { @@ -96,18 +118,39 @@ public Object process(Node nd, Stack stack, return false; } } + long numberOfBytes = 0; - // we need to add up all the estimates from the siblings of this reduce sink - for (Operator sibling - : sink.getChildOperators().get(0).getParentOperators()) { - if (sibling.getStatistics() != null) { - numberOfBytes += sibling.getStatistics().getDataSize(); - if (LOG.isDebugEnabled()) { - LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics()); + if (!useOpStats) { + if (parentSinks.isEmpty()) { + // This is the first sink in the path, meaning that we should use TS stats to infer + // its parallelism + Set sources = OperatorUtils.findOperatorsUpstream(sink, TableScanOperator.class); + for (TableScanOperator source : sources) { + numberOfBytes += source.getStatistics().getDataSize(); } + LOG.debug("Gathered data size for sink " + sink + ". Total size is " + numberOfBytes + " bytes."); } else { - LOG.warn("No stats available from: " + sibling); + int numberOfReducers = 0; + for (ReduceSinkOperator parent : parentSinks) { + numberOfReducers = Math.max(numberOfReducers, parent.getConf().getNumReducers()); + } + desc.setNumReducers(numberOfReducers); + LOG.debug("Set parallelism for sink " + sink + " to " + numberOfReducers + " based on its parents"); + return false; + } + } else { + // we need to add up all the estimates from the siblings of this reduce sink + for (Operator sibling + : sink.getChildOperators().get(0).getParentOperators()) { + if (sibling.getStatistics() != null) { + numberOfBytes += sibling.getStatistics().getDataSize(); + if (LOG.isDebugEnabled()) { + LOG.debug("Sibling " + sibling + " has stats: " + sibling.getStatistics()); + } + } else { + LOG.warn("No stats available from: " + sibling); + } } } @@ -134,7 +177,7 @@ public Object process(Node nd, Stack stack, desc.setNumReducers(numReducers); } } else { - LOG.info("Number of reducers determined to be: " + desc.getNumReducers()); + LOG.info("Number of reducers for sink " + sink + " was already determined to be: " + desc.getNumReducers()); } return false; @@ -165,6 +208,9 @@ private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveC } private void getSparkMemoryAndCores(OptimizeSparkProcContext context) throws SemanticException { + if (sparkMemoryAndCores != null) { + return; + } if (context.getConf().getBoolean(SPARK_DYNAMIC_ALLOCATION_ENABLED, false)) { // If dynamic allocation is enabled, numbers for memory and cores are meaningless. So, we don't // try to get it. 
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 71528e8..64a9703 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
-import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.ForwardWalker;
@@ -55,6 +54,7 @@ import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TypeRule;
@@ -269,8 +269,8 @@ private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws Seman
     ParseContext pCtx = procCtx.getParseContext();
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
-        ReduceSinkOperator.getOperatorName() + "%"),
-        new SetSparkReducerParallelism());
+        ReduceSinkOperator.getOperatorName() + "%"),
+        new SetSparkReducerParallelism(pCtx.getConf()));
     opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx));
 
@@ -283,7 +283,7 @@ private void runJoinOptimizations(OptimizeSparkProcContext procCtx) throws Seman
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    GraphWalker ogw = new DefaultGraphWalker(disp);
+    GraphWalker ogw = new PreOrderWalker(disp);
 
     // Create a list of topop nodes
     ArrayList<Node> topNodes = new ArrayList<Node>();
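
Note (not part of the patch): switching from DefaultGraphWalker to PreOrderWalker makes the operator-graph walk top-down, so parent ReduceSinkOperators are processed before their downstream sinks; the containsAll(parentSinks) guard added in SetSparkReducerParallelism defers a multi-parent sink until every parent has been visited. For completeness, a minimal sketch of flipping the new flag programmatically is shown below; new HiveConf(), setBoolVar, and getBoolVar are existing HiveConf APIs, SPARK_USE_OP_STATS is the ConfVar introduced in the first hunk, and the class name is hypothetical. The same can be done per session with SET hive.spark.use.op.stats=false;.

import org.apache.hadoop.hive.conf.HiveConf;

public class SparkOpStatsToggle {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // false -> size map-joins and reducer parallelism from source table stats (MR-like).
    conf.setBoolVar(HiveConf.ConfVars.SPARK_USE_OP_STATS, false);
    System.out.println("hive.spark.use.op.stats = "
        + conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_OP_STATS));
  }
}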