diff --git itests/hive-unit/pom.xml itests/hive-unit/pom.xml
index 7219f1d..ae231de 100644
--- itests/hive-unit/pom.xml
+++ itests/hive-unit/pom.xml
@@ -210,14 +210,12 @@
org.apache.hbase
hbase-server
${hbase.version}
- test
org.apache.hbase
hbase-server
${hbase.version}
test-jar
- test
org.apache.hbase
diff --git itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 26caa8d..8f5377f 100644
--- itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -195,7 +195,7 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
if (usePortsFromConf) {
hiveConf.setBoolean("minillap.usePortsFromConf", true);
}
- llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null);
+ llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null, null);
mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString);
break;
diff --git itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java
index cb4aba5..c1a32c9 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java
@@ -25,6 +25,7 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
@@ -36,7 +37,9 @@
private static final Logger LOG = LoggerFactory.getLogger(LlapItUtils.class);
- public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf, String confDir) throws
+ public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf,
+ MiniZooKeeperCluster miniZkCluster,
+ String confDir) throws
IOException {
MiniLlapCluster llapCluster;
LOG.info("Using conf dir: {}", confDir);
@@ -57,11 +60,14 @@ public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf, Str
// enabling this will cause test failures in Mac OS X
final boolean directMemoryEnabled = false;
final int numLocalDirs = 1;
- LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: " + memoryForCache
+ LOG.info("MiniLlap Configs - maxMemory: " + maxMemory +
+ " memoryForCache: " + memoryForCache
+ " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors
+ " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled
+ " numLocalDirs: " + numLocalDirs);
llapCluster = MiniLlapCluster.create(clusterName,
+ miniZkCluster,
+ 1,
numExecutors,
totalExecutorMemory,
asyncIOEnabled,
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 5ccbcba..8473436 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -429,6 +429,9 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
fs = dfs.getFileSystem();
}
+ setup = new QTestSetup();
+ setup.preTest(conf);
+
String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
if (clusterType == MiniClusterType.tez) {
if (confDir != null && !confDir.isEmpty()) {
@@ -437,13 +440,16 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
}
mr = shims.getMiniTezCluster(conf, 4, uriString);
} else if (clusterType == MiniClusterType.llap) {
- llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, confDir);
+ llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, setup.zooKeeperCluster, confDir);
mr = shims.getMiniTezCluster(conf, 2, uriString);
} else if (clusterType == MiniClusterType.miniSparkOnYarn) {
mr = shims.getMiniSparkCluster(conf, 4, uriString, 1);
} else {
mr = shims.getMiniMrCluster(conf, 4, uriString, 1);
}
+ } else {
+ setup = new QTestSetup();
+ setup.preTest(conf);
}
initConf();
@@ -471,8 +477,6 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
overWrite = "true".equalsIgnoreCase(System.getProperty("test.output.overwrite"));
- setup = new QTestSetup();
- setup.preTest(conf);
init();
}
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index a09c0b2..9871702 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -14,12 +14,11 @@
package org.apache.hadoop.hive.llap.daemon;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.Map;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -41,47 +40,57 @@
private static final Logger LOG = LoggerFactory.getLogger(MiniLlapCluster.class);
private final File testWorkDir;
+ private final String clusterNameTrimmed;
+ private final long numInstances;
private final long execBytesPerService;
private final boolean llapIoEnabled;
private final boolean ioIsDirect;
private final long ioBytesPerService;
private final int numExecutorsPerService;
+ private final File zkWorkDir;
private final String[] localDirs;
private final Configuration clusterSpecificConfiguration = new Configuration(false);
- private LlapDaemon llapDaemon;
+ private final LlapDaemon [] llapDaemons;
+ private MiniZooKeeperCluster miniZooKeeperCluster;
+ private final boolean ownZkCluster;
- public static MiniLlapCluster create(String clusterName, int numExecutorsPerService,
- long execBytePerService, boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService,
- int numLocalDirs) {
- return new MiniLlapCluster(clusterName, numExecutorsPerService, execBytePerService,
+
+ public static MiniLlapCluster create(String clusterName,
+ @Nullable MiniZooKeeperCluster miniZkCluster,
+ int numInstances,
+ int numExecutorsPerService,
+ long execBytePerService, boolean llapIoEnabled,
+ boolean ioIsDirect, long ioBytesPerService,
+ int numLocalDirs) {
+ return new MiniLlapCluster(clusterName, miniZkCluster, numInstances, numExecutorsPerService,
+ execBytePerService,
llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs);
}
- public static MiniLlapCluster createAndLaunch(Configuration conf, String clusterName,
- int numExecutorsPerService, long execBytePerService, boolean llapIoEnabled,
- boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) {
- MiniLlapCluster miniLlapCluster = create(clusterName, numExecutorsPerService,
- execBytePerService, llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs);
- miniLlapCluster.init(conf);
- miniLlapCluster.start();
- Configuration llapConf = miniLlapCluster.getClusterSpecificConfiguration();
- Iterator> confIter = llapConf.iterator();
- while (confIter.hasNext()) {
- Map.Entry entry = confIter.next();
- conf.set(entry.getKey(), entry.getValue());
- }
- return miniLlapCluster;
+ public static MiniLlapCluster create(String clusterName,
+ @Nullable MiniZooKeeperCluster miniZkCluster,
+ int numExecutorsPerService,
+ long execBytePerService, boolean llapIoEnabled,
+ boolean ioIsDirect, long ioBytesPerService,
+ int numLocalDirs) {
+ return create(clusterName, miniZkCluster, 1, numExecutorsPerService, execBytePerService,
+ llapIoEnabled,
+ ioIsDirect, ioBytesPerService, numLocalDirs);
}
- // TODO Add support for multiple instances
- private MiniLlapCluster(String clusterName, int numExecutorsPerService, long execMemoryPerService,
- boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) {
+ private MiniLlapCluster(String clusterName, @Nullable MiniZooKeeperCluster miniZkCluster,
+ int numInstances, int numExecutorsPerService, long execMemoryPerService,
+ boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService,
+ int numLocalDirs) {
super(clusterName + "_" + MiniLlapCluster.class.getSimpleName());
Preconditions.checkArgument(numExecutorsPerService > 0);
Preconditions.checkArgument(execMemoryPerService > 0);
Preconditions.checkArgument(numLocalDirs > 0);
- String clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName();
+ this.numInstances = numInstances;
+
+ this.clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName();
+ this.llapDaemons = new LlapDaemon[numInstances];
File targetWorkDir = new File("target", clusterNameTrimmed);
try {
FileContext.getLocalFSFileContext().delete(
@@ -123,8 +132,18 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe
this.testWorkDir = link;
} else {
+ targetWorkDir.mkdir();
this.testWorkDir = targetWorkDir;
}
+ if (miniZkCluster == null) {
+ ownZkCluster = true;
+ this.zkWorkDir = new File(testWorkDir, "mini-zk-cluster");
+ zkWorkDir.mkdir();
+ } else {
+ miniZooKeeperCluster = miniZkCluster;
+ ownZkCluster = false;
+ this.zkWorkDir = null;
+ }
this.numExecutorsPerService = numExecutorsPerService;
this.execBytesPerService = execMemoryPerService;
this.ioIsDirect = ioIsDirect;
@@ -142,12 +161,13 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe
}
@Override
- public void serviceInit(Configuration conf) {
+ public void serviceInit(Configuration conf) throws IOException, InterruptedException {
int rpcPort = 0;
int mngPort = 0;
int shufflePort = 0;
int webPort = 0;
boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false);
+ LOG.info("MiniLlap configured to use ports from conf: {}", usePortsFromConf);
if (usePortsFromConf) {
rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
@@ -155,43 +175,61 @@ public void serviceInit(Configuration conf) {
webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
}
- llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
- ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort);
- llapDaemon.init(conf);
+ if (ownZkCluster) {
+ miniZooKeeperCluster = new MiniZooKeeperCluster();
+ miniZooKeeperCluster.startup(zkWorkDir);
+ } else {
+ // Already setup in the create method
+ }
+
+ conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed);
+ conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, "localhost");
+ conf.setInt(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, miniZooKeeperCluster.getClientPort());
+
+ LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
+ for (int i = 0 ;i < numInstances ; i++) {
+ llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
+ ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort);
+ llapDaemons[i].init(new Configuration(conf));
+ }
+ LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
}
@Override
public void serviceStart() {
- llapDaemon.start();
-
- clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
- getServiceAddress().getHostName());
- clusterSpecificConfiguration.setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname,
- getServiceAddress().getPort());
-
- clusterSpecificConfiguration.setInt(
- ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
- numExecutorsPerService);
- clusterSpecificConfiguration.setLong(
- ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, execBytesPerService);
+ LOG.info("Starting {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
+ for (int i = 0 ;i < numInstances ; i++) {
+ llapDaemons[i].start();
+ }
+ LOG.info("Started {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
+
// Optimize local fetch does not work with LLAP due to different local directories
// used by containers and LLAP
clusterSpecificConfiguration
.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
+ clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed);
}
@Override
- public void serviceStop() {
- if (llapDaemon != null) {
- llapDaemon.stop();
- llapDaemon = null;
+ public void serviceStop() throws IOException {
+ for (int i = 0 ; i < numInstances ; i++) {
+ if (llapDaemons[i] != null) {
+ llapDaemons[i].stop();
+ llapDaemons[i] = null;
+ }
+ }
+ if (ownZkCluster) {
+ if (miniZooKeeperCluster != null) {
+ LOG.info("Stopping MiniZooKeeper cluster");
+ miniZooKeeperCluster.shutdown();
+ miniZooKeeperCluster = null;
+ LOG.info("Stopped MiniZooKeeper cluster");
+ }
+ } else {
+ LOG.info("Not stopping MiniZK cluster since it is now owned by us");
}
}
- private InetSocketAddress getServiceAddress() {
- Preconditions.checkState(getServiceState() == Service.STATE.STARTED);
- return llapDaemon.getListenerAddress();
- }
public Configuration getClusterSpecificConfiguration() {
Preconditions.checkState(getServiceState() == Service.STATE.STARTED);
@@ -200,7 +238,10 @@ public Configuration getClusterSpecificConfiguration() {
// Mainly for verification
public long getNumSubmissions() {
- return llapDaemon.getNumSubmissions();
+ int numSubmissions = 0;
+ for (int i = 0 ; i < numInstances ; i++) {
+ numSubmissions += llapDaemons[i].getNumSubmissions();
+ }
+ return numSubmissions;
}
-
}