diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index e4cb6f3..5da0646 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -109,7 +109,7 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator root
       edgeProp.setShuffleGroup();
     }
     String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim();
-    if (!sortOrder.isEmpty()) {
+    if (!sortOrder.isEmpty() && isSortNecessary(reduceSink)) {
       edgeProp.setShuffleSort();
     }
 
@@ -300,4 +300,26 @@ public void processFileSink(GenSparkProcContext context, FileSinkOperator fileSi
       }
     }
   }
+
+  /**
+   * Test if the sort order in the RS is necessary.
+   * Unnecessary sort is mainly introduced when GBY is created. Therefore, if the sorting
+   * keys, partitioning keys and grouping keys are the same, we ignore the sort and use
+   * GroupByShuffler to shuffle the data. In this case a group-by transformation should be
+   * sufficient to produce the correct results, i.e. data is properly grouped by the keys
+   * but keys are not guaranteed to be sorted.
+   */
+  public static boolean isSortNecessary(ReduceSinkOperator reduceSinkOperator) {
+    List<Operator<? extends OperatorDesc>> children = reduceSinkOperator.getChildOperators();
+    if (children != null && children.size() == 1 &&
+        children.get(0) instanceof GroupByOperator) {
+      GroupByOperator child = (GroupByOperator) children.get(0);
+      if (reduceSinkOperator.getConf().getKeyCols().equals(
+          reduceSinkOperator.getConf().getPartitionCols()) &&
+          reduceSinkOperator.getConf().getKeyCols().size() == child.getConf().getKeys().size()) {
+        return false;
+      }
+    }
+    return true;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
index 7c76c20..d40345f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
@@ -277,7 +277,7 @@ public Object process(Node nd, Stack stack,
           edgeProp.setShuffleGroup();
         }
         String sortOrder = Strings.nullToEmpty(rs.getConf().getOrder()).trim();
-        if (!sortOrder.isEmpty()) {
+        if (!sortOrder.isEmpty() && GenSparkUtils.isSortNecessary(rs)) {
          edgeProp.setShuffleSort();
         }
         sparkWork.connect(work, rWork, edgeProp);