commit 22b4f1a1764c95a396711713550fd44e847c7e3d
Author: kellyzly
Date:   Wed Jun 21 06:27:40 2017 +0800

    HIVE-16840.patch

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 92741ee..89d63ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -62,6 +62,8 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
     implements Serializable, TopNHash.BinaryCollector {
 
+  private boolean sortLimit;
+
   /**
    * Counters.
    */
   public static enum Counter {
@@ -605,4 +607,13 @@ public String getReduceOutputName() {
   public void setOutputCollector(OutputCollector _out) {
     this.out = _out;
   }
+
+  public boolean getSortLimit() {
+    return sortLimit;
+  }
+
+  public void setSortLimit(boolean sortLimit) {
+    this.sortLimit = sortLimit;
+  }
 }

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index 4924df7..cde4ba5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.Stack;
@@ -34,6 +35,7 @@
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
@@ -189,13 +191,62 @@ public Object process(Node nd, Stack<Node> stack,
           desc.setNumReducers(numReducers);
         }
       } else {
+        LimitOperator limit = getLimitAfterRS(sink);
+        if (limit != null) {
+          // rewrite RS-SEL-LIMIT into SPARKSORTRS-NEWSEL-NEWLIMIT-RS-SEL-LIMIT
+          // SparkSortReduceSinkOperator sparkSortRS = new SparkSortReduceSinkOperator(sink);
+          ReduceSinkOperator sparkSortRS = new ReduceSinkOperator(sink.getCompilationOpContext());
+          sparkSortRS.setSortLimit(true);
+          ReduceSinkDesc sparkSortRSDesc = (ReduceSinkDesc) sink.getConf().clone();
+          sparkSortRS.setConf(sparkSortRSDesc);
+          sparkSortRS.getConf().setNumReducers(getNumReducersForSparkSortRS());
+          SelectOperator sel = (SelectOperator) limit.getParentOperators().get(0);
+          SelectOperator newSel = new SelectOperator(sparkSortRS.getCompilationOpContext());
+          newSel.setConf(sel.getConf());
+          LimitOperator newLimit = new LimitOperator(limit.getCompilationOpContext());
+          newLimit.setConf(limit.getConf());
+          // detach sink from its parents and make sparkSortRS their child
+          Operator[] newParent = new Operator[sink.getParentOperators().size()];
+          int i = 0;
+          for (Operator parent : sink.getParentOperators()) {
+            parent.replaceChild(sink, sparkSortRS);
+            newParent[i++] = parent;
+          }
+          sparkSortRS.setParentOperators(Utilities.makeList(newParent));
+          // the child of sparkSortRS is newSel
+          sparkSortRS.setChildOperators(Utilities.makeList(newSel));
+          // the parent of newSel is sparkSortRS
+          newSel.setParentOperators(Utilities.makeList(sparkSortRS));
+          // the child of newSel is newLimit
+          newSel.setChildOperators(Utilities.makeList(newLimit));
+          // the parent of newLimit is newSel
+          newLimit.setParentOperators(Utilities.makeList(newSel));
+          // the child of newLimit is the original RS
+          newLimit.setChildOperators(Utilities.makeList(sink));
+          sink.setParentOperators(Utilities.makeList(newLimit));
+        }
         LOG.info("Number of reducers for sink " + sink + " was already determined to be: " + desc.getNumReducers());
       }
     }
     return false;
   }
 
-  // tests whether the RS needs automatic setting parallelism
+  private LimitOperator getLimitAfterRS(ReduceSinkOperator sink) {
+    ArrayList<Operator<?>> limitList = new ArrayList<Operator<?>>();
+    SparkUtilities.collectOp(limitList, sink, LimitOperator.class);
+    if (limitList.size() > 0) {
+      return (LimitOperator) limitList.get(0);
+    } else {
+      return null;
+    }
+  }
+
+  private int getNumReducersForSparkSortRS() {
+    // initial hard-coded parallelism for the new sort stage
+    return 10;
+  }
+
+  // tests whether the RS needs automatic setting parallelism
   private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveConf) {
     ReduceSinkDesc desc = reduceSink.getConf();
     if (desc.getNumReducers() <= 0) {
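The rewiring in the hunk above is dense, so here is a minimal, self-contained sketch of the same splice using a toy Op class rather than Hive's real Operator API (every name below is an illustrative stand-in, not Hive code). The original single-reducer RS keeps its children; the parallel sort RS and the cloned SEL/LIMIT pair are spliced between the RS and its former parents.

import java.util.ArrayList;
import java.util.List;

public class SortLimitSpliceSketch {

  // Toy stand-in for Hive's Operator: a node in a doubly linked DAG.
  static class Op {
    final String name;
    final List<Op> parents = new ArrayList<>();
    final List<Op> children = new ArrayList<>();

    Op(String name) { this.name = name; }

    // Mirrors Operator.replaceChild: swap one child in place.
    void replaceChild(Op oldChild, Op newChild) {
      children.set(children.indexOf(oldChild), newChild);
    }
  }

  // Splices sortRS -> newSel -> newLimit between sink's parents and sink,
  // the same edge surgery the patch does with setParentOperators/setChildOperators.
  static void spliceBefore(Op sink, Op sortRS, Op newSel, Op newLimit) {
    for (Op parent : sink.parents) {
      parent.replaceChild(sink, sortRS);  // parents now feed the new sort RS
      sortRS.parents.add(parent);
    }
    sortRS.children.add(newSel);          // sparkSortRS -> newSel
    newSel.parents.add(sortRS);
    newSel.children.add(newLimit);        // newSel -> newLimit
    newLimit.parents.add(newSel);
    newLimit.children.add(sink);          // newLimit -> original RS
    sink.parents.clear();
    sink.parents.add(newLimit);
  }

  public static void main(String[] args) {
    Op ts = new Op("TS"), rs = new Op("RS"), sel = new Op("SEL"), limit = new Op("LIMIT");
    ts.children.add(rs);     rs.parents.add(ts);
    rs.children.add(sel);    sel.parents.add(rs);
    sel.children.add(limit); limit.parents.add(sel);

    spliceBefore(rs, new Op("sparkSortRS"), new Op("newSel"), new Op("newLimit"));

    // Prints: TS -> sparkSortRS -> newSel -> newLimit -> RS -> SEL -> LIMIT
    for (Op op = ts; op != null; op = op.children.isEmpty() ? null : op.children.get(0)) {
      System.out.print(op.name + (op.children.isEmpty() ? "\n" : " -> "));
    }
  }
}

Note that the patch leaves the original sink's own children untouched and only rewrites the parent links plus the parents' child lists, which is exactly what the sketch reproduces.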
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index d0a82af..f229984 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -458,6 +458,9 @@ public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink,
       }
     }
 
+    if (reduceSink.getSortLimit()) {
+      edgeProperty.setMRShuffle();
+    }
     // set to groupby-shuffle if it's still NONE
     // simple distribute-by goes here
     if (edgeProperty.isShuffleNone()) {
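If I read the intent right, routing the sortLimit edge through setMRShuffle() gives each reducer of the new stage its keys in sorted order, so the cloned newLimit can cut every one of the (here 10) partitions down to its first k rows before the original single-reducer RS runs. The safety of that per-partition cut rests on a simple property of total orders: the global top k is always contained in the union of the per-partition top k's. A minimal demonstration in plain Java (no Hive classes; the ascending sort stands in for ORDER BY, and all names are illustrative):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TwoStageTopK {

  // "ORDER BY x LIMIT k" over a list: sort ascending, keep the first k rows.
  static List<Integer> topK(List<Integer> rows, int k) {
    List<Integer> sorted = new ArrayList<>(rows);
    Collections.sort(sorted);
    return sorted.subList(0, Math.min(k, sorted.size()));
  }

  public static void main(String[] args) {
    int k = 3;
    List<List<Integer>> partitions = List.of(
        List.of(9, 1, 7, 4), List.of(2, 8, 3), List.of(6, 0, 5));

    // Stage 1: each reducer of the parallel sort RS emits its local top k.
    List<Integer> preLimited = new ArrayList<>();
    for (List<Integer> p : partitions) {
      preLimited.addAll(topK(p, k));
    }

    // Stage 2: the original single-reducer RS now sorts at most
    // partitions * k rows instead of the whole input.
    System.out.println(topK(preLimited, k));  // [0, 1, 2]

    // Sanity check against the single-stage plan over all rows.
    List<Integer> all = new ArrayList<>();
    partitions.forEach(all::addAll);
    System.out.println(topK(all, k));         // [0, 1, 2]
  }
}

That bound, at most numPartitions * k rows ever reaching the final reducer, is the payoff of giving the new sort RS a parallelism of 10 instead of funneling the whole input through one reducer.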