diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
index a6a4c32..20afdcb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
@@ -181,6 +181,9 @@ public void reduce(Object key, Iterator values, OutputCollector output,
       // propagate reporter and output collector to all operators
       oc = output;
       rp = reporter;
+      // To accommodate engines like Spark, in which ExecReducer can also contain RS
+      org.apache.hadoop.hive.ql.exec.OperatorUtils.setChildrenCollector(
+          reducer.getChildOperators(), output);
       reducer.setReporter(rp);
       MapredContext.get().setReporter(reporter);
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 45eff67..434d25f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -47,7 +47,7 @@ public class SparkPlanGenerator {
 
   private JavaSparkContext sc;
-  private JobConf jobConf;
+  private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
 
@@ -86,7 +86,6 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
   private JavaPairRDD generateRDD(MapWork mapWork) throws Exception {
     List inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
     Utilities.setInputPaths(jobConf, inputPaths);
-    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
     Class ifClass = HiveInputFormat.class;
 
     // The mapper class is expected by the HiveInputFormat.
@@ -119,7 +118,7 @@ private SparkTran generate(BaseWork bw) throws IOException, HiveException {
 
   private MapTran generate(MapWork mw) throws IOException {
     MapTran result = new MapTran();
-    Utilities.setMapWork(jobConf, mw, scratchDir, true);
+    Utilities.setMapWork(jobConf, mw, scratchDir, false);
     Utilities.createTmpDirs(jobConf, mw);
     jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
     byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
@@ -137,11 +136,15 @@ private SparkShuffler generate(SparkEdgeProperty edge) {
 
   private ReduceTran generate(ReduceWork rw) throws IOException {
     ReduceTran result = new ReduceTran();
-    Utilities.setReduceWork(jobConf, rw, scratchDir, true);
-    Utilities.createTmpDirs(jobConf, rw);
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-    HiveReduceFunction mapFunc = new HiveReduceFunction(confBytes);
-    result.setReduceFunction(mapFunc);
+    // Clone jobConf for each ReduceWork so we can have multiple of them
+    JobConf newJobConf = new JobConf(jobConf);
+    // Make sure we'll use a different plan path from the original one
+    HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
+    Utilities.setReduceWork(newJobConf, rw, scratchDir, false);
+    Utilities.createTmpDirs(newJobConf, rw);
+    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
+    result.setReduceFunction(redFunc);
     return result;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
index beeeab8..3debae5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
@@ -26,9 +26,9 @@ public static long SHUFFLE_SORT = 2; // Shuffle, keys are sorted
 
   private long edgeType;
-  
+
   private int numPartitions;
-  
+
   public SparkEdgeProperty(long edgeType, int numPartitions) {
     this.edgeType = edgeType;
     this.numPartitions = numPartitions;
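Note on the ReduceTran hunk above: the following is a minimal, self-contained sketch (not part of the patch) of the clone-and-reset idiom it introduces, assuming only Hadoop's JobConf copy constructor and Hive's HiveConf.ConfVars.PLAN key; the class name PerWorkConfSketch is made up for illustration.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public class PerWorkConfSketch {
      public static void main(String[] args) {
        JobConf shared = new JobConf();
        // Copy the shared conf so per-work changes stay local to this clone.
        JobConf perWork = new JobConf(shared);
        // Clear hive.exec.plan on the clone; per the patch comment, an empty
        // value makes Hive pick a fresh plan path for this work instead of
        // reusing the path already recorded in the shared conf.
        HiveConf.setVar(perWork, HiveConf.ConfVars.PLAN, "");
        System.out.println("plan after reset: '"
            + HiveConf.getVar(perWork, HiveConf.ConfVars.PLAN) + "'");
      }
    }

With each ReduceWork serialized against its own cloned conf and plan path, multiple reduce stages in one SparkWork no longer overwrite each other's serialized plans.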