diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index d565d01..fe6c2e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -265,15 +265,31 @@ public static MapredWork getMapRedWork(Configuration conf) { return w; } + public static void setMapWork(Configuration conf, MapWork work) { + setBaseWork(conf, MAP_PLAN_NAME, work); + } + public static MapWork getMapWork(Configuration conf) { return (MapWork) getBaseWork(conf, MAP_PLAN_NAME); } + public static void setReduceWork(Configuration conf, ReduceWork work) { + setBaseWork(conf, REDUCE_PLAN_NAME, work); + } + public static ReduceWork getReduceWork(Configuration conf) { return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME); } /** + * Pushes work into the global work map + */ + public static void setBaseWork(Configuration conf, String name, BaseWork work) { + Path path = getPlanPath(conf, name); + gWorkMap.put(path, work); + } + + /** * Returns the Map or Reduce plan * Side effect: the BaseWork returned is also placed in the gWorkMap * @param conf @@ -288,8 +304,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { try { path = getPlanPath(conf, name); assert path != null; - gWork = gWorkMap.get(path); - if (gWork == null) { + if (!gWorkMap.containsKey(path)) { Path localPath; if (ShimLoader.getHadoopShims().isLocalMode(conf)) { localPath = path; @@ -334,6 +349,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { gWorkMap.put(path, gWork); } else { LOG.debug("Found plan in cache."); + gWork = gWorkMap.get(path); } return gWork; } catch (FileNotFoundException fnf) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index 9361f7a..2f5f60c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -110,6 +110,8 @@ public void configure(JobConf job) { if (mrwork == null) { mrwork = Utilities.getMapWork(job); cache.cache(PLAN_KEY, mrwork); + } else { + Utilities.setMapWork(job, mrwork); } if (mrwork.getVectorMode()) { mo = new VectorMapOperator(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java index 2447812..20d3ca0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java @@ -122,6 +122,8 @@ public void configure(JobConf job) { if (gWork == null) { gWork = Utilities.getReduceWork(job); cache.cache(PLAN_KEY, gWork); + } else { + Utilities.setReduceWork(job, gWork); } reducer = gWork.getReducer(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index e498b8f..380f21d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -95,6 +95,8 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in for (String s: mapWork.getAliases()) { l4j.info("Alias: "+s); } + } else { + Utilities.setMapWork(jconf, mapWork); } if (mapWork.getVectorMode()) { mapOp = new VectorMapOperator(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java index 6d7977d..318ba8e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java @@ -36,14 +36,16 @@ @Override public void cache(String key, Object value) { - LOG.info("Adding " + key + " to cache"); + LOG.info("Adding " + key + " to cache with value " + value); registry.add(ObjectLifeCycle.VERTEX, key, value); } @Override public Object retrieve(String key) { Object o = registry.get(key); - LOG.info("Found " + key + " in cache with value: " + o); + if (o != null) { + LOG.info("Found " + key + " in cache with value: " + o); + } return o; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index a650d64..534022b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -104,6 +104,8 @@ void init(JobConf jconf, MRTaskReporter mrReporter, Map in if (redWork == null) { redWork = Utilities.getReduceWork(jconf); cache.cache(REDUCE_PLAN_KEY, redWork); + } else { + Utilities.setReduceWork(jconf, redWork); } reducer = redWork.getReducer();