diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 91d7c40..d8e3170 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -24,6 +24,7 @@
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
@@ -42,15 +43,17 @@ public class SparkPlanGenerator {
 
   private JavaSparkContext sc;
-  private JobConf jobConf;
+  private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
+  private JobConf jobConfClone;
 
   public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf,
      Path scratchDir) {
     this.sc = sc;
     this.context = context;
     this.jobConf = jobConf;
     this.scratchDir = scratchDir;
+    this.jobConfClone = this.jobConf;
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
@@ -60,7 +63,7 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
     assert(roots != null && roots.size() == 1);
     BaseWork w = roots.iterator().next();
     MapWork mapWork = (MapWork) w;
-    trans.add(generate(w));
+    trans.add(generate(mapWork));
     while (sparkWork.getChildren(w).size() > 0) {
       ReduceWork child = (ReduceWork) sparkWork.getChildren(w).get(0);
       SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
@@ -81,7 +84,6 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
 
   private JavaPairRDD generateRDD(MapWork mapWork) throws Exception {
     List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
     Utilities.setInputPaths(jobConf, inputPaths);
-    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
     Class ifClass = HiveInputFormat.class;
     // The mapper class is expected by the HiveInputFormat.
@@ -101,7 +103,7 @@ private SparkTran generate(BaseWork bw) throws IOException {
 
   private MapTran generate(MapWork mw) throws IOException {
     MapTran result = new MapTran();
-    Utilities.setMapWork(jobConf, mw, scratchDir, true);
+    Utilities.setMapWork(jobConf, mw, scratchDir, false);
     Utilities.createTmpDirs(jobConf, mw);
     jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
     byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
@@ -119,11 +121,16 @@ private SparkShuffler generate(SparkEdgeProperty edge) {
 
   private ReduceTran generate(ReduceWork rw) throws IOException {
     ReduceTran result = new ReduceTran();
-    Utilities.setReduceWork(jobConf, rw, scratchDir, true);
-    Utilities.createTmpDirs(jobConf, rw);
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-    HiveReduceFunction mapFunc = new HiveReduceFunction(confBytes);
-    result.setReduceFunction(mapFunc);
+    // Clone jobConf for each ReduceWork so we can have multiple of them
+    JobConf newJobConf = new JobConf(jobConfClone);
+    // Make sure we'll use a different plan path from the original one
+    HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
+    Utilities.setReduceWork(newJobConf, rw, scratchDir, false);
+    Utilities.createTmpDirs(newJobConf, rw);
+    jobConfClone = newJobConf;
+    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
+    result.setReduceFunction(redFunc);
     return result;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 6eac4e7..951f1c9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -858,13 +858,6 @@ public static void setKeyAndValueDescForTaskTree(Task ta
     } else if (task instanceof SparkTask) {
       SparkWork sw = ((SparkTask)task).getWork();
       sw.getMapWork().deriveExplainAttributes();
-      HashMap<String, Operator<? extends OperatorDesc>> opMap = sw
-          .getMapWork().getAliasToWork();
-      if (opMap != null && !opMap.isEmpty()) {
-        for (Operator<? extends OperatorDesc> op : opMap.values()) {
-          setKeyAndValueDesc(sw.getReduceWork(), op);
-        }
-      }
     }
 
     if (task.getChildTasks() == null) {
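
Note on the ReduceTran hunk above: per the in-patch comments, each ReduceWork has to end up at its own plan path, and that path is taken from HiveConf.ConfVars.PLAN of the conf handed to Utilities.setReduceWork(). Reusing the single shared jobConf would make every ReduceWork resolve to the same path, so the patch clones a fresh JobConf per ReduceWork (chained through jobConfClone) and blanks PLAN so a new path is chosen. Below is a minimal standalone sketch of that clone-and-reset pattern using only stock Hadoop; the class name ConfCloneDemo and the key example.plan.path are illustrative stand-ins, not Hive identifiers:

import org.apache.hadoop.mapred.JobConf;

public class ConfCloneDemo {
  public static void main(String[] args) {
    JobConf base = new JobConf(false); // skip default resources; keeps the demo self-contained
    base.set("example.plan.path", "/tmp/plan-map"); // plays the role of HiveConf.ConfVars.PLAN

    // First ReduceWork: JobConf(Configuration) copy-constructs a full clone,
    // so the inherited plan path must be blanked before a new one is chosen.
    JobConf reduce1 = new JobConf(base);
    reduce1.set("example.plan.path", "");               // mirrors HiveConf.setVar(newJobConf, PLAN, "")
    reduce1.set("example.plan.path", "/tmp/plan-red1"); // stands in for setReduceWork() picking a path

    // Second ReduceWork: cloned from the previous conf (the jobConfClone chain),
    // so it inherits accumulated settings but keeps its own plan path.
    JobConf reduce2 = new JobConf(reduce1);
    reduce2.set("example.plan.path", "");
    reduce2.set("example.plan.path", "/tmp/plan-red2");

    System.out.println(base.get("example.plan.path"));    // /tmp/plan-map
    System.out.println(reduce1.get("example.plan.path")); // /tmp/plan-red1
    System.out.println(reduce2.get("example.plan.path")); // /tmp/plan-red2
  }
}

Because each ReduceTran serializes its own cloned conf (KryoSerializer.serializeJobConf(newJobConf)), the shared map-side jobConf stays untouched while every reducer ships a conf pointing at its own plan.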