diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 716a6b6..f3b14c5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -96,14 +96,20 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
     workToTranMap.clear();
     workToParentWorkTranMap.clear();
 
-    for (BaseWork work : sparkWork.getAllWork()) {
-      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
-      SparkTran tran = generate(work);
-      SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
-      sparkPlan.addTran(tran);
-      sparkPlan.connect(parentTran, tran);
-      workToTranMap.put(work, tran);
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
+    try {
+      for (BaseWork work : sparkWork.getAllWork()) {
+        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
+        SparkTran tran = generate(work);
+        SparkTran parentTran = generateParentTran(sparkPlan, sparkWork, work);
+        sparkPlan.addTran(tran);
+        sparkPlan.connect(parentTran, tran);
+        workToTranMap.put(work, tran);
+        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_CREATE_TRAN + work.getName());
+      }
+    } finally {
+      // Clear all ThreadLocal cached MapWork/ReduceWork after plan generation,
+      // as this may be executed in a pool thread.
+      Utilities.clearWorkMap();
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_BUILD_PLAN);
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 6710c14..1de7e40 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -580,6 +580,10 @@ public int hashCode() {
     if (oldPaths != null) {
       job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths);
     }
+
+    // Clear the work from ThreadLocal after splits are generated, in case the thread is reused in a pool.
+    Utilities.clearWorkMapForConf(job);
+
     LOG.info("Number of all splits " + result.size());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new InputSplit[result.size()]);
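
For context, a minimal, self-contained sketch (not Hive code) of the pattern both hunks rely on: state cached in a ThreadLocal must be cleared in a finally block once the unit of work completes, because an executor pool reuses threads and the next task scheduled on the same thread would otherwise observe stale cached entries. All names below (ThreadLocalCacheDemo, WORK_CACHE, buildPlan) are hypothetical illustrations, not Hive or Hadoop APIs.

    // Minimal sketch of clearing a ThreadLocal cache when work runs on pooled threads.
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThreadLocalCacheDemo {
      // Per-thread cache, analogous in spirit to the ThreadLocal MapWork/ReduceWork cache in Utilities.
      private static final ThreadLocal<Map<String, String>> WORK_CACHE =
          ThreadLocal.withInitial(HashMap::new);

      static void buildPlan(String queryId) {
        try {
          WORK_CACHE.get().put("plan", "plan-for-" + queryId);
          System.out.println(Thread.currentThread().getName() + " sees " + WORK_CACHE.get());
        } finally {
          // Without this, the next task on the same pooled thread would still see
          // the previous query's cached entries.
          WORK_CACHE.remove();
        }
      }

      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1); // single thread, reused across tasks
        pool.submit(() -> buildPlan("query-1"));
        pool.submit(() -> buildPlan("query-2"));
        pool.shutdown();
      }
    }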