diff --git itests/hive-unit/pom.xml itests/hive-unit/pom.xml index 7219f1d..ae231de 100644 --- itests/hive-unit/pom.xml +++ itests/hive-unit/pom.xml @@ -210,14 +210,12 @@ org.apache.hbase hbase-server ${hbase.version} - test org.apache.hbase hbase-server ${hbase.version} test-jar - test org.apache.hbase diff --git itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 26caa8d..8f5377f 100644 --- itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -195,7 +195,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM if (usePortsFromConf) { hiveConf.setBoolean("minillap.usePortsFromConf", true); } - llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null); + llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null, null); mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString); break; diff --git itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java index cb4aba5..c1a32c9 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java +++ itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; @@ -36,7 +37,9 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapItUtils.class); - public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf, String confDir) throws + public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf, + MiniZooKeeperCluster miniZkCluster, + String confDir) throws IOException { MiniLlapCluster llapCluster; LOG.info("Using conf dir: {}", confDir); @@ -57,11 +60,14 @@ public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf, Str // enabling this will cause test failures in Mac OS X final boolean directMemoryEnabled = false; final int numLocalDirs = 1; - LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: " + memoryForCache + LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + + " memoryForCache: " + memoryForCache + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled + " numLocalDirs: " + numLocalDirs); llapCluster = MiniLlapCluster.create(clusterName, + miniZkCluster, + 1, numExecutors, totalExecutorMemory, asyncIOEnabled, diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 5ccbcba..dddc6ca 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -429,6 +429,9 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, fs = dfs.getFileSystem(); } + setup = new QTestSetup(); + setup.preTest(conf); + String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString()); if (clusterType == MiniClusterType.tez) { if (confDir != null && !confDir.isEmpty()) { @@ -437,7 +440,7 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, } mr = shims.getMiniTezCluster(conf, 4, uriString); } else if (clusterType == MiniClusterType.llap) { - llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, confDir); + llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, setup.zooKeeperCluster, confDir); mr = shims.getMiniTezCluster(conf, 2, uriString); } else if (clusterType == MiniClusterType.miniSparkOnYarn) { mr = shims.getMiniSparkCluster(conf, 4, uriString, 1); @@ -471,8 +474,6 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, overWrite = "true".equalsIgnoreCase(System.getProperty("test.output.overwrite")); - setup = new QTestSetup(); - setup.preTest(conf); init(); } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index a09c0b2..9871702 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -14,12 +14,11 @@ package org.apache.hadoop.hive.llap.daemon; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Iterator; -import java.util.Map; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -41,47 +40,57 @@ private static final Logger LOG = LoggerFactory.getLogger(MiniLlapCluster.class); private final File testWorkDir; + private final String clusterNameTrimmed; + private final long numInstances; private final long execBytesPerService; private final boolean llapIoEnabled; private final boolean ioIsDirect; private final long ioBytesPerService; private final int numExecutorsPerService; + private final File zkWorkDir; private final String[] localDirs; private final Configuration clusterSpecificConfiguration = new Configuration(false); - private LlapDaemon llapDaemon; + private final LlapDaemon [] llapDaemons; + private MiniZooKeeperCluster miniZooKeeperCluster; + private final boolean ownZkCluster; - public static MiniLlapCluster create(String clusterName, int numExecutorsPerService, - long execBytePerService, boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, - int numLocalDirs) { - return new MiniLlapCluster(clusterName, numExecutorsPerService, execBytePerService, + + public static MiniLlapCluster create(String clusterName, + @Nullable MiniZooKeeperCluster miniZkCluster, + int numInstances, + int numExecutorsPerService, + long execBytePerService, boolean llapIoEnabled, + boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { + return new MiniLlapCluster(clusterName, miniZkCluster, numInstances, numExecutorsPerService, + execBytePerService, llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs); } - public static MiniLlapCluster createAndLaunch(Configuration conf, String clusterName, - int numExecutorsPerService, long execBytePerService, boolean llapIoEnabled, - boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) { - MiniLlapCluster miniLlapCluster = create(clusterName, numExecutorsPerService, - execBytePerService, llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs); - miniLlapCluster.init(conf); - miniLlapCluster.start(); - Configuration llapConf = miniLlapCluster.getClusterSpecificConfiguration(); - Iterator> confIter = llapConf.iterator(); - while (confIter.hasNext()) { - Map.Entry entry = confIter.next(); - conf.set(entry.getKey(), entry.getValue()); - } - return miniLlapCluster; + public static MiniLlapCluster create(String clusterName, + @Nullable MiniZooKeeperCluster miniZkCluster, + int numExecutorsPerService, + long execBytePerService, boolean llapIoEnabled, + boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { + return create(clusterName, miniZkCluster, 1, numExecutorsPerService, execBytePerService, + llapIoEnabled, + ioIsDirect, ioBytesPerService, numLocalDirs); } - // TODO Add support for multiple instances - private MiniLlapCluster(String clusterName, int numExecutorsPerService, long execMemoryPerService, - boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) { + private MiniLlapCluster(String clusterName, @Nullable MiniZooKeeperCluster miniZkCluster, + int numInstances, int numExecutorsPerService, long execMemoryPerService, + boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { super(clusterName + "_" + MiniLlapCluster.class.getSimpleName()); Preconditions.checkArgument(numExecutorsPerService > 0); Preconditions.checkArgument(execMemoryPerService > 0); Preconditions.checkArgument(numLocalDirs > 0); - String clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName(); + this.numInstances = numInstances; + + this.clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName(); + this.llapDaemons = new LlapDaemon[numInstances]; File targetWorkDir = new File("target", clusterNameTrimmed); try { FileContext.getLocalFSFileContext().delete( @@ -123,8 +132,18 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe this.testWorkDir = link; } else { + targetWorkDir.mkdir(); this.testWorkDir = targetWorkDir; } + if (miniZkCluster == null) { + ownZkCluster = true; + this.zkWorkDir = new File(testWorkDir, "mini-zk-cluster"); + zkWorkDir.mkdir(); + } else { + miniZooKeeperCluster = miniZkCluster; + ownZkCluster = false; + this.zkWorkDir = null; + } this.numExecutorsPerService = numExecutorsPerService; this.execBytesPerService = execMemoryPerService; this.ioIsDirect = ioIsDirect; @@ -142,12 +161,13 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe } @Override - public void serviceInit(Configuration conf) { + public void serviceInit(Configuration conf) throws IOException, InterruptedException { int rpcPort = 0; int mngPort = 0; int shufflePort = 0; int webPort = 0; boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false); + LOG.info("MiniLlap configured to use ports from conf: {}", usePortsFromConf); if (usePortsFromConf) { rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT); mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT); @@ -155,43 +175,61 @@ public void serviceInit(Configuration conf) { webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); } - llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort); - llapDaemon.init(conf); + if (ownZkCluster) { + miniZooKeeperCluster = new MiniZooKeeperCluster(); + miniZooKeeperCluster.startup(zkWorkDir); + } else { + // Already setup in the create method + } + + conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed); + conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, "localhost"); + conf.setInt(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, miniZooKeeperCluster.getClientPort()); + + LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); + for (int i = 0 ;i < numInstances ; i++) { + llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, + ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort); + llapDaemons[i].init(new Configuration(conf)); + } + LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); } @Override public void serviceStart() { - llapDaemon.start(); - - clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, - getServiceAddress().getHostName()); - clusterSpecificConfiguration.setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname, - getServiceAddress().getPort()); - - clusterSpecificConfiguration.setInt( - ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, - numExecutorsPerService); - clusterSpecificConfiguration.setLong( - ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, execBytesPerService); + LOG.info("Starting {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); + for (int i = 0 ;i < numInstances ; i++) { + llapDaemons[i].start(); + } + LOG.info("Started {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); + // Optimize local fetch does not work with LLAP due to different local directories // used by containers and LLAP clusterSpecificConfiguration .setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); + clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed); } @Override - public void serviceStop() { - if (llapDaemon != null) { - llapDaemon.stop(); - llapDaemon = null; + public void serviceStop() throws IOException { + for (int i = 0 ; i < numInstances ; i++) { + if (llapDaemons[i] != null) { + llapDaemons[i].stop(); + llapDaemons[i] = null; + } + } + if (ownZkCluster) { + if (miniZooKeeperCluster != null) { + LOG.info("Stopping MiniZooKeeper cluster"); + miniZooKeeperCluster.shutdown(); + miniZooKeeperCluster = null; + LOG.info("Stopped MiniZooKeeper cluster"); + } + } else { + LOG.info("Not stopping MiniZK cluster since it is now owned by us"); } } - private InetSocketAddress getServiceAddress() { - Preconditions.checkState(getServiceState() == Service.STATE.STARTED); - return llapDaemon.getListenerAddress(); - } public Configuration getClusterSpecificConfiguration() { Preconditions.checkState(getServiceState() == Service.STATE.STARTED); @@ -200,7 +238,10 @@ public Configuration getClusterSpecificConfiguration() { // Mainly for verification public long getNumSubmissions() { - return llapDaemon.getNumSubmissions(); + int numSubmissions = 0; + for (int i = 0 ; i < numInstances ; i++) { + numSubmissions += llapDaemons[i].getNumSubmissions(); + } + return numSubmissions; } - }