diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index 841db87..a14603b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -18,11 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.io.BytesWritable;
@@ -49,18 +45,11 @@ public HiveReduceFunction(byte[] buffer) {
   @Override
   public Iterable<Tuple2<BytesWritable, BytesWritable>> call(Iterator<Tuple2<BytesWritable, BytesWritable>> it)
       throws Exception {
-    if (jobConf == null) {
-      jobConf = KryoSerializer.deserializeJobConf(this.buffer);
-      jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
-
-      reducer = new ExecReducer();
-      reducer.configure(jobConf);
-      collector = new SparkCollector();
-    }
+    init();
 
     collector.clear();
     Map<BytesWritable, List<BytesWritable>> clusteredRows =
-        new HashMap<BytesWritable, List<BytesWritable>>();
+        new LinkedHashMap<BytesWritable, List<BytesWritable>>();
     while (it.hasNext()) {
       Tuple2<BytesWritable, BytesWritable> input = it.next();
       BytesWritable key = input._1();
@@ -83,5 +72,16 @@ public HiveReduceFunction(byte[] buffer) {
     reducer.close();
     return collector.getResult();
   }
+
+  public void init() {
+    if (jobConf == null) {
+      jobConf = KryoSerializer.deserializeJobConf(this.buffer);
+      jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
+
+      reducer = new ExecReducer();
+      reducer.configure(jobConf);
+      collector = new SparkCollector();
+    }
+  }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index 94db145..c9322c1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -207,7 +207,7 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) {
         }
       }
     } else {
-      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1/*redWork.getNumReduceTasks()*/)); // Two partitions.
+      JavaPairRDD rdd3 = SparkShuffler.shuffle(sparkWork, mapWork, redWork, rdd2);
       HiveReduceFunction rf = new HiveReduceFunction(confBytes);
       JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf);
       rdd4.foreach(HiveVoidFunction.getInstance());
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
new file mode 100644
index 0000000..4b07293
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkShuffler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+
+import java.util.Comparator;
+
+// Use this class to inject shuffle transformations
+public class SparkShuffler {
+  private static final Log LOG = LogFactory.getLog(SparkShuffler.class);
+
+  public static JavaPairRDD shuffle(SparkWork sparkWork, BaseWork parentWork, BaseWork childWork, JavaPairRDD parentRDD) {
+    if (sparkWork.containWork(parentWork) && sparkWork.containWork(childWork)) {
+      SparkEdgeProperty edgeProperty = sparkWork.getEdgeProperty(parentWork, childWork);
+      if (edgeProperty != null) {
+        int numPartitions = edgeProperty.getNumPartitions();
+        if (numPartitions <= 0) {
+          numPartitions = 1;
+        }
+        if (edgeProperty.isShuffleSort()) {
+          LOG.info("Shuffle Type: SORT " + (edgeProperty.isAscend() ? "ASC" : "DESC"));
+          Comparator comp = com.google.common.collect.Ordering.natural();
+          // Due to HIVE-7540, number of partitions must be set to 1
+          return parentRDD.sortByKey(comp, edgeProperty.isAscend(), 1);
+        }
+        if (edgeProperty.isShuffleGroup()) {
+          LOG.info("Shuffle Type: GROUP");
+          return parentRDD.groupByKey(numPartitions);
+        }
+        if (!edgeProperty.isShuffleNone()) {
+          LOG.info("Shuffle type unknown, using hash partitioning as default.");
+          return parentRDD.partitionBy(new HashPartitioner(numPartitions));
+        }
+        LOG.info("Shuffle Type: NONE");
+      } else {
+        LOG.info("No connection between parent and child work.");
+      }
+    } else {
+      LOG.info("Cannot find the parent or child work in the SparkWork.");
+    }
+    LOG.info("No shuffle injected.");
+    return parentRDD;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 25eea14..cd5341a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -33,26 +33,12 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
-import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
+import org.apache.hadoop.hive.ql.plan.*;
 
 /**
  * GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -145,6 +131,20 @@ public ReduceWork createReduceWork(GenSparkProcContext context, Operator root
       edgeProp = new SparkEdgeProperty(0);
     }
 
+    if (root instanceof GroupByOperator) {
+      edgeProp.setShuffleGroup();
+    }
+    String sortOrder = reduceSink.getConf().getOrder();
+    if (sortOrder != null && !sortOrder.trim().isEmpty()) {
+      if (sortOrder.startsWith("+")) {
+        edgeProp.setShuffleSort();
+        edgeProp.setAscend(true);
+      } else if (sortOrder.startsWith("-")) {
+        edgeProp.setShuffleSort();
+        edgeProp.setAscend(false);
+      }
+    }
+
     sparkWork.connect(
         context.preceedingWork, reduceWork, edgeProp);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
index ceb7b6c..cb939bd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
@@ -28,11 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -271,6 +267,19 @@ public Object process(Node nd, Stack stack,
       } else {
         edgeProp = new SparkEdgeProperty(0/*EdgeType.SIMPLE_EDGE*/);
       }
+      if (rWork.getReducer() instanceof GroupByOperator) {
+        edgeProp.setShuffleGroup();
+      }
+      String sortOrder = rs.getConf().getOrder();
+      if (sortOrder != null && !sortOrder.trim().isEmpty()) {
+        if (sortOrder.startsWith("+")) {
+          edgeProp.setShuffleSort();
+          edgeProp.setAscend(true);
+        } else if (sortOrder.startsWith("-")) {
+          edgeProp.setShuffleSort();
+          edgeProp.setAscend(false);
+        }
+      }
       sparkWork.connect(work, rWork, edgeProp);
       context.connectedReduceSinks.add(rs);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
index 9447578..5e11327 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
@@ -28,6 +28,8 @@
   private long edgeType;
 
   private int numPartitions;
+
+  private boolean ascend = true;
 
   public SparkEdgeProperty(long edgeType, int numPartitions) {
     this.edgeType = edgeType;
@@ -95,5 +97,13 @@ public int getNumPartitions() {
   public void setNumPartitions(int numPartitions) {
     this.numPartitions = numPartitions;
   }
+
+  public boolean isAscend() {
+    return ascend;
+  }
+
+  public void setAscend(boolean ascend) {
+    this.ascend = ascend;
+  }
 
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
index 86d14f1..1c66c00 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
@@ -314,4 +314,8 @@ public ReduceWork getReduceWork() {
     return null;
   }
 
+  public boolean containWork(BaseWork work) {
+    return workGraph.containsKey(work) && invertedWorkGraph.containsKey(work);
+  }
+
 }
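
For context on what the new SparkShuffler injects, the snippet below is a minimal, self-contained sketch of the three RDD transformations it chooses between: sortByKey for sort edges, groupByKey for group edges, and hash partitionBy as the fallback. It is written against the plain Spark Java API with String/Integer pairs instead of BytesWritable for readability; the class name ShuffleDemo and the sample data are illustrative only and not part of this patch.

// Illustrative sketch only (not part of the patch): demonstrates the behavior of
// the three shuffle transformations SparkShuffler selects from.
import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ShuffleDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("shuffle-demo").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("b", 2),
        new Tuple2<String, Integer>("a", 1),
        new Tuple2<String, Integer>("a", 3)));

    // Sort shuffle (isShuffleSort): keys come back in total order; the patch
    // pins this to a single partition until HIVE-7540 is resolved.
    JavaPairRDD<String, Integer> sorted = pairs.sortByKey(true, 1);

    // Group shuffle (isShuffleGroup): all values of a key are clustered into
    // one Iterable, which is what reducer-side grouping expects.
    JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey(2);

    // Fallback: plain hash repartitioning by key, with no ordering or
    // grouping guarantee within a partition.
    JavaPairRDD<String, Integer> hashed = pairs.partitionBy(new HashPartitioner(2));

    System.out.println(sorted.collect());
    System.out.println(grouped.collect());
    System.out.println(hashed.collect());

    sc.stop();
  }
}

If the HIVE-7540 limitation were lifted, the sortByKey case could presumably use edgeProperty.getNumPartitions() instead of the hard-coded single partition, as the comment in SparkShuffler suggests.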