diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RepartitionShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RepartitionShuffler.java
deleted file mode 100644
index d0c708c..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RepartitionShuffler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.spark;
-
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-
-public class RepartitionShuffler implements SparkShuffler<BytesWritable> {
-
-  @Override
-  public JavaPairRDD<HiveKey, BytesWritable> shuffle(
-      JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
-    if (numPartitions < 0) {
-      numPartitions = 1;
-    }
-    return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions));
-  }
-
-  @Override
-  public String getName() {
-    return "GroupBy_Repartition";
-  }
-
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 12a76a7..1b8b058 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -212,11 +212,7 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea
     } else if (edge.isShuffleSort()) {
       shuffler = new SortByShuffler(true, sparkPlan);
     } else {
-      boolean useSparkGroupBy = jobConf.getBoolean("hive.spark.use.groupby.shuffle", true);
-      if (!useSparkGroupBy) {
-        LOG.info("hive.spark.use.groupby.shuffle is off. Use repartitin shuffle instead.");
-      }
-      shuffler = useSparkGroupBy ? new GroupByShuffler() : new RepartitionShuffler();
+      shuffler = new GroupByShuffler();
     }
     return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache);
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index d0a82af..695ac31 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -65,6 +65,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
+import static org.apache.hadoop.hive.conf.HiveConf.*;
+
 /**
  * GenSparkUtils is a collection of shared helper methods to produce SparkWork
  * Cloned from GenTezUtils.
  */
@@ -117,7 +119,7 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root
     ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
     setupReduceSink(context, reduceWork, reduceSink);
     sparkWork.add(reduceWork);
 
-    SparkEdgeProperty edgeProp = getEdgeProperty(reduceSink, reduceWork);
+    SparkEdgeProperty edgeProp = getEdgeProperty(context.conf, reduceSink, reduceWork);
 
     sparkWork.connect(context.preceedingWork, reduceWork, edgeProp);
@@ -415,8 +417,9 @@ public void processPartitionPruningSink(GenSparkProcContext context,
       keys.add(desc.getPartKey());
   }
 
-  public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink,
+  public static SparkEdgeProperty getEdgeProperty(HiveConf conf, ReduceSinkOperator reduceSink,
       ReduceWork reduceWork) throws SemanticException {
+    boolean useSparkGroupBy = conf.getBoolVar(ConfVars.SPARK_USE_GROUPBY_SHUFFLE);
     SparkEdgeProperty edgeProperty = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE);
     edgeProperty.setNumPartitions(reduceWork.getNumReduceTasks());
     String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim();
@@ -425,7 +428,10 @@ public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink,
       edgeProperty.setShuffleGroup();
       // test if the group by needs partition level sort, if so, use the MR style shuffle
       // SHUFFLE_SORT shouldn't be used for this purpose, see HIVE-8542
-      if (!sortOrder.isEmpty() && groupByNeedParLevelOrder(reduceSink)) {
+      if (!useSparkGroupBy || (!sortOrder.isEmpty() && groupByNeedParLevelOrder(reduceSink))) {
+        if (!useSparkGroupBy) {
+          LOG.info("hive.spark.use.groupby.shuffle is off. Use repartition shuffle instead.");
+        }
         edgeProperty.setMRShuffle();
       }
     }
@@ -461,9 +467,15 @@ public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink,
 
     // set to groupby-shuffle if it's still NONE
     // simple distribute-by goes here
     if (edgeProperty.isShuffleNone()) {
-      edgeProperty.setShuffleGroup();
+      if (!useSparkGroupBy) {
+        LOG.info("hive.spark.use.groupby.shuffle is off. Use repartition shuffle instead.");
+        edgeProperty.setMRShuffle();
+      } else {
+        edgeProperty.setShuffleGroup();
+      }
     }
 
+
     return edgeProperty;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
index afbeccb..5385d5e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
@@ -211,7 +211,7 @@ public Object process(Node nd, Stack<Node> stack,
             "AssertionError: expected operator to be a ReduceSinkOperator, but was "
             + parent.getClass().getName());
         ReduceSinkOperator rsOp = (ReduceSinkOperator) parent;
-        SparkEdgeProperty edgeProp = GenSparkUtils.getEdgeProperty(rsOp, reduceWork);
+        SparkEdgeProperty edgeProp = GenSparkUtils.getEdgeProperty(context.conf, rsOp, reduceWork);
 
         rsOp.getConf().setOutputName(reduceWork.getName());
         GenMapRedUtils.setKeyAndValueDesc(reduceWork, rsOp);