diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
index 404b759..8ac9ca7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
@@ -489,7 +489,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup
       if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.replaceReduceSinkWithSelectOperator(
             cRS, dedupCtx.getPctx(), dedupCtx);
-        pRS.getConf().setEnforceSort(true);
+        pRS.getConf().setDeduplicated(true);
         return true;
       }
       return false;
@@ -512,7 +512,7 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
       if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.removeReduceSinkForGroupBy(
             cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
-        pRS.getConf().setEnforceSort(true);
+        pRS.getConf().setDeduplicated(true);
         return true;
       }
       return false;
@@ -535,7 +535,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup
             CorrelationUtilities.findPossibleParent(
                 pJoin, ReduceSinkOperator.class, dedupCtx.trustScript());
         if (pRS != null) {
-          pRS.getConf().setEnforceSort(true);
+          pRS.getConf().setDeduplicated(true);
         }
         return true;
       }
@@ -559,7 +559,7 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
             CorrelationUtilities.findPossibleParent(
                 pJoin, ReduceSinkOperator.class, dedupCtx.trustScript());
         if (pRS != null) {
-          pRS.getConf().setEnforceSort(true);
+          pRS.getConf().setDeduplicated(true);
         }
         return true;
       }
@@ -579,7 +579,7 @@ public Object process(ReduceSinkOperator cRS, ReduceSinkDeduplicateProcCtx dedup
       if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.replaceReduceSinkWithSelectOperator(
             cRS, dedupCtx.getPctx(), dedupCtx);
-        pRS.getConf().setEnforceSort(true);
+        pRS.getConf().setDeduplicated(true);
         return true;
       }
       return false;
@@ -596,7 +596,7 @@ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
           start, ReduceSinkOperator.class, dedupCtx.trustScript());
       if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.removeReduceSinkForGroupBy(cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
-        pRS.getConf().setEnforceSort(true);
+        pRS.getConf().setDeduplicated(true);
         return true;
       }
       return false;
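All six hunks above share one pattern: when deduplication merges a child ReduceSink into its parent, the surviving parent RS is flagged so that later optimizer passes leave its sort alone. A minimal, self-contained sketch of that pattern follows; the class and field names are illustrative stand-ins, not Hive's API.

```java
// Illustrative stand-ins only -- not Hive classes.
final class RsDesc {
  int numReducers = -1;          // <= 0 means "let the optimizer decide"
  boolean deduplicated = false;  // the flag this patch renames from enforceSort
}

public final class DedupFlagSketch {
  // Mirrors the shape of each patched call site: merge succeeds -> mark parent.
  static boolean tryMerge(RsDesc parentRs, boolean mergeSucceeded) {
    if (mergeSucceeded) {
      parentRs.deduplicated = true;  // was setEnforceSort(true) before the rename
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    RsDesc parent = new RsDesc();
    tryMerge(parent, true);
    // true: downstream passes must not re-parallelize this RS
    System.out.println(parent.deduplicated);
  }
}
```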
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index f9ef474..5f9225c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.util.List;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -75,7 +77,7 @@ public Object process(Node nd, Stack<Node> stack,
 
     context.getVisitedReduceSinks().add(sink);
 
-    if (desc.getNumReducers() <= 0) {
+    if (needSetParallelism(sink, context.getConf())) {
       if (constantReducers > 0) {
         LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
         desc.setNumReducers(constantReducers);
@@ -158,4 +160,28 @@ public Object process(Node nd, Stack<Node> stack,
     return false;
   }
 
+  // tests whether the RS needs automatic setting parallelism
+  private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveConf) {
+    ReduceSinkDesc desc = reduceSink.getConf();
+    if (desc.getNumReducers() <= 0) {
+      return true;
+    }
+    if (desc.getNumReducers() == 1 && desc.hasOrderBy() &&
+        hiveConf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY) && !desc.isDeduplicated()) {
+      List<Operator<? extends OperatorDesc>> children = reduceSink.getChildOperators();
+      while (children != null && children.size() > 0) {
+        if (children.size() != 1 || children.get(0) instanceof LimitOperator) {
+          return false;
+        }
+        if (children.get(0) instanceof ReduceSinkOperator ||
+            children.get(0) instanceof FileSinkOperator) {
+          break;
+        }
+        children = children.get(0).getChildOperators();
+      }
+      return true;
+    }
+    return false;
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index e27ce0d..7992c88 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -399,7 +399,7 @@ public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink,
    */
   private static boolean groupByNeedParLevelOrder(ReduceSinkOperator reduceSinkOperator) {
     // whether we have to enforce sort anyway, e.g. in case of RS deduplication
-    if (reduceSinkOperator.getConf().isEnforceSort()) {
+    if (reduceSinkOperator.getConf().isDeduplicated()) {
       return true;
     }
     List<Operator<? extends OperatorDesc>> children = reduceSinkOperator.getChildOperators();
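To make the new guard concrete, here is a self-contained walk-through of the child scan in needSetParallelism, reimplemented over a toy operator model (the node class and its String-kind encoding are illustrative, not Hive's `Operator<? extends OperatorDesc>` hierarchy): a single-reducer ORDER BY may be re-parallelized only when no Limit sits on the straight-line path down to the next ReduceSink or FileSink.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy operator node standing in for Hive's Operator<? extends OperatorDesc>.
class Op {
  final String kind;  // "SEL", "LIM", "RS", "FS"
  List<Op> children = Collections.emptyList();
  Op(String kind) { this.kind = kind; }
}

public class NeedSetParallelismSketch {
  // Mirrors the while-loop added above, minus the HiveConf/ReduceSinkDesc preconditions.
  static boolean mayRaiseParallelism(List<Op> children) {
    while (children != null && !children.isEmpty()) {
      // A branch or a Limit below the RS: keep the single reducer
      // (ORDER BY ... LIMIT n is cheap with one reducer and stays totally ordered).
      if (children.size() != 1 || children.get(0).kind.equals("LIM")) {
        return false;
      }
      // Reached the next shuffle or the final sink: safe to sample and go parallel.
      if (children.get(0).kind.equals("RS") || children.get(0).kind.equals("FS")) {
        break;
      }
      children = children.get(0).children;
    }
    return true;
  }

  public static void main(String[] args) {
    Op select = new Op("SEL");
    select.children = Arrays.asList(new Op("FS"));
    System.out.println(mayRaiseParallelism(Arrays.asList(select)));        // true
    System.out.println(mayRaiseParallelism(Arrays.asList(new Op("LIM")))); // false
  }
}
```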
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 1891dff..b4316ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -114,8 +113,8 @@ private ReducerTraits(int trait) {
   // Write type, since this needs to calculate buckets differently for updates and deletes
   private AcidUtils.Operation writeType;
 
-  // whether we'll enforce the sort order of the RS
-  private transient boolean enforceSort = false;
+  // whether this RS is deduplicated
+  private transient boolean isDeduplicated = false;
 
   // used by spark mode to decide whether global order is needed
   private transient boolean hasOrderBy = false;
@@ -174,7 +173,7 @@ public Object clone() {
     desc.setStatistics(this.getStatistics());
     desc.setSkipTag(skipTag);
     desc.reduceTraits = reduceTraits.clone();
-    desc.setEnforceSort(enforceSort);
+    desc.setDeduplicated(isDeduplicated);
     desc.setHasOrderBy(hasOrderBy);
     return desc;
   }
@@ -434,12 +433,12 @@ public final void setReducerTraits(EnumSet<ReducerTraits> traits) {
     return writeType;
   }
 
-  public boolean isEnforceSort() {
-    return enforceSort;
+  public boolean isDeduplicated() {
+    return isDeduplicated;
   }
 
-  public void setEnforceSort(boolean isDeduplicated) {
-    this.enforceSort = isDeduplicated;
+  public void setDeduplicated(boolean isDeduplicated) {
+    this.isDeduplicated = isDeduplicated;
   }
 
   public boolean hasOrderBy() {
diff --git a/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out b/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
index 0194dbb..03314ea 100644
--- a/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
+++ b/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
@@ -38,7 +38,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (SORT, 1)
+        Reducer 2 <- Map 1 (SORT, 4)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -117,7 +117,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE
 Table Parameters:
 	COLUMN_STATS_ACCURATE	true
-	numFiles            	1
+	numFiles            	4
 	numRows             	48
 	rawDataSize         	512
 	totalSize           	560
@@ -231,7 +231,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE
 Table Parameters:
 	COLUMN_STATS_ACCURATE	true
-	numFiles            	1
+	numFiles            	4
 	numRows             	48
 	rawDataSize         	512
 	totalSize           	560
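The updated golden-file expectations (the `(SORT, 4)` edge and `numFiles 4` instead of 1) only materialize when sampled order-by is enabled. HIVESAMPLINGFORORDERBY is the real HiveConf constant the new guard reads (property `hive.optimize.sampling.orderby`); the harness around it below is just an illustrative sketch, not a test from this patch.

```java
import org.apache.hadoop.hive.conf.HiveConf;

public class SamplingOrderBySketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Without this flag a global ORDER BY keeps one reducer, i.e. the old
    // "(SORT, 1)" edge. With it, needSetParallelism() lets the optimizer pick
    // several reducers, which is why the test table now lands in 4 files.
    conf.setBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY, true);
    System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY)); // true
  }
}
```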