diff --git pom.xml pom.xml index e5c7b2b..372a017 100644 --- pom.xml +++ pom.xml @@ -147,7 +147,7 @@ 1.7.5 4.0.4 0.4.0-incubating - 1.0.1 + 1.1.0-SNAPSHOT 2.10 2.10.4 1.1 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java index c915deb..70e20b0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java @@ -18,23 +18,24 @@ package org.apache.hadoop.hive.ql.exec.spark; -import java.util.*; - -import com.google.common.collect.Ordering; import org.apache.hadoop.io.BytesWritable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFlatMapFunction; - import scala.Tuple2; +import java.util.*; + public class SortByShuffler implements SparkShuffler { @Override public JavaPairRDD> shuffle( JavaPairRDD input, int numPartitions) { - Comparator comp = Ordering.natural(); - // Due to HIVE-7540, numPartitions must be to 1 - JavaPairRDD rdd = input.sortByKey(comp, true, 1); + JavaPairRDD rdd; + if (numPartitions > 0) { + rdd = input.sortByKey(true, numPartitions); + } else { + rdd = input.sortByKey(true); + } return rdd.mapPartitionsToPair(new ShuffleFunction()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java index beeeab8..f22870a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java @@ -28,7 +28,7 @@ private long edgeType; private int numPartitions; - + public SparkEdgeProperty(long edgeType, int numPartitions) { this.edgeType = edgeType; this.numPartitions = numPartitions;