diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index c19bc21..853f246 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.GroupByOperator; @@ -113,7 +114,8 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator root sparkWork.add(reduceWork); - SparkEdgeProperty edgeProp = getEdgeProperty(reduceSink, reduceWork); + SparkEdgeProperty edgeProp = getEdgeProperty(reduceSink, reduceWork, + context.parseContext.getQueryProperties()); sparkWork.connect(context.preceedingWork, reduceWork, edgeProp); @@ -329,7 +331,7 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi } public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink, - ReduceWork reduceWork) throws SemanticException { + ReduceWork reduceWork, QueryProperties queryProperties) throws SemanticException { SparkEdgeProperty edgeProperty = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE); edgeProperty.setNumPartitions(reduceWork.getNumReduceTasks()); String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim(); @@ -361,13 +363,11 @@ public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink, } } - // test if we need total order, if so, we can either use MR shuffle + set #reducer to 1, - // or we can use SHUFFLE_SORT + // test if we need partition/global order; SHUFFLE_SORT should only be used for global order if (edgeProperty.isShuffleNone() && 
!sortOrder.isEmpty()) { - if (reduceSink.getConf().getPartitionCols() == null - || reduceSink.getConf().getPartitionCols().isEmpty() - || isSame(reduceSink.getConf().getPartitionCols(), - reduceSink.getConf().getKeyCols())) { + if ((reduceSink.getConf().getPartitionCols() == null + || reduceSink.getConf().getPartitionCols().isEmpty()) + && queryProperties.hasOrderBy()) { edgeProperty.setShuffleSort(); } else { edgeProperty.setMRShuffle(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java index 3dd6d92..9d5e99f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java @@ -217,7 +217,8 @@ public Object process(Node nd, Stack stack, "AssertionError: expected operator to be a ReduceSinkOperator, but was " + parent.getClass().getName()); ReduceSinkOperator rsOp = (ReduceSinkOperator) parent; - SparkEdgeProperty edgeProp = GenSparkUtils.getEdgeProperty(rsOp, reduceWork); + SparkEdgeProperty edgeProp = GenSparkUtils.getEdgeProperty(rsOp, reduceWork, + context.parseContext.getQueryProperties()); rsOp.getConf().setOutputName(reduceWork.getName()); GenMapRedUtils.setKeyAndValueDesc(reduceWork, rsOp);