diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 61b3a2f..7ef49c2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -67,6 +67,8 @@
   private Map<BaseWork, BaseWork> cloneToWork;
   private final Map<BaseWork, SparkTran> workToTranMap;
   private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
+  // a map from each BaseWork to its cloned JobConf
+  private final Map<BaseWork, JobConf> workToJobConf;
 
   public SparkPlanGenerator(
       JavaSparkContext sc,
@@ -82,6 +84,7 @@ public SparkPlanGenerator(
     this.workToTranMap = new HashMap<BaseWork, SparkTran>();
     this.workToParentWorkTranMap = new HashMap<BaseWork, SparkTran>();
     this.sparkReporter = sparkReporter;
+    this.workToJobConf = new HashMap<BaseWork, JobConf>();
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
@@ -211,6 +214,9 @@ private SparkTran generate(BaseWork work) throws Exception {
   }
 
   private JobConf cloneJobConf(BaseWork work) throws Exception {
+    if (workToJobConf.containsKey(work)) {
+      return workToJobConf.get(work);
+    }
     JobConf cloned = new JobConf(jobConf);
     // Make sure we'll use a different plan path from the original one
     HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, "");
@@ -238,6 +244,8 @@
       } else {
         cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
       }
+      // remember the JobConf cloned for each MapWork, so we won't clone it again
+      workToJobConf.put(work, cloned);
     } else if (work instanceof ReduceWork) {
       Utilities.setReduceWork(cloned, (ReduceWork) work, scratchDir, false);
       Utilities.createTmpDirs(cloned, (ReduceWork) work);
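
For context: the patch memoizes cloneJobConf per BaseWork. The first call for a given
work pays the cost of copying the JobConf; later calls return the cached copy, so all
transforms generated from the same MapWork share one configuration. Below is a minimal,
self-contained sketch of the same cache-on-first-use pattern, under the assumption that
this is the intent; ClonedConfCache, Work, and cloneConf are hypothetical stand-ins for
illustration, not Hive classes.

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the cache-on-first-use pattern in the patch above;
// Work and ClonedConfCache stand in for BaseWork and SparkPlanGenerator.
public class ClonedConfCache {

  // stand-in for a unit of work, compared by identity like BaseWork
  static final class Work {
    final String name;
    Work(String name) { this.name = name; }
  }

  // the base configuration every clone starts from (stand-in for jobConf)
  private final Map<String, String> baseConf = new HashMap<String, String>();
  // one cached clone per Work, mirroring workToJobConf in the patch
  private final Map<Work, Map<String, String>> workToConf =
      new HashMap<Work, Map<String, String>>();

  Map<String, String> cloneConf(Work work) throws Exception {
    // return the cached clone if we already built one for this Work
    if (workToConf.containsKey(work)) {
      return workToConf.get(work);
    }
    // first request for this Work: pay the copy cost exactly once
    Map<String, String> cloned = new HashMap<String, String>(baseConf);
    cloned.put("work.name", work.name); // per-work customization
    workToConf.put(work, cloned);
    return cloned;
  }

  public static void main(String[] args) throws Exception {
    ClonedConfCache cache = new ClonedConfCache();
    Work w = new Work("map-1");
    // the second call returns the same cached instance; only one clone is made
    System.out.println(cache.cloneConf(w) == cache.cloneConf(w)); // prints true
  }
}

On Java 8+ the lookup could collapse to workToConf.computeIfAbsent(work, ...), but
cloneJobConf declares throws Exception, and a checked exception cannot propagate out of
the computeIfAbsent lambda, so the explicit containsKey/get form used in the patch is
the natural fit here.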