diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
index e128dd2..8267515 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GroupByShuffler.java
@@ -20,21 +20,23 @@
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.HashPartitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 
 public class GroupByShuffler implements SparkShuffler {
 
   @Override
-  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+  public JavaPairRDD<HiveKey, BytesWritable> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
-    if (numPartitions > 0) {
-      return input.groupByKey(numPartitions);
+    if (numPartitions < 0) {
+      numPartitions = 1;
     }
-    return input.groupByKey();
+    return input.repartitionAndSortWithinPartitions(new HashPartitioner(numPartitions));
   }
 
   @Override
   public String getName() {
     return "GroupBy";
   }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index eeb4443..2b85872 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -26,7 +26,7 @@
 import scala.Tuple2;
 
 public class HiveReduceFunction extends HivePairFlatMapFunction<
-    Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
+    Iterator<Tuple2<HiveKey, BytesWritable>>, HiveKey, BytesWritable> {
 
   private static final long serialVersionUID = 1L;
 
@@ -37,7 +37,7 @@ public HiveReduceFunction(byte[] buffer, SparkReporter sparkReporter) {
   @SuppressWarnings("unchecked")
   @Override
   public Iterator<Tuple2<HiveKey, BytesWritable>>
-    call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
+    call(Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
     initJobConf();
 
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
index d57cac4..8708819 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
@@ -26,7 +26,7 @@
 import scala.Tuple2;
 
 public class HiveReduceFunctionResultList extends
-    HiveBaseFunctionResultList<Tuple2<HiveKey, Iterable<BytesWritable>>> {
+    HiveBaseFunctionResultList<Tuple2<HiveKey, BytesWritable>> {
   private static final long serialVersionUID = 1L;
   private final SparkReduceRecordHandler reduceRecordHandler;
 
@@ -37,16 +37,16 @@
    * @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
    */
   public HiveReduceFunctionResultList(
-      Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> inputIterator,
+      Iterator<Tuple2<HiveKey, BytesWritable>> inputIterator,
       SparkReduceRecordHandler reducer) {
     super(inputIterator);
     this.reduceRecordHandler = reducer;
   }
 
   @Override
-  protected void processNextRecord(Tuple2<HiveKey, Iterable<BytesWritable>> inputRecord)
+  protected void processNextRecord(Tuple2<HiveKey, BytesWritable> inputRecord)
       throws IOException {
-    reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2().iterator());
+    reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2());
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
index 3d56876..926e1f8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
@@ -22,7 +22,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class ReduceTran extends CacheTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
+public class ReduceTran extends CacheTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
   private HiveReduceFunction reduceFunc;
   private String name = "Reduce";
 
@@ -36,7 +36,7 @@ public ReduceTran(boolean caching) {
 
   @Override
   public JavaPairRDD<HiveKey, BytesWritable> doTransform(
-      JavaPairRDD<HiveKey, Iterable<BytesWritable>> input) {
+      JavaPairRDD<HiveKey, BytesWritable> input) {
     return input.mapPartitionsToPair(reduceFunc);
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
index a774395..2aac2b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
@@ -23,7 +23,7 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
 
-public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
+public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, BytesWritable> {
   private final SparkShuffler shuffler;
   private final int numOfPartitions;
   private final boolean toCache;
 
@@ -42,8 +42,8 @@ public ShuffleTran(SparkPlan sparkPlan, SparkShuffler sf, int n, boolean toCache
   }
 
   @Override
-  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
-    JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
+  public JavaPairRDD<HiveKey, BytesWritable> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
+    JavaPairRDD<HiveKey, BytesWritable> result = shuffler.shuffle(input, numOfPartitions);
     if (toCache) {
       sparkPlan.addCachedRDDId(result.id());
       result = result.persist(StorageLevel.MEMORY_AND_DISK());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 997ab7e..b7ab5e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -23,11 +23,7 @@
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
-
-import java.util.*;
 
 public class SortByShuffler implements SparkShuffler {
 
@@ -43,7 +39,7 @@ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
   }
 
   @Override
-  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+  public JavaPairRDD<HiveKey, BytesWritable> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions) {
     JavaPairRDD<HiveKey, BytesWritable> rdd;
     if (totalOrder) {
@@ -60,7 +56,7 @@ public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
       Partitioner partitioner = new HashPartitioner(numPartitions);
       rdd = input.repartitionAndSortWithinPartitions(partitioner);
     }
-    return rdd.mapPartitionsToPair(new ShuffleFunction());
+    return rdd;
   }
 
   @Override
@@ -68,61 +64,4 @@ public String getName() {
     return "SortBy";
   }
 
-  private static class ShuffleFunction implements
-      PairFlatMapFunction<Iterator<Tuple2<HiveKey, BytesWritable>>,
-      HiveKey, Iterable<BytesWritable>> {
-    // make eclipse happy
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
-        final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
-      // Use input iterator to back returned iterable object.
-      return new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
-        HiveKey curKey = null;
-        List<BytesWritable> curValues = new ArrayList<BytesWritable>();
-
-        @Override
-        public boolean hasNext() {
-          return it.hasNext() || curKey != null;
-        }
-
-        @Override
-        public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
-          // TODO: implement this by accumulating rows with the same key into a list.
-          // Note that this list needs to improved to prevent excessive memory usage, but this
-          // can be done in later phase.
-          while (it.hasNext()) {
-            Tuple2<HiveKey, BytesWritable> pair = it.next();
-            if (curKey != null && !curKey.equals(pair._1())) {
-              HiveKey key = curKey;
-              List<BytesWritable> values = curValues;
-              curKey = pair._1();
-              curValues = new ArrayList<BytesWritable>();
-              curValues.add(pair._2());
-              return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
-            }
-            curKey = pair._1();
-            curValues.add(pair._2());
-          }
-          if (curKey == null) {
-            throw new NoSuchElementException();
-          }
-          // if we get here, this should be the last element we have
-          HiveKey key = curKey;
-          curKey = null;
-          return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
-        }
-
-        @Override
-        public void remove() {
-          // Not implemented.
-          // throw Unsupported Method Invocation Exception.
-          throw new UnsupportedOperationException();
-        }
-
-      };
-    }
-  }
-
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 66ffe5d..1b8b058 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -36,7 +36,6 @@
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -46,10 +45,6 @@
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -61,9 +56,7 @@
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 0d31e5f..4eaa061 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -26,7 +26,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
@@ -48,8 +47,6 @@
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -213,9 +210,31 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) throws
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
   }
 
+  /**
+   * TODO: Instead of creating a dummy iterator per row, we can implement a private method that's
+   * similar to processRow(Object key, Iterator values) but processes one row at a time. Then,
+   * we just call that private method here.
+   */
   @Override
-  public void processRow(Object key, Object value) throws IOException {
-    throw new UnsupportedOperationException("Do not support this method in SparkReduceRecordHandler.");
+  public void processRow(Object key, final Object value) throws IOException {
+    processRow(key, new Iterator<Object>() {
+      boolean done = false;
+      @Override
+      public boolean hasNext() {
+        return !done;
+      }
+
+      @Override
+      public Object next() {
+        done = true;
+        return value;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("Iterator.remove() is not implemented/supported");
+      }
+    });
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
index 40e251f..d71d940 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
@@ -24,7 +24,7 @@
 
 public interface SparkShuffler {
 
-  JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffle(
+  JavaPairRDD<HiveKey, BytesWritable> shuffle(
       JavaPairRDD<HiveKey, BytesWritable> input, int numPartitions);
 
   public String getName();
diff --git ql/src/test/queries/clientpositive/union_top_level.q ql/src/test/queries/clientpositive/union_top_level.q
index d93fe38..2050442 100644
--- ql/src/test/queries/clientpositive/union_top_level.q
+++ ql/src/test/queries/clientpositive/union_top_level.q
@@ -16,13 +16,13 @@ union all
 select * from (select key, 2 as value from src where key % 3 == 2 limit 3)c;
 
 explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b;
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b;
 
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b;
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b;
 
 -- ctas
 explain
diff --git ql/src/test/results/clientpositive/spark/union_top_level.q.out ql/src/test/results/clientpositive/spark/union_top_level.q.out
index 9be5361..1bc1666 100644
--- ql/src/test/results/clientpositive/spark/union_top_level.q.out
+++ ql/src/test/results/clientpositive/spark/union_top_level.q.out
@@ -184,14 +184,14 @@ POSTHOOK: Input: default@src
 484	1
 86	2
 PREHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 PREHOOK: type: QUERY
 POSTHOOK: query: explain
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -202,8 +202,8 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
-        Reducer 3 <- Reducer 2 (GROUP, 1)
-        Reducer 7 <- Reducer 2 (GROUP, 1)
+        Reducer 3 <- Reducer 2 (SORT, 1)
+        Reducer 7 <- Reducer 2 (SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -255,20 +255,18 @@ STAGE PLANS:
                    expressions: _col2 (type: string), _col1 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
-                    Limit
-                      Number of rows: 10
-                      Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        sort order: 
-                        Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
-                        TopN Hash Memory Usage: 0.1
-                        value expressions: _col0 (type: string), _col1 (type: string)
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string)
+                      sort order: +
+                      Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                      TopN Hash Memory Usage: 0.1
+                      value expressions: _col1 (type: string)
         Reducer 3 
            Reduce Operator Tree:
              Select Operator
-               expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+               expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
               outputColumnNames: _col0, _col1
-               Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+               Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
               Limit
                 Number of rows: 10
                 Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
@@ -282,9 +280,9 @@ STAGE PLANS:
         Reducer 7 
            Reduce Operator Tree:
              Select Operator
-               expressions: VALUE._col0 (type: string), VALUE._col1 (type: string)
+               expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
               outputColumnNames: _col0, _col1
-               Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
+               Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
               Limit
                 Number of rows: 10
                 Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
@@ -302,15 +300,15 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+PREHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 #### A masked pattern was here ####
-POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)a
+POSTHOOK: query: select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)a
 union all
-select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) limit 10)b
+select * from (select s1.key as k, s2.value as v from src s1 join src s2 on (s1.key = s2.key) order by k limit 10)b
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 #### A masked pattern was here ####
@@ -324,16 +322,16 @@ POSTHOOK: Input: default@src
 0	val_0
 0	val_0
 0	val_0
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-100	val_100
-104	val_104
-104	val_104
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+10	val_10
+10	val_10
 PREHOOK: query: -- ctas
 explain
 create table union_top as