diff --git pom.xml pom.xml index e5c7b2b..372a017 100644 --- pom.xml +++ pom.xml @@ -147,7 +147,7 @@ 1.7.5 4.0.4 0.4.0-incubating - 1.0.1 + 1.1.0-SNAPSHOT 2.10 2.10.4 1.1 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java index c915deb..c424646 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java @@ -18,23 +18,26 @@ package org.apache.hadoop.hive.ql.exec.spark; -import java.util.*; - import com.google.common.collect.Ordering; import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFlatMapFunction; - import scala.Tuple2; +import java.util.*; + public class SortByShuffler implements SparkShuffler { @Override public JavaPairRDD> shuffle( JavaPairRDD input, int numPartitions) { Comparator comp = Ordering.natural(); - // Due to HIVE-7540, numPartitions must be to 1 - JavaPairRDD rdd = input.sortByKey(comp, true, 1); + JavaPairRDD rdd; + if (numPartitions > 0) { + rdd = input.sortByKey(true, numPartitions); + } else { + rdd = input.sortByKey(true); + } return rdd.mapPartitionsToPair(new ShuffleFunction()); }