diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 19ff316..409fc90 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1392,7 +1392,8 @@ spark.only.query.files=spark_combine_equivalent_work.q,\ spark_explainuser_1.q,\ spark_vectorized_dynamic_partition_pruning.q,\ spark_use_ts_stats_for_mapjoin.q,\ - spark_use_op_stats.q + spark_use_op_stats.q,\ + spark_explain_groupbyshuffle.q miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\ bucket4.q,\ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RepartitionShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RepartitionShuffler.java deleted file mode 100644 index d0c708c..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RepartitionShuffler.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.spark; - -import org.apache.hadoop.hive.ql.io.HiveKey; -import org.apache.hadoop.io.BytesWritable; -import org.apache.spark.HashPartitioner; -import org.apache.spark.api.java.JavaPairRDD; - -public class RepartitionShuffler implements SparkShuffler { - - @Override - public JavaPairRDD shuffle( - JavaPairRDD input, int numPartitions) { - if (numPartitions < 0) { - numPartitions = 1; - } - return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions)); - } - - @Override - public String getName() { - return "GroupBy_Repartition"; - } - -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 5f85f9e..079ec42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -212,11 +212,7 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea } else if (edge.isShuffleSort()) { shuffler = new SortByShuffler(true, sparkPlan); } else { - boolean useSparkGroupBy = jobConf.getBoolean("hive.spark.use.groupby.shuffle", true); - if (!useSparkGroupBy) { - LOG.info("hive.spark.use.groupby.shuffle is off. Use repartitin shuffle instead."); - } - shuffler = useSparkGroupBy ? 
new GroupByShuffler() : new RepartitionShuffler(); + shuffler = new GroupByShuffler(); } return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index b9901da..2d3d0b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@ -65,6 +65,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; + /** * GenSparkUtils is a collection of shared helper methods to produce SparkWork * Cloned from GenTezUtils. @@ -117,7 +118,7 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator root ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot; setupReduceSink(context, reduceWork, reduceSink); sparkWork.add(reduceWork); - SparkEdgeProperty edgeProp = getEdgeProperty(reduceSink, reduceWork); + SparkEdgeProperty edgeProp = getEdgeProperty(context.conf, reduceSink, reduceWork); sparkWork.connect(context.preceedingWork, reduceWork, edgeProp); @@ -425,8 +426,9 @@ public void processPartitionPruningSink(GenSparkProcContext context, keys.add(desc.getPartKey()); } - public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink, + public static SparkEdgeProperty getEdgeProperty(HiveConf conf, ReduceSinkOperator reduceSink, ReduceWork reduceWork) throws SemanticException { + boolean useSparkGroupBy = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_GROUPBY_SHUFFLE); SparkEdgeProperty edgeProperty = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE); edgeProperty.setNumPartitions(reduceWork.getNumReduceTasks()); String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim(); @@ -435,7 +437,10 @@ public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink, edgeProperty.setShuffleGroup(); // test if the group by needs partition level sort, if so, use the MR style shuffle // SHUFFLE_SORT shouldn't be used for this purpose, see HIVE-8542 - if (!sortOrder.isEmpty() && groupByNeedParLevelOrder(reduceSink)) { + if (!useSparkGroupBy || (!sortOrder.isEmpty() && groupByNeedParLevelOrder(reduceSink))) { + if (!useSparkGroupBy) { + LOG.info("hive.spark.use.groupby.shuffle is off. Use repartition shuffle instead."); + } edgeProperty.setMRShuffle(); } } @@ -468,12 +473,17 @@ public static SparkEdgeProperty getEdgeProperty(ReduceSinkOperator reduceSink, } } - // set to groupby-shuffle if it's still NONE // simple distribute-by goes here if (edgeProperty.isShuffleNone()) { - edgeProperty.setShuffleGroup(); + if (!useSparkGroupBy) { + LOG.info("hive.spark.use.groupby.shuffle is off. 
Use repartition shuffle instead."); + edgeProperty.setMRShuffle(); + } else { + edgeProperty.setShuffleGroup(); + } } + return edgeProperty; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java index afbeccb..5385d5e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java @@ -211,7 +211,7 @@ public Object process(Node nd, Stack stack, "AssertionError: expected operator to be a ReduceSinkOperator, but was " + parent.getClass().getName()); ReduceSinkOperator rsOp = (ReduceSinkOperator) parent; - SparkEdgeProperty edgeProp = GenSparkUtils.getEdgeProperty(rsOp, reduceWork); + SparkEdgeProperty edgeProp = GenSparkUtils.getEdgeProperty(context.conf, rsOp, reduceWork); rsOp.getConf().setOutputName(reduceWork.getName()); GenMapRedUtils.setKeyAndValueDesc(reduceWork, rsOp); diff --git a/ql/src/test/queries/clientpositive/spark_explain_groupbyshuffle.q b/ql/src/test/queries/clientpositive/spark_explain_groupbyshuffle.q new file mode 100644 index 0000000..cd2cba1 --- /dev/null +++ b/ql/src/test/queries/clientpositive/spark_explain_groupbyshuffle.q @@ -0,0 +1,8 @@ +set hive.spark.use.groupby.shuffle=true; + +explain select key, count(value) from src group by key; + + +set hive.spark.use.groupby.shuffle=false; + +explain select key, count(value) from src group by key; diff --git a/ql/src/test/results/clientpositive/spark/spark_explain_groupbyshuffle.q.out b/ql/src/test/results/clientpositive/spark/spark_explain_groupbyshuffle.q.out new file mode 100644 index 0000000..9d162c9 --- /dev/null +++ b/ql/src/test/results/clientpositive/spark/spark_explain_groupbyshuffle.q.out @@ -0,0 +1,116 @@ +PREHOOK: query: explain select key, count(value) from src group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain select key, count(value) from src group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (GROUP, 2) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: key, value + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(value) + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: explain select key, count(value) from src group by key +PREHOOK: type: QUERY +POSTHOOK: query: explain select key, count(value) from src group by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 2) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: key, value + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(value) + keys: key (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink +
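
Reviewer note (not part of the patch): with RepartitionShuffler removed, the choice between Spark's groupByKey-based shuffle and the MR-style repartition-and-sort shuffle is now made once, in GenSparkUtils.getEdgeProperty(), so it becomes visible in the explain plan as the edge type: GROUP when hive.spark.use.groupby.shuffle=true, and GROUP PARTITION-LEVEL SORT when it is false (compare the two plans in spark_explain_groupbyshuffle.q.out above). Below is a minimal, self-contained Java sketch of that decision for the group-by case only; the class, enum, and method names are illustrative and are not Hive APIs.

// Illustrative sketch: mirrors the edge-type decision this patch adds to
// GenSparkUtils.getEdgeProperty(). All names here are made up for illustration.
public class ShuffleEdgeChoice {

  enum EdgeType { SHUFFLE_GROUP, MR_SHUFFLE }

  // useSparkGroupBy        -> value of hive.spark.use.groupby.shuffle
  // needPartitionLevelSort -> group-by requires partition-level ordering (see HIVE-8542)
  static EdgeType chooseGroupByEdge(boolean useSparkGroupBy, boolean needPartitionLevelSort) {
    if (!useSparkGroupBy || needPartitionLevelSort) {
      // Flag is off, or a partition-level sort is needed: fall back to the
      // MR-style shuffle, shown as GROUP PARTITION-LEVEL SORT in the plans above.
      return EdgeType.MR_SHUFFLE;
    }
    // Otherwise keep Spark's groupByKey shuffle, shown as GROUP in the plans above.
    return EdgeType.SHUFFLE_GROUP;
  }

  public static void main(String[] args) {
    System.out.println(chooseGroupByEdge(true, false));   // SHUFFLE_GROUP
    System.out.println(chooseGroupByEdge(false, false));  // MR_SHUFFLE
  }
}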
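
Usage note (an assumption based on Hive's standard qtest layout, not something stated in this patch): since the new test is registered under spark.only.query.files, it should be runnable with the Spark CLI driver, e.g. from itests/qtest-spark with mvn test -Dtest=TestSparkCliDriver -Dqfile=spark_explain_groupbyshuffle.q.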