diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f3b01b2..e82758f 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3228,6 +3228,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE( "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L, "Maximum total data size in dynamic pruning."), + SPARK_USE_GROUPBY_SHUFFLE( + "hive.spark.use.groupby.shuffle", true, + "Spark groupByKey transformation has better performance but uses unbounded memory. " + + "Turn this off when there is a memory issue."), NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java index 8267515..9f9e3b2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java @@ -20,18 +20,17 @@ import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.io.BytesWritable; -import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; -public class GroupByShuffler implements SparkShuffler { +public class GroupByShuffler implements SparkShuffler> { @Override - public JavaPairRDD shuffle( + public JavaPairRDD> shuffle( JavaPairRDD input, int numPartitions) { - if (numPartitions < 0) { - numPartitions = 1; + if (numPartitions > 0) { + return input.groupByKey(numPartitions); } - return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions)); + return input.groupByKey(); } @Override diff --git 
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java index 2b85872..2b6e2de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java @@ -25,8 +25,8 @@ import scala.Tuple2; -public class HiveReduceFunction extends HivePairFlatMapFunction< - Iterator>, HiveKey, BytesWritable> { +public class HiveReduceFunction extends HivePairFlatMapFunction< + Iterator>, HiveKey, BytesWritable> { private static final long serialVersionUID = 1L; @@ -37,12 +37,12 @@ public HiveReduceFunction(byte[] buffer, SparkReporter sparkReporter) { @SuppressWarnings("unchecked") @Override public Iterator> - call(Iterator> it) throws Exception { + call(Iterator> it) throws Exception { initJobConf(); SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler(); - HiveReduceFunctionResultList result = - new HiveReduceFunctionResultList(it, reducerRecordhandler); + HiveReduceFunctionResultList result = + new HiveReduceFunctionResultList(it, reducerRecordhandler); reducerRecordhandler.init(jobConf, result, sparkReporter); return result; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java index 8708819..1f1517d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java @@ -25,8 +25,8 @@ import scala.Tuple2; -public class HiveReduceFunctionResultList extends - HiveBaseFunctionResultList> { +public class HiveReduceFunctionResultList extends + HiveBaseFunctionResultList> { private static final long serialVersionUID = 1L; private final SparkReduceRecordHandler reduceRecordHandler; @@ -37,16 +37,24 @@ * @param reducer Initialized {@link 
org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance. */ public HiveReduceFunctionResultList( - Iterator> inputIterator, + Iterator> inputIterator, SparkReduceRecordHandler reducer) { super(inputIterator); this.reduceRecordHandler = reducer; } @Override - protected void processNextRecord(Tuple2 inputRecord) + protected void processNextRecord(Tuple2 inputRecord) throws IOException { - reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2()); + HiveKey key = inputRecord._1(); + V value = inputRecord._2(); + if (value instanceof Iterable) { + @SuppressWarnings("unchecked") + Iterable values = (Iterable)value; + reduceRecordHandler.processRow(key, values.iterator()); + } else { + reduceRecordHandler.processRow(key, value); + } } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java index 926e1f8..709a4ac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java @@ -22,8 +22,8 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; -public class ReduceTran extends CacheTran { - private HiveReduceFunction reduceFunc; +public class ReduceTran extends CacheTran { + private HiveReduceFunction reduceFunc; private String name = "Reduce"; public ReduceTran() { @@ -36,11 +36,11 @@ public ReduceTran(boolean caching) { @Override public JavaPairRDD doTransform( - JavaPairRDD input) { + JavaPairRDD input) { return input.mapPartitionsToPair(reduceFunc); } - public void setReduceFunction(HiveReduceFunction redFunc) { + public void setReduceFunction(HiveReduceFunction redFunc) { this.reduceFunc = redFunc; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RepartitionShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RepartitionShuffler.java new file mode 100644 index 0000000..d0c708c --- /dev/null +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RepartitionShuffler.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.io.BytesWritable; +import org.apache.spark.HashPartitioner; +import org.apache.spark.api.java.JavaPairRDD; + +public class RepartitionShuffler implements SparkShuffler { + + @Override + public JavaPairRDD shuffle( + JavaPairRDD input, int numPartitions) { + if (numPartitions < 0) { + numPartitions = 1; + } + return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions)); + } + + @Override + public String getName() { + return "GroupBy_Repartition"; + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java index b7ab5e5..61f04f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java @@ -25,7 +25,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.storage.StorageLevel; -public class SortByShuffler implements 
SparkShuffler { +public class SortByShuffler implements SparkShuffler { private final boolean totalOrder; private final SparkPlan sparkPlan; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 1b8b058..12a76a7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -212,7 +212,11 @@ private ShuffleTran generate(SparkPlan sparkPlan, SparkEdgeProperty edge, boolea } else if (edge.isShuffleSort()) { shuffler = new SortByShuffler(true, sparkPlan); } else { - shuffler = new GroupByShuffler(); + boolean useSparkGroupBy = jobConf.getBoolean("hive.spark.use.groupby.shuffle", true); + if (!useSparkGroupBy) { + LOG.info("hive.spark.use.groupby.shuffle is off. Use repartition shuffle instead."); + } + shuffler = useSparkGroupBy ? new GroupByShuffler() : new RepartitionShuffler(); } return new ShuffleTran(sparkPlan, shuffler, edge.getNumPartitions(), toCache); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java index d71d940..e4913b5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java @@ -22,9 +22,9 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; -public interface SparkShuffler { +public interface SparkShuffler { - JavaPairRDD shuffle( + JavaPairRDD shuffle( JavaPairRDD input, int numPartitions); public String getName(); diff --git ql/src/test/queries/clientpositive/lateral_view_explode2.q ql/src/test/queries/clientpositive/lateral_view_explode2.q index 3c48027..1b5479a 100644 --- ql/src/test/queries/clientpositive/lateral_view_explode2.q +++ ql/src/test/queries/clientpositive/lateral_view_explode2.q @@ -2,8 +2,10 @@ add jar 
${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:h CREATE TEMPORARY FUNCTION explode2 AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFExplode2'; +-- SORT_QUERY_RESULTS + EXPLAIN SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3; SELECT col1, col2 FROM src LATERAL VIEW explode2(array(1,2,3)) myTable AS col1, col2 group by col1, col2 LIMIT 3; -DROP TEMPORARY FUNCTION explode2; \ No newline at end of file +DROP TEMPORARY FUNCTION explode2; diff --git ql/src/test/results/clientpositive/spark/union_remove_25.q.out ql/src/test/results/clientpositive/spark/union_remove_25.q.out index e7844ed..9fec1d4 100644 --- ql/src/test/results/clientpositive/spark/union_remove_25.q.out +++ ql/src/test/results/clientpositive/spark/union_remove_25.q.out @@ -370,7 +370,7 @@ Table: outputtbl2 #### A masked pattern was here #### Partition Parameters: numFiles 2 - totalSize 6812 + totalSize 6826 #### A masked pattern was here #### # Storage Information