diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index baa0967..1c18656 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1883,6 +1883,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet("mr", "tez", "spark"), "Chooses execution engine. Options are: mr (Map reduce, default), tez (hadoop 2 only), spark"), + + HIVE_EXECUTION_MODE("hive.execution.mode", "container", new StringSet("container", "llap"), + "Chooses whether query fragments will run in container or in llap"), + HIVE_JAR_DIRECTORY("hive.jar.directory", null, "This is the location hive in tez mode will look for to find a site wide \n" + "installed hive instance."), diff --git pom.xml pom.xml index b478778..766e0ff 100644 --- pom.xml +++ pom.xml @@ -154,7 +154,7 @@ 1.0.1 1.7.5 4.0.4 - 0.5.2 + 0.7.0-SNAPSHOT 2.2.0 1.2.0 2.10 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index 851ea1b..fd74ee6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -26,6 +26,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; @@ -85,6 +86,8 @@ private final transient LongWritable recordCounter = new LongWritable(); protected transient long numRows = 0; protected transient long cntr = 1; + private final Map connectedOperators + = new TreeMap(); // input path --> {operator --> context} private final Map, MapOpCtx>> opCtxMap = @@ -620,7 +623,7 @@ public OperatorType getType() { @Override public Map getTagToOperatorTree() { - return MapRecordProcessor.getConnectOps(); + return connectedOperators; } public void initializeContexts() { @@ -634,4 +637,12 @@ public Deserializer getCurrentDeserializer() { return currentCtxs[0].deserializer; } + + public void clearConnectedOperators() { + connectedOperators.clear(); + } + + public void setConnectedOperators(int tag, DummyStoreOperator dummyOp) { + connectedOperators.put(tag, dummyOp); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java index 271d943..c099668 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java @@ -35,7 +35,8 @@ private ObjectCacheFactory() { * Returns the appropriate cache */ public static ObjectCache getCache(Configuration conf) { - if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") && + HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("container")) { return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache(); } else { return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 9ed2c61..a812c0d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -365,7 +365,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) { LOG.info("PLAN PATH = " + path); assert path != null; if (!gWorkMap.containsKey(path) - || HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + || !HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("mr")) { Path localPath; if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) { localPath = new Path(name); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index bc7603e..f47a626 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -79,8 +79,6 @@ protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapWork mapWork; List mergeWorkList = null; - private static Map connectOps = - new TreeMap(); public MapRecordProcessor(JobConf jconf) throws Exception { ObjectCache cache = ObjectCacheFactory.getCache(jconf); @@ -157,7 +155,7 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep mapOp = new MapOperator(); } - connectOps.clear(); + mapOp.clearConnectedOperators(); if (mergeWorkList != null) { MapOperator mergeMapOp = null; for (MapWork mergeMapWork : mergeWorkList) { @@ -176,7 +174,7 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep mergeMapOp.setChildren(jconf); if (foundCachedMergeWork == false) { DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp); - connectOps.put(mergeMapWork.getTag(), dummyOp); + mapOp.setConnectedOperators(mergeMapWork.getTag(), dummyOp); } mergeMapOp.setExecContext(new ExecMapperContext(jconf)); mergeMapOp.initializeLocalWork(jconf); @@ -338,10 +336,6 @@ void close(){ } } - public static Map getConnectOps() { - return connectOps; - } - private MRInputLegacy getMRInput(Map inputs) throws Exception { // there should be only one MRInput MRInputLegacy theMRInput = null; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index aa80510..132b9b7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -68,7 +68,7 @@ private boolean abort = false; - private static Deserializer inputKeyDeserializer; + private Deserializer inputKeyDeserializer; // Input value serde needs to be an array to support different SerDe // for different tags diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java index 04c9644..bcc14a8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java @@ -36,16 +36,19 @@ */ public class IOContext { - /** - * Spark uses this thread local - */ - private static final ThreadLocal threadLocal = new ThreadLocal(){ - @Override - protected synchronized IOContext initialValue() { return new IOContext(); } - }; + public static String DEFAULT_CONTEXT = ""; + + private static final ThreadLocal> threadLocal = new ThreadLocal>() { + @Override + protected synchronized Map initialValue() { + Map map = new HashMap(); + map.put(DEFAULT_CONTEXT, new IOContext()); + return map; + } + }; private static IOContext get() { - return IOContext.threadLocal.get(); + return IOContext.threadLocal.get().get(DEFAULT_CONTEXT); } /** @@ -53,12 +56,14 @@ private static IOContext get() { */ private static final Map inputNameIOContextMap = new HashMap(); - public static IOContext get(Configuration conf) { - if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { - return get(); - } String inputName = conf.get(Utilities.INPUT_NAME); + Map inputNameIOContextMap = threadLocal.get(); + + if (inputName == null) { + inputName = DEFAULT_CONTEXT; + } + if (!inputNameIOContextMap.containsKey(inputName)) { IOContext ioContext = new IOContext(); inputNameIOContextMap.put(inputName, ioContext); @@ -68,8 +73,7 @@ public static IOContext get(Configuration conf) { } public static void clear() { - IOContext.threadLocal.remove(); - inputNameIOContextMap.clear(); + threadLocal.remove(); } private long currentBlockStart;