diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index f9ef474..d7f4f5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.util.List;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -75,7 +77,7 @@ public Object process(Node nd, Stack stack,
 
     context.getVisitedReduceSinks().add(sink);
 
-    if (desc.getNumReducers() <= 0) {
+    if (needSetParallelism(sink)) {
       if (constantReducers > 0) {
         LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
         desc.setNumReducers(constantReducers);
@@ -158,4 +160,39 @@ public Object process(Node nd, Stack stack,
     return false;
   }
 
+  /**
+   * Tests whether the parallelism of the given reduce sink needs to be set automatically.
+   *
+   * <p>Returns true when no positive reducer count is configured. A sink with exactly one
+   * reducer whose descriptor reports an order-by also qualifies, unless the chain of
+   * operators below it branches or contains a LimitOperator before reaching a
+   * ReduceSinkOperator, a FileSinkOperator or the end of the chain.
+   *
+   * @param reduceSink the reduce sink operator to inspect
+   * @return true if parallelism should be set automatically, false otherwise
+   */
+  private boolean needSetParallelism(ReduceSinkOperator reduceSink) {
+    ReduceSinkDesc desc = reduceSink.getConf();
+    if (desc.getNumReducers() <= 0) {
+      return true;
+    }
+    if (desc.getNumReducers() == 1 && desc.hasOrderBy()) {
+      List<? extends Operator<?>> children = reduceSink.getChildOperators();
+      while (children != null && children.size() > 0) {
+        // a branch or a limit below the sink: do not set parallelism automatically
+        if (children.size() != 1 || children.get(0) instanceof LimitOperator) {
+          return false;
+        }
+        // stop walking at the next reduce sink or file sink
+        if (children.get(0) instanceof ReduceSinkOperator ||
+            children.get(0) instanceof FileSinkOperator) {
+          break;
+        }
+        children = children.get(0).getChildOperators();
+      }
+      return true;
+    }
+    return false;
+  }
+
 }