diff --git a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 32a0ae8..6e0986c 100644
--- a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -181,7 +181,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
     // Initialize the execution engine based on cluster type
     switch (miniClusterType) {
     case TEZ:
-      mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString);
+      mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, false, null);
       break;
     case MR:
       mr = ShimLoader.getHadoopShims().getMiniMrCluster(hiveConf, 4, uriString, 1);
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index 6c2be68..c276db9 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -97,19 +97,6 @@
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-client</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-server</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-registry</artifactId>
       <version>${hadoop-23.version}</version>
@@ -270,6 +257,19 @@
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-server</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-server</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
       <version>${commons-logging.version}</version>
diff --git a/itests/util/pom.xml b/itests/util/pom.xml
index d641189..0743f01 100644
--- a/itests/util/pom.xml
+++ b/itests/util/pom.xml
@@ -54,20 +54,6 @@
       <artifactId>hive-common</artifactId>
       <version>${project.version}</version>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-server</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-llap-server</artifactId>
-      <type>test-jar</type>
-      <version>${project.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-cli</artifactId>
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 520f0ec..e460d86 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -45,10 +45,8 @@
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -60,7 +58,6 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -73,9 +70,6 @@
 import org.apache.hadoop.hive.common.io.SortPrintStream;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
@@ -154,7 +148,6 @@
   private HadoopShims.MiniMrShim mr = null;
   private HadoopShims.MiniDFSShim dfs = null;
   private HadoopShims.HdfsEncryptionShim hes = null;
-  private MiniLlapCluster llapCluster = null;
   private final boolean miniMr = false;
   private String hadoopVer = null;
   private QTestSetup setup = null;
@@ -409,48 +402,15 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
       FileSystem fs = dfs.getFileSystem();
       String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
       if (clusterType == MiniClusterType.tez) {
-        mr = shims.getMiniTezCluster(conf, 4, uriString);
+        mr = shims.getMiniTezCluster(conf, 4, uriString, false, null);
       } else if (clusterType == MiniClusterType.llap) {
-        Configuration daemonConf;
+        final String daemonConfURL;
         if (confDir != null && !confDir.isEmpty()) {
-          URL llapDaemonConfURL = new URL("file://"
-              + new File(confDir).toURI().getPath() + "/llap-daemon-site.xml");
-          daemonConf = new LlapConfiguration(conf, llapDaemonConfURL);
+          daemonConfURL = "file://" + new File(confDir).toURI().getPath() + "/llap-daemon-site.xml";
         } else {
-          daemonConf = new LlapConfiguration(conf);
+          daemonConfURL = null;
         }
-        final String clusterName = "llap";
-        final long maxMemory = LlapDaemon.getTotalHeapSize();
-        // 15% for io cache
-        final long memoryForCache = (long) (0.15f * maxMemory);
-        // 75% for 4 executors
-        final long totalExecutorMemory = (long) (0.75f * maxMemory);
-        final int numExecutors = daemonConf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
-            LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT);
-        final boolean asyncIOEnabled = true;
-        // enabling this will cause test failures in Mac OS X
-        final boolean directMemoryEnabled = false;
-        final int numLocalDirs = 1;
-        LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: " + memoryForCache
-            + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors
-            + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled
-            + " numLocalDirs: " + numLocalDirs);
-        llapCluster = MiniLlapCluster.create(clusterName,
-            numExecutors,
-            totalExecutorMemory,
-            asyncIOEnabled,
-            directMemoryEnabled,
-            memoryForCache,
-            numLocalDirs);
-        llapCluster.init(daemonConf);
-        llapCluster.start();
-        Configuration llapConf = llapCluster.getClusterSpecificConfiguration();
-        Iterator<Entry<String, String>> confIter = llapConf.iterator();
-        while (confIter.hasNext()) {
-          Entry<String, String> entry = confIter.next();
-          conf.set(entry.getKey(), entry.getValue());
-        }
-        mr = shims.getMiniTezCluster(conf, 2, uriString);
+        mr = shims.getMiniTezCluster(conf, 2, uriString, true, daemonConfURL);
       } else if (clusterType == MiniClusterType.miniSparkOnYarn) {
         mr = shims.getMiniSparkCluster(conf, 4, uriString, 1);
       } else {
@@ -501,10 +461,6 @@ public void shutdown() throws Exception {
         sparkSession = null;
       }
     }
-    if (llapCluster != null) {
-      llapCluster.stop();
-      llapCluster = null;
-    }
     if (mr != null) {
       mr.shutdown();
       mr = null;
diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index 42d8c3d..92a98a8 100644
--- a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.shims;
 
 import java.io.IOException;
-import java.lang.Override;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -69,12 +68,11 @@
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.KerberosName;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.tools.distcp2.DistCp;
 import org.apache.hadoop.tools.distcp2.DistCpOptions;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.VersionInfo;
@@ -233,7 +231,7 @@ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
 
   @Override
   public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
-      String nameNode) throws IOException {
+      String nameNode, boolean isLlap, String daemonConfURL) throws IOException {
     throw new IOException("Cannot run tez on current hadoop, Version: "
         + VersionInfo.getVersion());
   }
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index f762724..4ed96b4 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -21,10 +21,14 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.net.URL;
 import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.NoSuchAlgorithmException;
@@ -65,13 +69,11 @@
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.protocol.EncryptionZone;
-import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
@@ -93,9 +95,9 @@
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.util.Progressable;
@@ -372,8 +374,8 @@ public void setupConfiguration(Configuration conf) {
    */
   @Override
   public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
-      String nameNode) throws IOException {
-    return new MiniTezShim(conf, numberOfTaskTrackers, nameNode);
+      String nameNode, boolean isLlap, String daemonConfURL) throws IOException {
+    return new MiniTezShim(conf, numberOfTaskTrackers, nameNode, isLlap, daemonConfURL);
   }
 
   /**
@@ -383,9 +385,16 @@ public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
 
     private final MiniTezCluster mr;
     private final Configuration conf;
+    private Class<?> miniLlapKlass;
+    private Object miniLlapCluster;
 
-    public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode)
-        throws IOException {
+    public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode,
+        boolean isLlap, String daemonConfURL) throws IOException {
+      if (isLlap) {
+        createAndLaunchLlapDaemon(conf, daemonConfURL);
+      } else {
+        miniLlapCluster = null;
+      }
       mr = new MiniTezCluster("hive", numberOfTaskTrackers);
       conf.set("fs.defaultFS", nameNode);
       conf.set("tez.am.log.level", "DEBUG");
@@ -395,6 +404,75 @@ public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode)
       this.conf = mr.getConfig();
     }
 
+    private void createAndLaunchLlapDaemon(final Configuration conf, final String daemonConfURL) {
+      try {
+        Class<?> configKlass =
+            Class.forName("org.apache.hadoop.hive.llap.configuration.LlapConfiguration",
+                false, ShimLoader.class.getClassLoader());
+        final Object daemonConfig;
+        if (daemonConfURL != null) {
+          Constructor<?> constructor = configKlass.getConstructor(new Class[] {Configuration.class,
+              URL.class});
+          daemonConfig = constructor.newInstance(conf, new URL(daemonConfURL));
+        } else {
+          Constructor<?> constructor = configKlass.getConstructor(new Class[] {Configuration.class});
+          daemonConfig = constructor.newInstance(conf);
+        }
+        final String clusterName = "llap";
+        Class<?> llapDaemonKlass =
+            Class.forName("org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon",
+                false, ShimLoader.class.getClassLoader());
+        Method totalMemMethod = llapDaemonKlass.getMethod("getTotalHeapSize");
+        final long maxMemory = (long) totalMemMethod.invoke(null);
+        // 15% for io cache
+        final long memoryForCache = (long) (0.15f * maxMemory);
+        // 75% for 4 executors
+        final long totalExecutorMemory = (long) (0.75f * maxMemory);
+        Method getInt = configKlass.getMethod("getInt", new Class[]{String.class,
+            Integer.TYPE});
+        Field numExecField = configKlass.getField("LLAP_DAEMON_NUM_EXECUTORS");
+        Field numExecDefault = configKlass.getField("LLAP_DAEMON_NUM_EXECUTORS_DEFAULT");
+        final int numExecutors = (int) getInt.invoke(daemonConfig, numExecField.get(null),
+            numExecDefault.get(null));
+        final boolean asyncIOEnabled = true;
+        // enabling this will cause test failures in Mac OS X
+        final boolean directMemoryEnabled = false;
+        final int numLocalDirs = 1;
+        LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: " + memoryForCache
+            + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors
+            + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled
+            + " numLocalDirs: " + numLocalDirs);
+
+        miniLlapKlass = Class.forName("org.apache.hadoop.hive.llap.daemon.MiniLlapCluster",
+            false, ShimLoader.class.getClassLoader());
+        Method create = miniLlapKlass.getMethod("create", new Class[]{String.class, Integer.TYPE,
+            Long.TYPE, Boolean.TYPE, Boolean.TYPE, Long.TYPE, Integer.TYPE});
+        miniLlapCluster = create.invoke(null, clusterName,
+            numExecutors,
+            totalExecutorMemory,
+            asyncIOEnabled,
+            directMemoryEnabled,
+            memoryForCache,
+            numLocalDirs);
+
+        Method init = miniLlapKlass.getMethod("init", new Class[]{Configuration.class});
+        init.invoke(miniLlapCluster, daemonConfig);
+
+        Method start = miniLlapKlass.getMethod("start", new Class[]{});
+        start.invoke(miniLlapCluster);
+
+        Method getConfig = miniLlapKlass.getMethod("getClusterSpecificConfiguration");
+        Configuration llapConf = (Configuration) getConfig.invoke(miniLlapCluster);
+        Iterator<Map.Entry<String, String>> confIter = llapConf.iterator();
+        while (confIter.hasNext()) {
+          Map.Entry<String, String> entry = confIter.next();
+          conf.set(entry.getKey(), entry.getValue());
+        }
+      } catch (Exception e) {
+        LOG.error("Unable to create MiniLlapCluster. Exception: " + e.getMessage());
+      }
+    }
+
     @Override
     public int getJobTrackerPort() throws UnsupportedOperationException {
       String address = conf.get("yarn.resourcemanager.address");
@@ -410,6 +488,15 @@ public int getJobTrackerPort() throws UnsupportedOperationException {
     @Override
     public void shutdown() throws IOException {
       mr.stop();
+
+      if (miniLlapKlass != null && miniLlapCluster != null) {
+        try {
+          Method stop = miniLlapKlass.getMethod("stop", new Class[]{});
+          stop.invoke(miniLlapCluster);
+        } catch (Exception e) {
+          LOG.error("Unable to stop llap daemon. Exception: " + e.getMessage());
+        }
+      }
     }
 
     @Override
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index f108068..085df2c 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -97,7 +97,7 @@ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
       String nameNode, int numDir) throws IOException;
 
   public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
-      String nameNode) throws IOException;
+      String nameNode, boolean isLlap, String daemonConfURL) throws IOException;
 
   public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers,
       String nameNode, int numDir) throws IOException;