diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
index c9e469c..9e59d19 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
@@ -321,4 +321,8 @@ public void close() {
       Utilities.clearWorkMap();
     }
   }
+
+  public Operator getReducer() {
+    return reducer;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
index f877d35..2b7e538 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
@@ -18,12 +18,16 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.Reporter;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
 
 public class HiveReduceFunctionResultList extends
@@ -41,6 +45,7 @@ public HiveReduceFunctionResultList(Configuration conf,
       ExecReducer reducer) {
     super(conf, inputIterator);
     this.reducer = reducer;
+    setOutputCollector();
   }
 
   @Override
@@ -58,4 +63,11 @@ protected boolean processingDone() {
   protected void closeRecordProcessor() {
     reducer.close();
   }
+
+  private void setOutputCollector() {
+    if (reducer != null && reducer.getReducer() != null) {
+      OperatorUtils.setChildrenCollector(
+          Arrays.<Operator<? extends OperatorDesc>>asList(reducer.getReducer()), this);
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 45eff67..434d25f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -47,7 +47,7 @@ public class SparkPlanGenerator {
 
   private JavaSparkContext sc;
-  private JobConf jobConf;
+  private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
@@ -86,7 +86,6 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
   private JavaPairRDD generateRDD(MapWork mapWork) throws Exception {
     List inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
     Utilities.setInputPaths(jobConf, inputPaths);
-    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
     Class ifClass = HiveInputFormat.class;
 
     // The mapper class is expected by the HiveInputFormat.
@@ -119,7 +118,7 @@ private SparkTran generate(BaseWork bw) throws IOException, HiveException {
   private MapTran generate(MapWork mw) throws IOException {
     MapTran result = new MapTran();
-    Utilities.setMapWork(jobConf, mw, scratchDir, true);
+    Utilities.setMapWork(jobConf, mw, scratchDir, false);
     Utilities.createTmpDirs(jobConf, mw);
     jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
     byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
@@ -137,11 +136,15 @@ private SparkShuffler generate(SparkEdgeProperty edge) {
   private ReduceTran generate(ReduceWork rw) throws IOException {
     ReduceTran result = new ReduceTran();
-    Utilities.setReduceWork(jobConf, rw, scratchDir, true);
-    Utilities.createTmpDirs(jobConf, rw);
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-    HiveReduceFunction mapFunc = new HiveReduceFunction(confBytes);
-    result.setReduceFunction(mapFunc);
+    // Clone jobConf for each ReduceWork so we can have multiple of them
+    JobConf newJobConf = new JobConf(jobConf);
+    // Make sure we'll use a different plan path from the original one
+    HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
+    Utilities.setReduceWork(newJobConf, rw, scratchDir, false);
+    Utilities.createTmpDirs(newJobConf, rw);
+    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
+    result.setReduceFunction(redFunc);
     return result;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
index beeeab8..3debae5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
@@ -26,9 +26,9 @@
   public static long SHUFFLE_SORT = 2; // Shuffle, keys are sorted
 
   private long edgeType;
-
+
   private int numPartitions;
-
+
   public SparkEdgeProperty(long edgeType, int numPartitions) {
     this.edgeType = edgeType;
     this.numPartitions = numPartitions;
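
For reference only (not part of the patch): a minimal, self-contained sketch of the "clone the JobConf per ReduceWork and clear its plan path" idea used in the SparkPlanGenerator ReduceTran hunk above. The class name PerWorkConfSketch and the helper cloneForWork are illustrative names introduced here; new JobConf(Configuration), HiveConf.setVar/getVar, and HiveConf.ConfVars.PLAN are the same APIs the patch itself uses.

// Sketch of the per-work JobConf cloning done in generate(ReduceWork).
// Illustrative code; class and method names are not Hive APIs.
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

public class PerWorkConfSketch {

  // Copy the shared conf and clear hive.exec.plan so a later
  // Utilities.setReduceWork() call picks a fresh plan path instead of
  // reusing the path already serialized for another work.
  public static JobConf cloneForWork(JobConf baseConf) {
    JobConf newJobConf = new JobConf(baseConf); // independent copy of all settings
    HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
    return newJobConf;
  }

  public static void main(String[] args) {
    JobConf base = new JobConf();
    HiveConf.setVar(base, HiveConf.ConfVars.PLAN, "file:/tmp/first-reduce-plan");

    JobConf perWork = cloneForWork(base);

    // The shared conf still points at the first plan; the clone does not.
    System.out.println("base  plan: " + HiveConf.getVar(base, HiveConf.ConfVars.PLAN));
    System.out.println("clone plan: " + HiveConf.getVar(perWork, HiveConf.ConfVars.PLAN));
  }
}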