diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 45eff67..c07f23c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -47,15 +47,17 @@ public class SparkPlanGenerator {
   private JavaSparkContext sc;
-  private JobConf jobConf;
+  private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
+  private JobConf jobConfClone;
 
   public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf, Path scratchDir) {
     this.sc = sc;
     this.context = context;
     this.jobConf = jobConf;
     this.scratchDir = scratchDir;
+    this.jobConfClone = this.jobConf;
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
@@ -65,7 +67,7 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
     assert(roots != null && roots.size() == 1);
     BaseWork w = roots.iterator().next();
     MapWork mapWork = (MapWork) w;
-    trans.add(generate(w));
+    trans.add(generate(mapWork));
     while (sparkWork.getChildren(w).size() > 0) {
       ReduceWork child = (ReduceWork) sparkWork.getChildren(w).get(0);
       SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
@@ -86,7 +88,6 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
   private JavaPairRDD generateRDD(MapWork mapWork) throws Exception {
     List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
     Utilities.setInputPaths(jobConf, inputPaths);
-    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
     Class ifClass = HiveInputFormat.class;
 
     // The mapper class is expected by the HiveInputFormat.
@@ -119,7 +120,7 @@ private SparkTran generate(BaseWork bw) throws IOException, HiveException {
 
   private MapTran generate(MapWork mw) throws IOException {
     MapTran result = new MapTran();
-    Utilities.setMapWork(jobConf, mw, scratchDir, true);
+    Utilities.setMapWork(jobConf, mw, scratchDir, false);
     Utilities.createTmpDirs(jobConf, mw);
     jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
     byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
@@ -137,11 +138,16 @@ private SparkShuffler generate(SparkEdgeProperty edge) {
 
   private ReduceTran generate(ReduceWork rw) throws IOException {
     ReduceTran result = new ReduceTran();
-    Utilities.setReduceWork(jobConf, rw, scratchDir, true);
-    Utilities.createTmpDirs(jobConf, rw);
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-    HiveReduceFunction mapFunc = new HiveReduceFunction(confBytes);
-    result.setReduceFunction(mapFunc);
+    // Clone jobConf for each ReduceWork so we can have multiple of them
+    JobConf newJobConf = new JobConf(jobConfClone);
+    // Make sure we'll use a different plan path from the original one
+    HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
+    Utilities.setReduceWork(newJobConf, rw, scratchDir, false);
+    Utilities.createTmpDirs(newJobConf, rw);
+    jobConfClone = newJobConf;
+    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
+    result.setReduceFunction(redFunc);
     return result;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
index beeeab8..f22870a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
@@ -28,7 +28,7 @@
   private long edgeType;
   private int numPartitions;
-  
+
   public SparkEdgeProperty(long edgeType, int numPartitions) {
     this.edgeType = edgeType;
     this.numPartitions = numPartitions;
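
Note: the core of this change is the JobConf copy-constructor clone taken once per
ReduceWork, so that Utilities.setReduceWork() can record a distinct plan path for
each reduce stage instead of repeatedly overwriting the single shared jobConf.
Below is a minimal, self-contained sketch of that idiom; JobConfCloneDemo and the
"demo.plan.path" key are illustrative stand-ins (the patch itself clears
HiveConf.ConfVars.PLAN), not part of the patch.

import org.apache.hadoop.mapred.JobConf;

public class JobConfCloneDemo {
  public static void main(String[] args) {
    // Base configuration, analogous to the generator's original jobConf.
    // (false = skip loading default resources, to keep the demo minimal.)
    JobConf base = new JobConf(false);
    base.set("demo.plan.path", "/tmp/plan-for-mapwork");

    // Copy-construct a fresh JobConf so later mutations cannot leak into
    // the configuration already serialized for the map-side work.
    JobConf clone = new JobConf(base);
    clone.set("demo.plan.path", "/tmp/plan-for-reducework-1");

    System.out.println(base.get("demo.plan.path"));   // /tmp/plan-for-mapwork
    System.out.println(clone.get("demo.plan.path"));  // /tmp/plan-for-reducework-1
  }
}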