diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 56289c9..0a6ee20 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2853,7 +2853,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_ALLOW_PERMANENT_FNS("hive.llap.allow.permanent.fns", true, "Whether LLAP decider should allow permanent UDFs."), LLAP_EXECUTION_MODE("hive.llap.execution.mode", "none", - new StringSet("auto", "none", "all", "map"), + new StringSet("auto", "none", "all", "map", "llap_only"), "Chooses whether query fragments will run in container or in llap"), LLAP_OBJECT_CACHE_ENABLED("hive.llap.object.cache.enabled", true, "Cache objects (plans, hashtables, etc) in llap"), diff --git data/conf/llap/hive-site.xml data/conf/llap/hive-site.xml index d93f8b7..05ab6ee 100644 --- data/conf/llap/hive-site.xml +++ data/conf/llap/hive-site.xml @@ -254,10 +254,11 @@ llap - + hive.tez.java.opts diff --git itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index d06c869..257dc28 100644 --- itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -224,7 +224,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM switch (miniClusterType) { case TEZ: // TODO: This should be making use of confDir to load configs setup for Tez, etc. - mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 2, uriString); + mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 2, uriString, false); break; case LLAP: if (usePortsFromConf) { @@ -232,7 +232,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM } llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null, null); - mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 2, uriString); + mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 2, uriString, true); break; case MR: mr = ShimLoader.getHadoopShims().getMiniMrCluster(hiveConf, 2, uriString, 1); diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index bfb9645..adb2c0e 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -662,7 +662,8 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws if (EnumSet.of(MiniClusterType.llap_local, MiniClusterType.tez_local).contains(clusterType)) { mr = shims.getLocalMiniTezCluster(conf, clusterType == MiniClusterType.llap_local); } else { - mr = shims.getMiniTezCluster(conf, numTrackers, uriString); + mr = shims.getMiniTezCluster(conf, numTrackers, uriString, + EnumSet.of(MiniClusterType.llap, MiniClusterType.llap_local).contains(clusterType)); } } else if (clusterType == MiniClusterType.miniSparkOnYarn) { mr = shims.getMiniSparkCluster(conf, 2, uriString, 1); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index f46e2d7..c7b855e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.all; import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.auto; +import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.llap_only; import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.map; import static org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode.none; @@ -94,8 +95,9 @@ public enum LlapMode { map, // map operators only - all, // all operators + all, // all operators. Launch containers if user code etc prevents running inside llap. none, // no operators + llap_only, // Try running everything in llap, fail if that is not possible (non blessed user code, script, etc) auto // please hive, choose for me } @@ -107,6 +109,7 @@ private final boolean arePermanentFnsAllowed; private final boolean shouldUber; private List mapJoinOpList; + private final Map rules; public LlapDecisionDispatcher(PhysicalContext pctx, LlapMode mode) { conf = pctx.getConf(); @@ -115,6 +118,7 @@ public LlapDecisionDispatcher(PhysicalContext pctx, LlapMode mode) { // Don't user uber in "all" mode - everything can go into LLAP, which is better than uber. shouldUber = HiveConf.getBoolVar(conf, ConfVars.LLAP_AUTO_ALLOW_UBER) && (mode != all); mapJoinOpList = new ArrayList(); + rules = getRules(); } @Override @@ -175,6 +179,7 @@ private boolean evaluateWork(TezWork tezWork, BaseWork work) return false; } + // first we check if we *can* run in llap. If we need to use // user code to do so (script/udf) we don't. /*if (work instanceof MapWork && ((MapWork)work).isUseOneNullRowInputFormat()) { @@ -184,6 +189,11 @@ private boolean evaluateWork(TezWork tezWork, BaseWork work) if (!evaluateOperators(work)) { LOG.info("some operators cannot be run in llap"); + if (mode == llap_only) { + throw new RuntimeException("Cannot run all parts of query in llap. Failing since " + + ConfVars.LLAP_EXECUTION_MODE.varname + " is set to " + llap_only.name()); + } + return false; } @@ -388,7 +398,7 @@ public Object process(Node n, Stack s, NodeProcessorCtx c, Object... os) { private boolean evaluateOperators(BaseWork work) throws SemanticException { // lets take a look at the operators. we're checking for user // code in those. we will not run that in llap. - Dispatcher disp = new DefaultRuleDispatcher(null, getRules(), null); + Dispatcher disp = new DefaultRuleDispatcher(null, rules, null); GraphWalker ogw = new DefaultGraphWalker(disp); ArrayList topNodes = new ArrayList(); @@ -400,7 +410,6 @@ private boolean evaluateOperators(BaseWork work) throws SemanticException { for (Node n : nodeOutput.keySet()) { if (nodeOutput.get(n) != null) { if (!((Boolean)nodeOutput.get(n))) { - LOG.info("Cannot run in LLAP mode."); return false; } } diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index e56463b..73feec6 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -364,8 +364,10 @@ public void setupConfiguration(Configuration conf) { // conf.set("fs.defaultFS", "file:///"); // conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, "/tmp"); - if (!isLlap) { + if (!isLlap) { // Conf for non-llap conf.setBoolean("hive.llap.io.enabled", false); + } else { // Conf for llap + conf.set("hive.llap.execution.mode", "llap_only"); } } @@ -380,8 +382,8 @@ public void shutdown() throws IOException { */ @Override public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode) throws IOException { - return new MiniTezShim(conf, numberOfTaskTrackers, nameNode); + String nameNode, boolean usingLlap) throws IOException { + return new MiniTezShim(conf, numberOfTaskTrackers, nameNode, usingLlap); } /** @@ -391,8 +393,10 @@ public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers private final MiniTezCluster mr; private final Configuration conf; + private final boolean isLlap; - public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode) throws IOException { + public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode, + boolean usingLlap) throws IOException { mr = new MiniTezCluster("hive", numberOfTaskTrackers); conf.setInt(YarnConfiguration.YARN_MINICLUSTER_NM_PMEM_MB, 512); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); @@ -410,6 +414,7 @@ public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode mr.init(conf); mr.start(); this.conf = mr.getConfig(); + this.isLlap = usingLlap; } @Override @@ -442,6 +447,9 @@ public void setupConfiguration(Configuration conf) { conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 24); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB, 10); conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.4f); + if (isLlap) { + conf.set("hive.llap.execution.mode", "all"); + } } } diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index f987814..9c6901d 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -90,7 +90,7 @@ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers, String nameNode, int numDir) throws IOException; public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode) throws IOException; + String nameNode, boolean usingLlap) throws IOException; public MiniMrShim getLocalMiniTezCluster(Configuration conf, boolean usingLlap);