diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index baa0967..1c18656 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1883,6 +1883,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet("mr", "tez", "spark"),
"Chooses execution engine. Options are: mr (Map reduce, default), tez (hadoop 2 only), spark"),
+
+ HIVE_EXECUTION_MODE("hive.execution.mode", "container", new StringSet("container", "llap"),
+ "Chooses whether query fragments will run in container or in llap"),
+
HIVE_JAR_DIRECTORY("hive.jar.directory", null,
"This is the location hive in tez mode will look for to find a site wide \n" +
"installed hive instance."),
diff --git pom.xml pom.xml
index b478778..766e0ff 100644
--- pom.xml
+++ pom.xml
@@ -154,7 +154,7 @@
1.0.1
1.7.5
4.0.4
- <tez.version>0.5.2</tez.version>
+ <tez.version>0.7.0-SNAPSHOT</tez.version>
2.2.0
1.2.0
2.10
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index 851ea1b..fd74ee6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -26,6 +26,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
@@ -85,6 +86,8 @@
private final transient LongWritable recordCounter = new LongWritable();
protected transient long numRows = 0;
protected transient long cntr = 1;
+ private final Map<Integer, DummyStoreOperator> connectedOperators
+ = new TreeMap<Integer, DummyStoreOperator>();
// input path --> {operator --> context}
private final Map<String, Map<Operator<? extends OperatorDesc>, MapOpCtx>> opCtxMap =
@@ -620,7 +623,7 @@ public OperatorType getType() {
@Override
public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
- return MapRecordProcessor.getConnectOps();
+ return connectedOperators;
}
public void initializeContexts() {
@@ -634,4 +637,12 @@ public Deserializer getCurrentDeserializer() {
return currentCtxs[0].deserializer;
}
+
+ public void clearConnectedOperators() {
+ connectedOperators.clear();
+ }
+
+ public void setConnectedOperators(int tag, DummyStoreOperator dummyOp) {
+ connectedOperators.put(tag, dummyOp);
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 271d943..c099668 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -35,7 +35,8 @@ private ObjectCacheFactory() {
* Returns the appropriate cache
*/
public static ObjectCache getCache(Configuration conf) {
- if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") &&
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("container")) {
return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache();
} else {
return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 9ed2c61..a812c0d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -365,7 +365,7 @@ private static BaseWork getBaseWork(Configuration conf, String name) {
LOG.info("PLAN PATH = " + path);
assert path != null;
if (!gWorkMap.containsKey(path)
- || HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ || !HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("mr")) {
Path localPath;
if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
localPath = new Path(name);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index bc7603e..f47a626 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -79,8 +79,6 @@
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MapWork mapWork;
List<MapWork> mergeWorkList = null;
- private static Map<Integer, DummyStoreOperator> connectOps =
- new TreeMap<Integer, DummyStoreOperator>();
public MapRecordProcessor(JobConf jconf) throws Exception {
ObjectCache cache = ObjectCacheFactory.getCache(jconf);
@@ -157,7 +155,7 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep
mapOp = new MapOperator();
}
- connectOps.clear();
+ mapOp.clearConnectedOperators();
if (mergeWorkList != null) {
MapOperator mergeMapOp = null;
for (MapWork mergeMapWork : mergeWorkList) {
@@ -176,7 +174,7 @@ void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrRep
mergeMapOp.setChildren(jconf);
if (foundCachedMergeWork == false) {
DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
- connectOps.put(mergeMapWork.getTag(), dummyOp);
+ mapOp.setConnectedOperators(mergeMapWork.getTag(), dummyOp);
}
mergeMapOp.setExecContext(new ExecMapperContext(jconf));
mergeMapOp.initializeLocalWork(jconf);
@@ -338,10 +336,6 @@ void close(){
}
}
- public static Map<Integer, DummyStoreOperator> getConnectOps() {
- return connectOps;
- }
-
private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
// there should be only one MRInput
MRInputLegacy theMRInput = null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index aa80510..132b9b7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -68,7 +68,7 @@
private boolean abort = false;
- private static Deserializer inputKeyDeserializer;
+ private Deserializer inputKeyDeserializer;
// Input value serde needs to be an array to support different SerDe
// for different tags
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index 04c9644..bcc14a8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -36,16 +36,19 @@
*/
public class IOContext {
- /**
- * Spark uses this thread local
- */
- private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
- @Override
- protected synchronized IOContext initialValue() { return new IOContext(); }
- };
+ public static String DEFAULT_CONTEXT = "";
+
+ private static final ThreadLocal