diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ee42f4c..8d6a8e94 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -31,7 +31,6 @@
 import java.io.DataInput;
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -236,8 +235,13 @@
   private Utilities() {
     // prevent instantiation
   }
 
-  private static Map<Path, BaseWork> gWorkMap = Collections
-      .synchronizedMap(new HashMap<Path, BaseWork>());
+  private static ThreadLocal<Map<Path, BaseWork>> gWorkMap =
+      new ThreadLocal<Map<Path, BaseWork>>() {
+    protected Map<Path, BaseWork> initialValue() {
+      return new HashMap<Path, BaseWork>();
+    }
+  };
+
   private static final String CLASS_NAME = Utilities.class.getName();
   private static final Log LOG = LogFactory.getLog(CLASS_NAME);
@@ -344,7 +348,7 @@ public static void cacheBaseWork(Configuration conf, String name, BaseWork work,
    */
   public static void setBaseWork(Configuration conf, String name, BaseWork work) {
     Path path = getPlanPath(conf, name);
-    gWorkMap.put(path, work);
+    gWorkMap.get().put(path, work);
   }
 
   /**
@@ -356,15 +360,14 @@ public static void setBaseWork(Configuration conf, String name, BaseWork work) {
    * @throws RuntimeException if the configuration files are not proper or if plan can not be loaded
    */
   private static BaseWork getBaseWork(Configuration conf, String name) {
-    BaseWork gWork = null;
     Path path = null;
     InputStream in = null;
     try {
       path = getPlanPath(conf, name);
       LOG.info("PLAN PATH = " + path);
       assert path != null;
-      if (!gWorkMap.containsKey(path) ||
-          HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      BaseWork gWork = gWorkMap.get().get(path);
+      if (gWork == null) {
         Path localPath;
         if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
           localPath = new Path(name);
@@ -414,10 +417,9 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
         } else if (name.contains(MERGE_PLAN_NAME)) {
           gWork = deserializePlan(in, MapWork.class, conf);
         }
-        gWorkMap.put(path, gWork);
-      } else {
+        gWorkMap.get().put(path, gWork);
+      } else if (LOG.isDebugEnabled()) {
         LOG.debug("Found plan in cache for name: " + name);
-        gWork = gWorkMap.get(path);
       }
       return gWork;
     } catch (FileNotFoundException fnf) {
@@ -709,7 +711,7 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch
       }
 
       // Cache the plan in this process
-      gWorkMap.put(planPath, w);
+      gWorkMap.get().put(planPath, w);
       return planPath;
     } catch (Exception e) {
       String msg = "Error caching " + name + ": " + e;
@@ -3628,15 +3630,15 @@ public static void clearWorkMapForConf(Configuration conf) {
     Path mapPath = getPlanPath(conf, MAP_PLAN_NAME);
     Path reducePath = getPlanPath(conf, REDUCE_PLAN_NAME);
     if (mapPath != null) {
-      gWorkMap.remove(mapPath);
+      gWorkMap.get().remove(mapPath);
     }
     if (reducePath != null) {
-      gWorkMap.remove(reducePath);
+      gWorkMap.get().remove(reducePath);
     }
   }
 
   public static void clearWorkMap() {
-    gWorkMap.clear();
+    gWorkMap.get().clear();
   }
 
   /**
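
Illustrative note (not part of the patch): the change above swaps a single synchronized plan cache for a per-thread cache built on ThreadLocal.initialValue(). The sketch below shows that pattern in isolation, under the assumption that plan keys and values can be reduced to plain Strings; the class and method names (PlanCacheSketch, getOrLoad) are placeholders, not Hive APIs.

import java.util.HashMap;
import java.util.Map;

public class PlanCacheSketch {

  // One map per thread; initialValue() runs on the first get() in each thread,
  // so no cross-thread synchronization is needed on the map itself.
  private static final ThreadLocal<Map<String, String>> cache =
      new ThreadLocal<Map<String, String>>() {
        @Override
        protected Map<String, String> initialValue() {
          return new HashMap<String, String>();
        }
      };

  // Mirrors the getBaseWork() flow: return the cached value, or "load" and cache it.
  static String getOrLoad(String planPath) {
    String plan = cache.get().get(planPath);
    if (plan == null) {
      plan = "deserialized:" + planPath;   // stand-in for plan deserialization
      cache.get().put(planPath, plan);
    }
    return plan;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(getOrLoad("map.xml"));   // loads and caches in the main thread
    Thread t = new Thread(new Runnable() {
      public void run() {
        // This thread sees its own empty map, so it loads again independently.
        System.out.println(getOrLoad("map.xml"));
      }
    });
    t.start();
    t.join();
  }
}

The trade-off this illustrates: each thread may pay the load cost once for its own copy, in exchange for removing lock contention on a shared map, and a clear() from one thread only touches that thread's cache.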