diff --git a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 32a0ae8..6e0986c 100644
--- a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -181,7 +181,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
// Initialize the execution engine based on cluster type
switch (miniClusterType) {
case TEZ:
- mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString);
+ mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, false, null);
break;
case MR:
mr = ShimLoader.getHadoopShims().getMiniMrCluster(hiveConf, 4, uriString, 1);
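
[The getMiniTezCluster() shim call gains two parameters in this patch: a flag selecting LLAP mode and an optional URL to an llap-daemon-site.xml. A minimal sketch of both call shapes, reusing the hiveConf and uriString values in scope above; the daemon config path is a placeholder, not a value from this patch:

    HadoopShims shims = ShimLoader.getHadoopShims();

    // Plain Tez mini cluster, exactly as MiniHS2 requests it above:
    HadoopShims.MiniMrShim tezCluster =
        shims.getMiniTezCluster(hiveConf, 4, uriString, false, null);

    // Tez mini cluster plus an embedded LLAP daemon; the shim loads the
    // daemon configuration from the given URL:
    HadoopShims.MiniMrShim llapCluster =
        shims.getMiniTezCluster(hiveConf, 2, uriString, true,
            "file:///tmp/test-conf/llap-daemon-site.xml");
]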
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index 6c2be68..c276db9 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -97,19 +97,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-client</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-server</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-registry</artifactId>
       <version>${hadoop-23.version}</version>
@@ -270,6 +257,19 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-server</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-server</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
       <version>${commons-logging.version}</version>
diff --git a/itests/util/pom.xml b/itests/util/pom.xml
index d641189..0743f01 100644
--- a/itests/util/pom.xml
+++ b/itests/util/pom.xml
@@ -54,20 +54,6 @@
       <artifactId>hive-common</artifactId>
       <version>${project.version}</version>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-server</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-server</artifactId>
-      <type>test-jar</type>
-      <version>${project.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-cli</artifactId>
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 520f0ec..e460d86 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -45,10 +45,8 @@
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@@ -60,7 +58,6 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -73,9 +70,6 @@
import org.apache.hadoop.hive.common.io.SortPrintStream;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
@@ -154,7 +148,6 @@
private HadoopShims.MiniMrShim mr = null;
private HadoopShims.MiniDFSShim dfs = null;
private HadoopShims.HdfsEncryptionShim hes = null;
- private MiniLlapCluster llapCluster = null;
private final boolean miniMr = false;
private String hadoopVer = null;
private QTestSetup setup = null;
@@ -409,48 +402,15 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
if (clusterType == MiniClusterType.tez) {
- mr = shims.getMiniTezCluster(conf, 4, uriString);
+ mr = shims.getMiniTezCluster(conf, 4, uriString, false, null);
} else if (clusterType == MiniClusterType.llap) {
- Configuration daemonConf;
+ final String daemonConfURL;
if (confDir != null && !confDir.isEmpty()) {
- URL llapDaemonConfURL = new URL("file://"
- + new File(confDir).toURI().getPath() + "/llap-daemon-site.xml");
- daemonConf = new LlapConfiguration(conf, llapDaemonConfURL);
+ daemonConfURL = "file://" + new File(confDir).toURI().getPath() + "/llap-daemon-site.xml";
} else {
- daemonConf = new LlapConfiguration(conf);
+ daemonConfURL = null;
}
- final String clusterName = "llap";
- final long maxMemory = LlapDaemon.getTotalHeapSize();
- // 15% for io cache
- final long memoryForCache = (long) (0.15f * maxMemory);
- // 75% for 4 executors
- final long totalExecutorMemory = (long) (0.75f * maxMemory);
- final int numExecutors = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
- LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
- final boolean asyncIOEnabled = true;
- // enabling this will cause test failures in Mac OS X
- final boolean directMemoryEnabled = false;
- final int numLocalDirs = 1;
- LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: " + memoryForCache
- + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors
- + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled
- + " numLocalDirs: " + numLocalDirs);
- llapCluster = MiniLlapCluster.create(clusterName,
- numExecutors,
- totalExecutorMemory,
- asyncIOEnabled,
- directMemoryEnabled,
- memoryForCache,
- numLocalDirs);
- llapCluster.init(daemonConf);
- llapCluster.start();
- Configuration llapConf = llapCluster.getClusterSpecificConfiguration();
-      Iterator<Entry<String, String>> confIter = llapConf.iterator();
-      while (confIter.hasNext()) {
-        Entry<String, String> entry = confIter.next();
- conf.set(entry.getKey(), entry.getValue());
- }
- mr = shims.getMiniTezCluster(conf, 2, uriString);
+ mr = shims.getMiniTezCluster(conf, 2, uriString, true, daemonConfURL);
} else if (clusterType == MiniClusterType.miniSparkOnYarn) {
mr = shims.getMiniSparkCluster(conf, 4, uriString, 1);
} else {
@@ -501,10 +461,6 @@ public void shutdown() throws Exception {
sparkSession = null;
}
}
- if (llapCluster != null) {
- llapCluster.stop();
- llapCluster = null;
- }
if (mr != null) {
mr.shutdown();
mr = null;
diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index 42d8c3d..92a98a8 100644
--- a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.shims;
import java.io.IOException;
-import java.lang.Override;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
@@ -69,12 +68,11 @@
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosName;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.tools.distcp2.DistCp;
import org.apache.hadoop.tools.distcp2.DistCpOptions;
import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.VersionInfo;
@@ -233,7 +231,7 @@ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
@Override
public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
- String nameNode) throws IOException {
+ String nameNode, boolean isLlap, String daemonConfURL) throws IOException {
throw new IOException("Cannot run tez on current hadoop, Version: " + VersionInfo.getVersion());
}
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index f762724..4ed96b4 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -21,10 +21,14 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URL;
import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.NoSuchAlgorithmException;
@@ -65,13 +69,11 @@
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.protocol.EncryptionZone;
-import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus;
@@ -93,9 +95,9 @@
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.util.Progressable;
@@ -372,8 +374,8 @@ public void setupConfiguration(Configuration conf) {
*/
@Override
public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
- String nameNode) throws IOException {
- return new MiniTezShim(conf, numberOfTaskTrackers, nameNode);
+ String nameNode, boolean isLlap, String daemonConfURL) throws IOException {
+ return new MiniTezShim(conf, numberOfTaskTrackers, nameNode, isLlap, daemonConfURL);
}
/**
@@ -383,9 +385,16 @@ public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers
private final MiniTezCluster mr;
private final Configuration conf;
+    private Class<?> miniLlapKlass;
+ private Object miniLlapCluster;
- public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode)
- throws IOException {
+ public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode,
+ boolean isLlap, String daemonConfURL) throws IOException {
+ if (isLlap) {
+ createAndLaunchLlapDaemon(conf, daemonConfURL);
+ } else {
+ miniLlapCluster = null;
+ }
mr = new MiniTezCluster("hive", numberOfTaskTrackers);
conf.set("fs.defaultFS", nameNode);
conf.set("tez.am.log.level", "DEBUG");
@@ -395,6 +404,75 @@ public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode
this.conf = mr.getConfig();
}
+ private void createAndLaunchLlapDaemon(final Configuration conf, final String daemonConfURL) {
+ try {
+        Class<?> configKlass =
+ Class.forName("org.apache.hadoop.hive.llap.configuration.LlapConfiguration",
+ false, ShimLoader.class.getClassLoader());
+ final Object daemonConfig;
+ if (daemonConfURL != null) {
+          Constructor<?> constructor = configKlass.getConstructor(new Class[] {Configuration.class,
+ URL.class});
+ daemonConfig = constructor.newInstance(conf, new URL(daemonConfURL));
+ } else {
+          Constructor<?> constructor = configKlass.getConstructor(new Class[] {Configuration.class});
+ daemonConfig = constructor.newInstance(conf);
+ }
+ final String clusterName = "llap";
+        Class<?> llapDaemonKlass =
+ Class.forName("org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon",
+ false, ShimLoader.class.getClassLoader());
+ Method totalMemMethod = llapDaemonKlass.getMethod("getTotalHeapSize");
+ final long maxMemory = (long) totalMemMethod.invoke(null);
+ // 15% for io cache
+ final long memoryForCache = (long) (0.15f * maxMemory);
+ // 75% for 4 executors
+ final long totalExecutorMemory = (long) (0.75f * maxMemory);
+ Method getInt = configKlass.getMethod("getInt", new Class[]{String.class,
+ Integer.TYPE});
+ Field numExecField = configKlass.getField("LLAP_DAEMON_NUM_EXECUTORS");
+ Field numExecDefault = configKlass.getField("LLAP_DAEMON_NUM_EXECUTORS_DEFAULT");
+ final int numExecutors = (int) getInt.invoke(daemonConfig, numExecField.get(null),
+ numExecDefault.get(null));
+ final boolean asyncIOEnabled = true;
+ // enabling this will cause test failures in Mac OS X
+ final boolean directMemoryEnabled = false;
+ final int numLocalDirs = 1;
+ LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: " + memoryForCache
+ + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors
+ + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled
+ + " numLocalDirs: " + numLocalDirs);
+
+ miniLlapKlass = Class.forName("org.apache.hadoop.hive.llap.daemon.MiniLlapCluster",
+ false, ShimLoader.class.getClassLoader());
+ Method create = miniLlapKlass.getMethod("create", new Class[]{String.class, Integer.TYPE,
+ Long.TYPE, Boolean.TYPE, Boolean.TYPE, Long.TYPE, Integer.TYPE});
+ miniLlapCluster = create.invoke(null, clusterName,
+ numExecutors,
+ totalExecutorMemory,
+ asyncIOEnabled,
+ directMemoryEnabled,
+ memoryForCache,
+ numLocalDirs);
+
+ Method init = miniLlapKlass.getMethod("init", new Class[]{Configuration.class});
+ init.invoke(miniLlapCluster, daemonConfig);
+
+ Method start = miniLlapKlass.getMethod("start", new Class[]{});
+ start.invoke(miniLlapCluster);
+
+ Method getConfig = miniLlapKlass.getMethod("getClusterSpecificConfiguration");
+ Configuration llapConf = (Configuration) getConfig.invoke(miniLlapCluster);
+        Iterator<Map.Entry<String, String>> confIter = llapConf.iterator();
+        while (confIter.hasNext()) {
+          Map.Entry<String, String> entry = confIter.next();
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ } catch (Exception e) {
+ LOG.error("Unable to create MiniLlapCluster. Exception: " + e.getMessage());
+ }
+ }
+
@Override
public int getJobTrackerPort() throws UnsupportedOperationException {
String address = conf.get("yarn.resourcemanager.address");
@@ -410,6 +488,15 @@ public int getJobTrackerPort() throws UnsupportedOperationException {
@Override
public void shutdown() throws IOException {
mr.stop();
+
+ if (miniLlapKlass != null && miniLlapCluster != null) {
+ try {
+ Method stop = miniLlapKlass.getMethod("stop", new Class[]{});
+ stop.invoke(miniLlapCluster);
+ } catch (Exception e) {
+ LOG.error("Unable to stop llap daemon. Exception: " + e.getMessage());
+ }
+ }
}
@Override
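
[Note the idiom above: Hadoop23Shims resolves LlapConfiguration, LlapDaemon, and MiniLlapCluster via Class.forName() against the shim classloader rather than importing them, so the shims module needs no compile-time dependency on hive-llap-server. A self-contained sketch of that optional-dependency pattern, with java.util.ArrayList standing in for the optional class so the example actually runs:

    import java.lang.reflect.Method;

    public class OptionalLoadSketch {
      public static void main(String[] args) {
        try {
          // Resolve the class at runtime; no compile-time dependency needed.
          Class<?> klass = Class.forName("java.util.ArrayList",
              false, OptionalLoadSketch.class.getClassLoader());
          Object instance = klass.getConstructor().newInstance();
          Method add = klass.getMethod("add", Object.class);
          add.invoke(instance, "started");
          System.out.println(instance); // prints [started]
        } catch (ReflectiveOperationException e) {
          // Optional module absent or incompatible: log and continue, as
          // createAndLaunchLlapDaemon() above does on failure.
          System.err.println("Optional class unavailable: " + e.getMessage());
        }
      }
    }
]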
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index f108068..085df2c 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -97,7 +97,7 @@ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
String nameNode, int numDir) throws IOException;
public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
- String nameNode) throws IOException;
+ String nameNode, boolean isLlap, String daemonConfURL) throws IOException;
public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers,
String nameNode, int numDir) throws IOException;
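
[With the interface widened, every implementation sees the new arguments: Hadoop20SShims still rejects Tez outright, while Hadoop23Shims optionally boots the LLAP daemon. A hedged end-to-end usage sketch of the revised API; the HDFS URI and config path are placeholders, not values from this patch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;

    public class MiniLlapLifecycleSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        HadoopShims.MiniMrShim cluster = ShimLoader.getHadoopShims()
            .getMiniTezCluster(conf, 2, "hdfs://localhost:8020", true,
                "file:///tmp/test-conf/llap-daemon-site.xml");
        try {
          // ... submit test queries against the LLAP-enabled mini cluster ...
        } finally {
          // MiniTezShim.shutdown() now also stops the reflectively
          // started MiniLlapCluster (see Hadoop23Shims above).
          cluster.shutdown();
        }
      }
    }
]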