diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index 79158ca..97786d9 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -55,6 +55,17 @@ ${project.version} + org.apache.hive + hive-llap-server + ${project.version} + + + org.apache.hive + hive-llap-server + ${project.version} + test-jar + + org.apache.hive.hcatalog hive-hcatalog-core ${project.version} @@ -340,6 +351,12 @@ test + org.apache.hadoop + hadoop-yarn-registry + ${hadoop.version} + true + + org.apache.curator curator-test ${curator.version} diff --git a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 488ba93..751d8ea 100644 --- a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -68,6 +68,7 @@ public enum MiniClusterType { MR, TEZ, + LLAP, DFS_ONLY; } @@ -79,6 +80,7 @@ private String serverKeytab; private boolean isHTTPTransMode = false; private boolean isMetastoreRemote; + private boolean usePortsFromConf = false; public Builder() { } @@ -125,7 +127,7 @@ public MiniHS2 build() throws Exception { hiveConf.setVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE, HS2_BINARY_MODE); } return new MiniHS2(hiveConf, miniClusterType, useMiniKdc, serverPrincipal, serverKeytab, - isMetastoreRemote); + isMetastoreRemote, usePortsFromConf); } } @@ -162,8 +164,11 @@ public boolean isUseMiniKdc() { } private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useMiniKdc, - String serverPrincipal, String serverKeytab, boolean isMetastoreRemote) throws Exception { - super(hiveConf, "localhost", MetaStoreUtils.findFreePort(), MetaStoreUtils.findFreePort()); + String serverPrincipal, String serverKeytab, boolean isMetastoreRemote, + boolean usePortsFromConf) throws Exception { + super(hiveConf, "localhost", + (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT) : MetaStoreUtils.findFreePort()), + (usePortsFromConf ? hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT) : MetaStoreUtils.findFreePort())); this.miniClusterType = miniClusterType; this.useMiniKdc = useMiniKdc; this.serverPrincipal = serverPrincipal; @@ -183,6 +188,12 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM case TEZ: mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, false); break; + case LLAP: + if (usePortsFromConf) { + hiveConf.setBoolean("minillap.usePortsFromConf", true); + } + mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, true); + break; case MR: mr = ShimLoader.getHadoopShims().getMiniMrCluster(hiveConf, 4, uriString, 1); break; @@ -214,8 +225,10 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM setWareHouseDir(wareHouseDir.toString()); System.setProperty(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, metaStoreURL); hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, metaStoreURL); - // reassign a new port, just in case if one of the MR services grabbed the last one - setBinaryPort(MetaStoreUtils.findFreePort()); + if (!usePortsFromConf) { + // reassign a new port, just in case if one of the MR services grabbed the last one + setBinaryPort(MetaStoreUtils.findFreePort()); + } hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, getHost()); hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, getBinaryPort()); hiveConf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT, getHttpPort()); @@ -236,7 +249,12 @@ public MiniHS2(HiveConf hiveConf) throws Exception { } public MiniHS2(HiveConf hiveConf, MiniClusterType clusterType) throws Exception { - this(hiveConf, clusterType, false, null, null, false); + this(hiveConf, clusterType, false); + } + + public MiniHS2(HiveConf hiveConf, MiniClusterType clusterType, + boolean usePortsFromConf) throws Exception { + this(hiveConf, clusterType, false, null, null, false, usePortsFromConf); } public void start(Map confOverlay) throws Exception { diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestMiniHS2Cluster.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestMiniHS2Cluster.java new file mode 100644 index 0000000..30925d2 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/miniHS2/TestMiniHS2Cluster.java @@ -0,0 +1,70 @@ +package org.apache.hive.jdbc.miniHS2; + +import java.io.File; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMiniHS2Cluster { + private static MiniHS2 miniHS2 = null; + + /** + * Not a unit test - this simply runs a MiniHS2 cluster, which can be used for manual testing. + */ + @Test + public void testRunCluster() throws Exception { + if (!Boolean.parseBoolean(System.getProperty("miniHS2.run", "false"))) { + return; + } + + MiniClusterType clusterType = MiniClusterType.valueOf(System.getProperty("miniHS2.clusterType", "MR").toUpperCase()); + String confFilesProperty = System.getProperty("miniHS2.conf", "../../data/conf/hive-site.xml"); + boolean usePortsFromConf = Boolean.valueOf(System.getProperty("miniHS2.usePortsFromConf", "false")); + + // Load conf files + String[] confFiles = confFilesProperty.split(","); + int idx; + for (idx = 0; idx < confFiles.length; ++idx) { + String confFile = confFiles[idx]; + if (confFile.isEmpty()) { + continue; + } + HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confFile).toURI().getPath())); + break; + } + HiveConf conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN, true); + + for (; idx < confFiles.length; ++idx) { + String confFile = confFiles[idx]; + if (confFile.isEmpty()) { + continue; + } + conf.addResource(new URL("file://" + new File(confFile).toURI().getPath())); + } + + miniHS2 = new MiniHS2(conf, clusterType, usePortsFromConf); + Map confOverlay = new HashMap(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + + System.out.println("JDBC URL avaailable at " + miniHS2.getJdbcURL()); + + // MiniHS2 cluster is up .. let it run until someone kills the test + while (true) { + Thread.sleep(1000); + } + } +} diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index deade5f..1c565b1 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -25,9 +25,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; +import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.Shell; @@ -142,8 +144,18 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe @Override public void serviceInit(Configuration conf) { + int rpcPort = 0; + int mngPort = 0; + int shufflePort = 0; + boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false); + if (usePortsFromConf) { + rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT); + mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT); + shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); + } + llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioIsDirect, ioBytesPerService, localDirs, 0, 0, 0); + ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort); llapDaemon.init(conf); }