diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 45f8075..8c1b820 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -196,6 +197,8 @@ private MapTran generate(MapWork mw) throws IOException { } private SparkShuffler generate(SparkEdgeProperty edge) { + Preconditions.checkArgument(!edge.isShuffleNone(), + "SHUFFLE_NONE should only be used for UnionWork."); if (edge.isShuffleSort()) { return new SortByShuffler(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index a89c419..91022a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -102,12 +102,10 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator root sparkWork.add(reduceWork); - SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE, + // Use group-by as the default shuffler + SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_GROUP, reduceWork.getNumReduceTasks()); - if (root instanceof GroupByOperator) { - edgeProp.setShuffleGroup(); - } String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim(); if (!sortOrder.isEmpty() && isSortNecessary(reduceSink)) { edgeProp.setShuffleSort(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java index 
d40345f..864965e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java @@ -225,7 +225,7 @@ public Object process(Node nd, Stack stack, // finally hook everything up LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")"); - SparkEdgeProperty edgeProp = new SparkEdgeProperty(0/*EdgeType.CONTAINS*/); + SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE); sparkWork.connect(work, unionWork, edgeProp); unionWork.addUnionOperators(context.currentUnionOperators); context.currentUnionOperators.clear(); @@ -271,11 +271,9 @@ public Object process(Node nd, Stack stack, if (!context.connectedReduceSinks.contains(rs)) { // add dependency between the two work items - SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE, + // Use group-by as the default shuffler + SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_GROUP, rs.getConf().getNumReducers()); - if(rWork.getReducer() instanceof GroupByOperator){ - edgeProp.setShuffleGroup(); - } String sortOrder = Strings.nullToEmpty(rs.getConf().getOrder()).trim(); if (!sortOrder.isEmpty() && GenSparkUtils.isSortNecessary(rs)) { edgeProp.setShuffleSort(); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java index 3debae5..bdfef87 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java @@ -77,11 +77,10 @@ public String getShuffleType() { sb.append("GROUP"); } - if (sb.length() != 0) { - sb.append(" "); - } - if (isShuffleSort()) { + if (sb.length() != 0) { + sb.append(" "); + } sb.append("SORT"); }