diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index e2fce32..f464444 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -874,7 +874,7 @@ public String cliInit(String tname, boolean recreate) throws Exception {
     HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
         "org.apache.hadoop.hive.ql.security.DummyAuthenticator");
-    Utilities.clearWorkMap();
+    Utilities.clearWorkMap(conf);
     if (QTestUtil.clusterType == MiniClusterType.tezlocal) {
       conf.setBoolean("tez.local.mode", true);
       conf.set("fs.defaultFS", "file:///");
@@ -937,6 +937,7 @@ public String cliInit(String tname, boolean recreate) throws Exception {
 
   private CliSessionState createSessionState() {
     return new CliSessionState(conf) {
+      @Override
       public void setSparkSession(SparkSession sparkSession) {
         super.setSparkSession(sparkSession);
         if (sparkSession != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
new file mode 100644
index 0000000..c381c77
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GlobalWorkMapFactory.java
@@ -0,0 +1,106 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+
+public class GlobalWorkMapFactory {
+
+  private ThreadLocal<Map<Path, BaseWork>> threadLocalWorkMap = null;
+
+  private Map<Path, BaseWork> gWorkMap = null;
+
+  private static class DummyMap<K, V> implements Map<K, V> {
+    @Override
+    public void clear() {
+    }
+
+    @Override
+    public boolean containsKey(final Object key) {
+      return false;
+    }
+
+    @Override
+    public boolean containsValue(final Object value) {
+      return false;
+    }
+
+    @Override
+    public Set<Map.Entry<K, V>> entrySet() {
+      return null;
+    }
+
+    @Override
+    public V get(final Object key) {
+      return null;
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return false;
+    }
+
+    @Override
+    public Set<K> keySet() {
+      return null;
+    }
+
+    @Override
+    public V put(final K key, final V value) {
+      return null;
+    }
+
+    @Override
+    public void putAll(final Map<? extends K, ? extends V> t) {
+    }
+
+    @Override
+    public V remove(final Object key) {
+      return null;
+    }
+
+    @Override
+    public int size() {
+      return 0;
+    }
+
+    @Override
+    public Collection<V> values() {
+      return null;
+    }
+  }
+
+  DummyMap<Path, BaseWork> dummy = new DummyMap<Path, BaseWork>();
+
+  public Map<Path, BaseWork> get(Configuration conf) {
+    if (LlapIoProxy.isDaemon()) {
+      return dummy;
+    }
+
+    if (HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      if (threadLocalWorkMap == null) {
+        threadLocalWorkMap = new ThreadLocal<Map<Path, BaseWork>>() {
+          @Override
+          protected Map<Path, BaseWork> initialValue() {
+            return new HashMap<Path, BaseWork>();
+          }
+        };
+      }
+      return threadLocalWorkMap.get();
+    }
+
+    if (gWorkMap == null) {
+      gWorkMap = new HashMap<Path, BaseWork>();
+    }
+    return gWorkMap;
+  }
+
+}
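Review note, not part of the patch: the factory above picks one of three backing maps for the plan cache. Inside an LLAP daemon every put/get is routed to the no-op DummyMap, so a long-lived daemon never accumulates query plans in a process-wide map; under the Spark engine each executor thread gets its own HashMap via a ThreadLocal; otherwise a single lazily created process-wide HashMap is shared, as in the original code. The demo below is a minimal sketch — GlobalWorkMapFactory and the conf variable come from this diff, but WorkMapDemo itself is hypothetical:

// Hypothetical demo class; assumes the patched Hive classes are on the classpath.
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.GlobalWorkMapFactory;
import org.apache.hadoop.hive.ql.plan.BaseWork;

public class WorkMapDemo {
  public static void main(String[] args) {
    GlobalWorkMapFactory factory = new GlobalWorkMapFactory();
    HiveConf conf = new HiveConf();

    // Default engine ("mr"): a single process-wide HashMap, created lazily.
    Map<Path, BaseWork> globalMap = factory.get(conf);
    globalMap.put(new Path("/tmp/plan.xml"), null); // plans are keyed by plan path

    // With the Spark engine, get() returns this thread's private map instead,
    // so concurrent tasks in one executor JVM cannot clobber each other's plans.
    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
    Map<Path, BaseWork> threadMap = factory.get(conf);
    assert threadMap != globalMap;
  }
}

One caveat worth flagging in review: the lazy initialization of threadLocalWorkMap and gWorkMap in get() is unsynchronized, so two threads racing on first use could observe different map instances.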
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index f99aaa8..9cdd0f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -170,6 +170,17 @@ protected final void completeInitializationOp(Object[] os) throws HiveException
       hashTblInitedOnce = true;
     }
 
+    alias = (byte) conf.getPosBigTable();
+    if (hashMapRowGetters == null) {
+      hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
+      MapJoinKey refKey = getRefKey(alias);
+      for (byte pos = 0; pos < order.length; pos++) {
+        if (pos != alias) {
+          hashMapRowGetters[pos] = mapJoinTables[pos].createGetter(refKey);
+        }
+      }
+    }
+
     if (this.getExecContext() != null) {
       // reset exec context so that initialization of the map operator happens
       // properly
@@ -287,17 +298,6 @@ protected MapJoinKey getRefKey(byte alias) {
   @Override
   public void process(Object row, int tag) throws HiveException {
     try {
-      alias = (byte) tag;
-      if (hashMapRowGetters == null) {
-        hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
-        MapJoinKey refKey = getRefKey(alias);
-        for (byte pos = 0; pos < order.length; pos++) {
-          if (pos != alias) {
-            hashMapRowGetters[pos] = mapJoinTables[pos].createGetter(refKey);
-          }
-        }
-      }
-
       // compute keys and values as StandardObjects
       ReusableGetAdaptor firstSetKey = null;
       int fieldCount = joinKeys[alias].size();
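Review note, not part of the patch: these two hunks move construction of the hash-table row getters out of the per-row process() call and into completeInitializationOp(), and derive alias from conf.getPosBigTable() instead of from whichever tag happens to arrive first. The getters are therefore ready before any row flows and the hot path loses its lazy-init branch. A minimal, self-contained sketch of that init-once pattern follows; everything in it is illustrative except the idea of getPosBigTable/createGetter and the field roles, which mirror the diff:

// Hypothetical illustration of the init-once pattern applied above.
public class InitOncePattern {
  interface Table { Object createGetter(Object refKey); }

  private final Table[] tables;
  private Object[] getters;      // one reusable lookup adaptor per small table
  private final int bigTablePos; // analogous to conf.getPosBigTable()

  InitOncePattern(Table[] tables, int bigTablePos) {
    this.tables = tables;
    this.bigTablePos = bigTablePos;
  }

  // Runs once during operator initialization, before any rows are processed.
  void completeInitialization() {
    if (getters == null) {
      getters = new Object[tables.length];
      for (int pos = 0; pos < tables.length; pos++) {
        if (pos != bigTablePos) {
          getters[pos] = tables[pos].createGetter(null);
        }
      }
    }
  }

  // Hot path: no lazy-init branch, no dependence on the incoming tag.
  void process(Object row, int tag) {
    // ... probe getters[...] for join matches ...
  }
}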
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 453ac4a..8a3520f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -240,13 +240,7 @@ private Utilities() {
     // prevent instantiation
   }
 
-  private static ThreadLocal<Map<Path, BaseWork>> gWorkMap =
-      new ThreadLocal<Map<Path, BaseWork>>() {
-    @Override
-    protected Map<Path, BaseWork> initialValue() {
-      return new HashMap<Path, BaseWork>();
-    }
-  };
+  private static GlobalWorkMapFactory gWorkMap = new GlobalWorkMapFactory();
 
   private static final String CLASS_NAME = Utilities.class.getName();
   private static final Log LOG = LogFactory.getLog(CLASS_NAME);
@@ -354,7 +348,7 @@ public static void cacheBaseWork(Configuration conf, String name, BaseWork work,
    */
   public static void setBaseWork(Configuration conf, String name, BaseWork work) {
     Path path = getPlanPath(conf, name);
-    gWorkMap.get().put(path, work);
+    gWorkMap.get(conf).put(path, work);
   }
 
   /**
@@ -384,7 +378,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
       path = getPlanPath(conf, name);
       LOG.info("PLAN PATH = " + path);
       assert path != null;
-      BaseWork gWork = gWorkMap.get().get(path);
+      BaseWork gWork = gWorkMap.get(conf).get(path);
       if (gWork == null) {
         Path localPath;
         if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
@@ -435,7 +429,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
         } else if (name.contains(MERGE_PLAN_NAME)) {
           gWork = deserializePlan(in, MapWork.class, conf);
         }
-        gWorkMap.get().put(path, gWork);
+        gWorkMap.get(conf).put(path, gWork);
       } else if (LOG.isDebugEnabled()) {
         LOG.debug("Found plan in cache for name: " + name);
       }
@@ -729,7 +723,7 @@ private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratch
       }
 
       // Cache the plan in this process
-      gWorkMap.get().put(planPath, w);
+      gWorkMap.get(conf).put(planPath, w);
       return planPath;
     } catch (Exception e) {
       String msg = "Error caching " + name + ": " + e;
@@ -3663,15 +3657,15 @@ public static void clearWorkMapForConf(Configuration conf) {
     Path mapPath = getPlanPath(conf, MAP_PLAN_NAME);
     Path reducePath = getPlanPath(conf, REDUCE_PLAN_NAME);
     if (mapPath != null) {
-      gWorkMap.get().remove(mapPath);
+      gWorkMap.get(conf).remove(mapPath);
     }
     if (reducePath != null) {
-      gWorkMap.get().remove(reducePath);
+      gWorkMap.get(conf).remove(reducePath);
     }
   }
 
-  public static void clearWorkMap() {
-    gWorkMap.get().clear();
+  public static void clearWorkMap(Configuration conf) {
+    gWorkMap.get(conf).clear();
   }
 
   /**
@@ -3804,10 +3798,10 @@ public static boolean isDefaultNameNode(HiveConf conf) {
 
   /**
    * Returns the full path to the Jar containing the class. It always return a JAR.
-   * 
+   *
    * @param klass
    *          class.
-   * 
+   *
    * @return path to the Jar containing the class.
    */
   @SuppressWarnings("rawtypes")
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index 5999265..1196ae8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -211,7 +211,7 @@ public void close() {
       }
     } finally {
       MapredContext.close();
-      Utilities.clearWorkMap();
+      Utilities.clearWorkMap(jc);
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
index f586d3b..6b13ea5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
@@ -159,6 +159,7 @@ public void configure(JobConf job) {
     }
   }
 
+  @Override
   public void reduce(Object key, Iterator values, OutputCollector output,
       Reporter reporter) throws IOException {
     if (reducer.getDone()) {
@@ -288,7 +289,7 @@ public void close() {
       }
     } finally {
       MapredContext.close();
-      Utilities.clearWorkMap();
+      Utilities.clearWorkMap(jc);
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index c7c8146..0b08f1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -195,7 +195,7 @@ public void close() {
       }
     } finally {
       MapredContext.close();
-      Utilities.clearWorkMap();
+      Utilities.clearWorkMap(jc);
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index d4c1973..e778b2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -440,7 +440,7 @@ public void close() {
       }
     } finally {
       MapredContext.close();
-      Utilities.clearWorkMap();
+      Utilities.clearWorkMap(jc);
     }
   }
 
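Review note, not part of the patch: every MR and Spark record handler above (and the Tez processors below) follows the same close() contract, and each now threads its job conf into clearWorkMap so the factory clears the right backing map — process-global, Spark thread-local, or the LLAP daemon's no-op map. A skeletal sketch of that shared shape; HandlerCloseSketch and the field jc are illustrative, while Utilities.clearWorkMap(Configuration) is the API this patch introduces:

// Hypothetical skeleton of the close() pattern shared by the handlers above.
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.mapred.JobConf;

public class HandlerCloseSketch {
  private JobConf jc; // retained from init/configure so close() can hand it back

  public void close() {
    try {
      // ... flush and close the operator pipeline, report final status ...
    } finally {
      // The conf now travels into clearWorkMap: it is what selects which
      // underlying map the factory clears for this process or thread.
      Utilities.clearWorkMap(jc);
    }
  }
}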
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 24203b9..fa36435 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -79,7 +79,7 @@ private int position;
   MRInputLegacy legacyMRInput;
   MultiMRInput mainWorkMultiMRInput;
-  private ExecMapperContext execContext;
+  private final ExecMapperContext execContext;
   private boolean abort;
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
@@ -353,7 +353,7 @@ void close(){
         throw new RuntimeException("Hive Runtime Error while closing operators", e);
       }
     } finally {
-      Utilities.clearWorkMap();
+      Utilities.clearWorkMap(jconf);
       MapredContext.close();
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index 8a8fe55..42893c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -179,7 +179,7 @@ void close() {
           e);
       }
     } finally {
-      Utilities.clearWorkMap();
+      Utilities.clearWorkMap(jconf);
       MapredContext.close();
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index b19cb93..49b039d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -241,7 +241,7 @@ void close(){
           + e.getMessage(), e);
       }
     } finally {
-      Utilities.clearWorkMap();
+      Utilities.clearWorkMap(jconf);
       MapredContext.close();
     }
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 2944d3c..12e966b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -1245,8 +1245,8 @@ JobConf createMockExecutionEnvironment(Path workDir,
                                          boolean isVectorized,
                                          int partitions
                                          ) throws IOException {
-    Utilities.clearWorkMap();
     JobConf conf = new JobConf();
+    Utilities.clearWorkMap(conf);
     conf.set("hive.exec.plan", workDir.toString());
     conf.set("mapred.job.tracker", "local");
     conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized));
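Review note, not part of the patch: the test hunk has to swap the two statements because clearWorkMap now takes a Configuration — the JobConf must exist before the map it selects can be cleared. Trivial, but the ordering is the whole point of the hunk; a minimal sketch (ClearOrderSketch is hypothetical):

// Minimal illustration of the ordering the test hunk enforces.
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.mapred.JobConf;

public class ClearOrderSketch {
  public static void main(String[] args) {
    JobConf conf = new JobConf();  // the conf must exist first...
    Utilities.clearWorkMap(conf);  // ...because it selects which map to clear
  }
}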