diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
index e128dd2..905039d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
@@ -20,17 +20,18 @@
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 
+import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 
 public class GroupByShuffler implements SparkShuffler {
 
   @Override
-  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+  public JavaPairRDD<HiveKey, BytesWritable> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
-    if (numPartitions > 0) {
-      return input.groupByKey(numPartitions);
+    if (numPartitions < 0) {
+      numPartitions = 1;
     }
-    return input.groupByKey();
+    return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions));
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index eeb4443..2b85872 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -26,7 +26,7 @@
 import scala.Tuple2;
 
 public class HiveReduceFunction extends HivePairFlatMapFunction<
-  Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
+  Iterator<Tuple2<HiveKey, BytesWritable>>, HiveKey, BytesWritable> {
 
   private static final long serialVersionUID = 1L;
 
@@ -37,7 +37,7 @@ public HiveReduceFunction(byte[] buffer, SparkReporter sparkReporter) {
   @SuppressWarnings("unchecked")
   @Override
   public Iterator<Tuple2<HiveKey, BytesWritable>>
-    call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
+    call(Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
     initJobConf();
 
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
index d57cac4..8708819 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
@@ -26,7 +26,7 @@
 import scala.Tuple2;
 
 public class HiveReduceFunctionResultList extends
-    HiveBaseFunctionResultList<Tuple2<HiveKey, Iterable<BytesWritable>>> {
+    HiveBaseFunctionResultList<Tuple2<HiveKey, BytesWritable>> {
   private static final long serialVersionUID = 1L;
   private final SparkReduceRecordHandler reduceRecordHandler;
 
@@ -37,16 +37,16 @@
    * @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
    */
   public HiveReduceFunctionResultList(
-      Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> inputIterator,
+      Iterator<Tuple2<HiveKey, BytesWritable>> inputIterator,
       SparkReduceRecordHandler reducer) {
     super(inputIterator);
     this.reduceRecordHandler = reducer;
   }
 
   @Override
-  protected void processNextRecord(Tuple2<HiveKey, Iterable<BytesWritable>> inputRecord)
+  protected void processNextRecord(Tuple2<HiveKey, BytesWritable> inputRecord)
       throws IOException {
-    reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2().iterator());
+    reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2());
   }
 
   @Override
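Note: the change above replaces groupByKey, which materializes every value for a key as a single in-memory Iterable<BytesWritable>, with repartitionAndSortWithinPartitions, which only hash-partitions the pairs and sorts each partition by key, so values stream through one at a time and a single hot key can no longer exhaust executor memory. A minimal standalone sketch of the two shuffle shapes (hypothetical class name and toy data, not part of this patch):

    import java.util.Arrays;

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class ShuffleShapeSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("shuffle-shape-sketch").setMaster("local[2]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
          JavaPairRDD<String, Integer> input = sc.parallelizePairs(Arrays.asList(
              new Tuple2<String, Integer>("a", 1),
              new Tuple2<String, Integer>("b", 2),
              new Tuple2<String, Integer>("a", 3)));

          // Old shape: one (key, Iterable<value>) tuple per key; the whole
          // group for a key sits in memory at once.
          JavaPairRDD<String, Iterable<Integer>> grouped = input.groupByKey(2);

          // New shape: plain (key, value) pairs, hash-partitioned and sorted
          // by key within each partition; values arrive one at a time.
          JavaPairRDD<String, Integer> sorted =
              input.repartitionAndSortWithinPartitions(new HashPartitioner(2));

          System.out.println(grouped.collect());
          System.out.println(sorted.collect());
        }
      }
    }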
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 3d56876..926e1f8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -22,7 +22,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class ReduceTran extends CacheTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
+public class ReduceTran extends CacheTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
   private HiveReduceFunction reduceFunc;
   private String name = "Reduce";
 
@@ -36,7 +36,7 @@ public ReduceTran(boolean caching) {
 
   @Override
   public JavaPairRDD<HiveKey, BytesWritable> doTransform(
-      JavaPairRDD<HiveKey, Iterable<BytesWritable>> input) {
+      JavaPairRDD<HiveKey, BytesWritable> input) {
     return input.mapPartitionsToPair(reduceFunc);
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index a774395..2aac2b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -23,7 +23,7 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
 
-public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
+public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
   private final SparkShuffler shuffler;
   private final int numOfPartitions;
   private final boolean toCache;
@@ -42,8 +42,8 @@ public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache
   }
 
   @Override
-  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
-    JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
+  public JavaPairRDD<HiveKey, BytesWritable> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
+    JavaPairRDD<HiveKey, BytesWritable> result = shuffler.shuffle(input, numOfPartitions);
     if (toCache) {
       sparkPlan.addCachedRDDId(result.id());
       result = result.persist(StorageLevel.MEMORY_AND_DISK());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 997ab7e..af8f0f5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -43,7 +43,7 @@ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
   }
 
   @Override
-  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+  public JavaPairRDD<HiveKey, BytesWritable> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
     JavaPairRDD<HiveKey, BytesWritable> rdd;
     if (totalOrder) {
@@ -60,7 +60,7 @@ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
       Partitioner partitioner = new HashPartitioner(numPartitions);
       rdd = input.repartitionAndSortWithinPartitions(partitioner);
     }
-    return rdd.mapPartitionsToPair(new ShuffleFunction());
+    return rdd;
   }
 
   @Override
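With both shufflers now emitting plain JavaPairRDD<HiveKey, BytesWritable>, SortByShuffler can drop its trailing mapPartitionsToPair(new ShuffleFunction()) regrouping pass, and ShuffleTran persists the sorted RDD directly. The cost is that downstream code must detect key boundaries in the sorted stream itself. A pure-JDK sketch of that pattern (hypothetical names and data, not from the patch):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    public class SortedStreamGrouping {
      // Emits one group per key; assumes the iterator is sorted by key, as
      // repartitionAndSortWithinPartitions guarantees within a partition.
      static void consume(Iterator<Map.Entry<String, Integer>> records) {
        String currentKey = null;
        List<Integer> group = new ArrayList<>();
        while (records.hasNext()) {
          Map.Entry<String, Integer> r = records.next();
          if (currentKey != null && !currentKey.equals(r.getKey())) {
            System.out.println(currentKey + " -> " + group); // key boundary: flush
            group = new ArrayList<>();
          }
          currentKey = r.getKey();
          group.add(r.getValue());
        }
        if (currentKey != null) {
          System.out.println(currentKey + " -> " + group);   // flush the last group
        }
      }

      public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sorted = Arrays.asList(
            new SimpleEntry<>("a", 1), new SimpleEntry<>("a", 3), new SimpleEntry<>("b", 2));
        consume(sorted.iterator());
      }
    }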
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 0d31e5f..a8eb44d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -26,7 +26,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -48,8 +47,6 @@
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -214,8 +211,24 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) throws
   }
 
   @Override
-  public void processRow(Object key, Object value) throws IOException {
-    throw new UnsupportedOperationException("Do not support this method in SparkReduceRecordHandler.");
+  public void processRow(Object key, final Object value) throws IOException {
+    processRow(key, new Iterator<Object>() {
+      boolean done = false;
+      @Override
+      public boolean hasNext() {
+        return !done;
+      }
+
+      @Override
+      public Object next() {
+        done = true;
+        return value;
+      }
+
+      @Override
+      public void remove() {}
+
+    });
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
index 40e251f..d71d940 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
@@ -24,7 +24,7 @@
 
 public interface SparkShuffler {
 
-  JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+  JavaPairRDD<HiveKey, BytesWritable> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions);
 
   public String getName();
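The processRow(Object, Object) override above adapts a single value to the iterator-based path by hand-rolling a one-element Iterator. For comparison, a hypothetical JDK-based equivalent (not in the patch; one behavioral difference is that the JDK iterator's remove() throws UnsupportedOperationException, where the patch's no-op remove() ignores the call):

    import java.util.Collections;
    import java.util.Iterator;

    public class SingleValueIteratorDemo {
      public static void main(String[] args) {
        Object value = "row";
        // Yields `value` exactly once, like the anonymous Iterator in processRow.
        Iterator<Object> once = Collections.singletonList(value).iterator();
        while (once.hasNext()) {
          System.out.println(once.next());
        }
      }
    }

The remaining hunks below are golden-file updates only: COLUMN_STATS_ACCURATE now serializes the same JSON keys in a different order, with no semantic change.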
diff --git ql/src/test/results/clientpositive/spark/groupby_map_ppr.q.out ql/src/test/results/clientpositive/spark/groupby_map_ppr.q.out
index beae497..f97f63e 100644
--- ql/src/test/results/clientpositive/spark/groupby_map_ppr.q.out
+++ ql/src/test/results/clientpositive/spark/groupby_map_ppr.q.out
@@ -73,7 +73,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 11
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'
@@ -119,7 +119,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 12
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'
diff --git ql/src/test/results/clientpositive/spark/groupby_map_ppr_multi_distinct.q.out ql/src/test/results/clientpositive/spark/groupby_map_ppr_multi_distinct.q.out
index 2ad4d68..c833657 100644
--- ql/src/test/results/clientpositive/spark/groupby_map_ppr_multi_distinct.q.out
+++ ql/src/test/results/clientpositive/spark/groupby_map_ppr_multi_distinct.q.out
@@ -73,7 +73,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 11
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'
@@ -119,7 +119,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 12
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'
diff --git ql/src/test/results/clientpositive/spark/groupby_ppr.q.out ql/src/test/results/clientpositive/spark/groupby_ppr.q.out
index f1e1027..a2c2ced 100644
--- ql/src/test/results/clientpositive/spark/groupby_ppr.q.out
+++ ql/src/test/results/clientpositive/spark/groupby_ppr.q.out
@@ -66,7 +66,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 11
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'
@@ -112,7 +112,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 12
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'
diff --git ql/src/test/results/clientpositive/spark/groupby_ppr_multi_distinct.q.out ql/src/test/results/clientpositive/spark/groupby_ppr_multi_distinct.q.out
index 7d2f9c3..959db2e 100644
--- ql/src/test/results/clientpositive/spark/groupby_ppr_multi_distinct.q.out
+++ ql/src/test/results/clientpositive/spark/groupby_ppr_multi_distinct.q.out
@@ -66,7 +66,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 11
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'
@@ -112,7 +112,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 12
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'
@@ -327,7 +327,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 11
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'
@@ -373,7 +373,7 @@ STAGE PLANS:
                 ds 2008-04-08
                 hr 12
               properties:
-                COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}
+                COLUMN_STATS_ACCURATE {"COLUMN_STATS":{"key":"true","value":"true"},"BASIC_STATS":"true"}
                 bucket_count -1
                 columns key,value
                 columns.comments 'default','default'