diff --git data/conf/llap/hive-site.xml data/conf/llap/hive-site.xml
index c2bef58..72bdcfb 100644
--- data/conf/llap/hive-site.xml
+++ data/conf/llap/hive-site.xml
@@ -273,4 +273,48 @@
false
+
+<property>
+  <name>hive.llap.daemon.service.hosts</name>
+  <value>localhost</value>
+</property>
+
+<property>
+  <name>hive.llap.daemon.service.port</name>
+  <value>0</value>
+</property>
+
+<property>
+  <name>hive.llap.daemon.num.executors</name>
+  <value>4</value>
+</property>
+
+<property>
+  <name>hive.llap.daemon.task.scheduler.wait.queue.size</name>
+  <value>4</value>
+</property>
+
+<property>
+  <name>hive.llap.cache.allow.synthetic.fileid</name>
+  <value>true</value>
+</property>
+
+<!-- hadoop IPC options -->
+<property>
+  <name>ipc.client.low-latency</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>ipc.client.tcpnodelay</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>ipc.clients-per-factory</name>
+  <value>4</value>
+</property>
+
diff --git data/conf/llap/llap-daemon-site.xml data/conf/llap/llap-daemon-site.xml
deleted file mode 100644
index 98c0f2b..0000000
--- data/conf/llap/llap-daemon-site.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<configuration>
-
-<property>
-  <name>hive.llap.daemon.service.hosts</name>
-  <value>localhost</value>
-</property>
-
-<property>
-  <name>hive.llap.daemon.service.port</name>
-  <value>0</value>
-</property>
-
-<property>
-  <name>hive.llap.daemon.num.executors</name>
-  <value>4</value>
-</property>
-
-<property>
-  <name>hive.llap.daemon.task.scheduler.wait.queue.size</name>
-  <value>4</value>
-</property>
-
-<property>
-  <name>hive.llap.cache.allow.synthetic.fileid</name>
-  <value>true</value>
-</property>
-
-<!-- hadoop IPC options -->
-<property>
-  <name>ipc.client.low-latency</name>
-  <value>true</value>
-</property>
-
-<property>
-  <name>ipc.client.tcpnodelay</name>
-  <value>true</value>
-</property>
-
-<property>
-  <name>ipc.clients-per-factory</name>
-  <value>4</value>
-</property>
-
-</configuration>
diff --git itests/hive-unit/pom.xml itests/hive-unit/pom.xml
index 97786d9..ae231de 100644
--- itests/hive-unit/pom.xml
+++ itests/hive-unit/pom.xml
@@ -75,6 +75,11 @@
hive-hcatalog-streaming
${project.version}
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-it-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
org.apache.hadoop
@@ -123,12 +128,6 @@
org.apache.hive
-      <artifactId>hive-it-util</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
hive-jdbc
${project.version}
test
@@ -211,14 +210,12 @@
org.apache.hbase
hbase-server
${hbase.version}
-      <scope>test</scope>
org.apache.hbase
hbase-server
${hbase.version}
test-jar
-      <scope>test</scope>
org.apache.hbase
diff --git itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 751d8ea..8f5377f 100644
--- itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapItUtils;
+import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.WindowsPathUtil;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,6 +61,7 @@
private static final AtomicLong hs2Counter = new AtomicLong();
private MiniMrShim mr;
private MiniDFSShim dfs;
+ private MiniLlapCluster llapCluster = null;
private final FileSystem localFS;
private boolean useMiniKdc = false;
private final String serverPrincipal;
@@ -186,13 +189,15 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM
// Initialize the execution engine based on cluster type
switch (miniClusterType) {
case TEZ:
- mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, false);
+ mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString);
break;
case LLAP:
if (usePortsFromConf) {
hiveConf.setBoolean("minillap.usePortsFromConf", true);
}
- mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, true);
+ llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null, null);
+
+ mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString);
break;
case MR:
mr = ShimLoader.getHadoopShims().getMiniMrCluster(hiveConf, 4, uriString, 1);
@@ -284,6 +289,9 @@ public void stop() {
hiveServer2.stop();
setStarted(false);
try {
+ if (llapCluster != null) {
+ llapCluster.stop();
+ }
if (mr != null) {
mr.shutdown();
mr = null;
diff --git itests/util/pom.xml itests/util/pom.xml
index aaafc0a..4789586 100644
--- itests/util/pom.xml
+++ itests/util/pom.xml
@@ -97,6 +97,17 @@
tests
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-server</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>
org.apache.hadoop
hadoop-common
${hadoop.version}
diff --git itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java
new file mode 100644
index 0000000..c1a32c9
--- /dev/null
+++ itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapItUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapItUtils.class);
+
+ public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf,
+ MiniZooKeeperCluster miniZkCluster,
+ String confDir) throws
+ IOException {
+ MiniLlapCluster llapCluster;
+ LOG.info("Using conf dir: {}", confDir);
+ if (confDir != null && !confDir.isEmpty()) {
+ conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+ + "/tez-site.xml"));
+ }
+
+ Configuration daemonConf = new LlapDaemonConfiguration(conf);
+ final String clusterName = "llap";
+ final long maxMemory = LlapDaemon.getTotalHeapSize();
+ // 15% for io cache
+ final long memoryForCache = (long) (0.15f * maxMemory);
+ // 75% for 4 executors
+ final long totalExecutorMemory = (long) (0.75f * maxMemory);
+ final int numExecutors = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+ final boolean asyncIOEnabled = true;
+ // enabling this will cause test failures in Mac OS X
+ final boolean directMemoryEnabled = false;
+ final int numLocalDirs = 1;
+ LOG.info("MiniLlap Configs - maxMemory: " + maxMemory +
+ " memoryForCache: " + memoryForCache
+ + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors
+ + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled
+ + " numLocalDirs: " + numLocalDirs);
+ llapCluster = MiniLlapCluster.create(clusterName,
+ miniZkCluster,
+ 1,
+ numExecutors,
+ totalExecutorMemory,
+ asyncIOEnabled,
+ directMemoryEnabled,
+ memoryForCache,
+ numLocalDirs);
+ llapCluster.init(daemonConf);
+ llapCluster.start();
+
+ // Augment conf with the settings from the started llap configuration.
+ Configuration llapConf = llapCluster.getClusterSpecificConfiguration();
+    Iterator<Map.Entry<String, String>> confIter = llapConf.iterator();
+ while (confIter.hasNext()) {
+      Map.Entry<String, String> entry = confIter.next();
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ return llapCluster;
+ }
+
+}
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 5e81e98..dddc6ca 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -51,6 +51,7 @@
import java.util.Comparator;
import java.util.Deque;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -82,6 +83,10 @@
import org.apache.hadoop.hive.common.io.SortPrintStream;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapItUtils;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Index;
@@ -166,6 +171,7 @@
private HadoopShims.MiniMrShim mr = null;
private HadoopShims.MiniDFSShim dfs = null;
private HadoopShims.HdfsEncryptionShim hes = null;
+ private MiniLlapCluster llapCluster = null;
private String hadoopVer = null;
private QTestSetup setup = null;
private TezSessionState tezSessionState = null;
@@ -423,21 +429,19 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
fs = dfs.getFileSystem();
}
+ setup = new QTestSetup();
+ setup.preTest(conf);
+
String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
if (clusterType == MiniClusterType.tez) {
if (confDir != null && !confDir.isEmpty()) {
conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+ "/tez-site.xml"));
}
- mr = shims.getMiniTezCluster(conf, 4, uriString, false);
+ mr = shims.getMiniTezCluster(conf, 4, uriString);
} else if (clusterType == MiniClusterType.llap) {
- if (confDir != null && !confDir.isEmpty()) {
- conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
- + "/tez-site.xml"));
- conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
- + "/llap-daemon-site.xml"));
- }
- mr = shims.getMiniTezCluster(conf, 2, uriString, true);
+ llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, setup.zooKeeperCluster, confDir);
+ mr = shims.getMiniTezCluster(conf, 2, uriString);
} else if (clusterType == MiniClusterType.miniSparkOnYarn) {
mr = shims.getMiniSparkCluster(conf, 4, uriString, 1);
} else {
@@ -470,8 +474,6 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType,
overWrite = "true".equalsIgnoreCase(System.getProperty("test.output.overwrite"));
- setup = new QTestSetup();
- setup.preTest(conf);
init();
}
diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index c611d1a..ba38fb8 100644
--- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -31,7 +31,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -68,8 +67,6 @@
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
@@ -285,8 +282,10 @@ public void register() throws IOException {
// No node exists, throw exception
throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
}
- LOG.info("Created a znode on ZooKeeper for LLAP instance: {} znodePath: {}", rpcEndpoint,
- znodePath);
+ LOG.info(
+ "Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}," +
+ " webui: {}, mgmt: {}, znodePath: {} ",
+ rpcEndpoint, getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), znodePath);
} catch (Exception e) {
LOG.error("Unable to create a znode for this server instance", e);
CloseableUtils.closeQuietly(znode);
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
index 51e8509..88f3b19 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.llap.configuration;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -21,6 +22,7 @@
/**
* Configuration for LLAP daemon processes only. This should not be used by any clients.
*/
+@InterfaceAudience.Private
public class LlapDaemonConfiguration extends Configuration {
@InterfaceAudience.Private
@@ -46,4 +48,10 @@ public LlapDaemonConfiguration() {
}
addResource(LLAP_DAEMON_SITE);
}
+
+ @VisibleForTesting
+ public LlapDaemonConfiguration(Configuration conf) {
+ this();
+ addResource(conf);
+ }
}
\ No newline at end of file
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 22d7eec..57edffa 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -99,7 +99,7 @@
public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes,
boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort,
- int mngPort, int shufflePort) {
+ int mngPort, int shufflePort, int webPort) {
super("LlapDaemon");
initializeLogging();
@@ -139,6 +139,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
"numExecutors=" + numExecutors +
", rpcListenerPort=" + srvPort +
", mngListenerPort=" + mngPort +
+ ", webPort=" + webPort +
", workDirs=" + Arrays.toString(localDirs) +
", shufflePort=" + shufflePort +
", executorMemory=" + executorMemoryBytes +
@@ -205,12 +206,11 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
amReporter, executorClassLoader);
addIfService(containerRunner);
- this.registry = new LlapRegistryService(true);
- addIfService(registry);
+
if (HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.HIVE_IN_TEST)) {
this.webServices = null;
} else {
- this.webServices = new LlapWebServices();
+ this.webServices = new LlapWebServices(webPort);
addIfService(webServices);
}
// Bring up the server only after all other components have started.
@@ -218,6 +218,9 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
// AMReporter after the server so that it gets the correct address. It knows how to deal with
// requests before it is started.
addIfService(amReporter);
+
+ // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties.
+ this.registry = new LlapRegistryService(true);
}
private void initializeLogging() {
@@ -288,11 +291,29 @@ public void serviceStart() throws Exception {
ShuffleHandler.initializeAndStart(shuffleHandlerConf);
LOG.info("Setting shuffle port to: " + ShuffleHandler.get().getPort());
this.shufflePort.set(ShuffleHandler.get().getPort());
+ getConfig()
+ .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort());
super.serviceStart();
- LOG.info("LlapDaemon serviceStart complete");
+
+ // Setup the actual ports in the configuration.
+ getConfig().setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname, server.getBindAddress().getPort());
+ getConfig().setInt(ConfVars.LLAP_MANAGEMENT_RPC_PORT.varname, server.getManagementBindAddress().getPort());
+ if (webServices != null) {
+ getConfig().setInt(ConfVars.LLAP_DAEMON_WEB_PORT.varname, webServices.getPort());
+ }
+
+ this.registry.init(getConfig());
+ this.registry.start();
+ LOG.info(
+ "LlapDaemon serviceStart complete. RPC Port={}, ManagementPort={}, ShuflePort={}, WebPort={}",
+ server.getBindAddress().getPort(), server.getManagementBindAddress().getPort(),
+ ShuffleHandler.get().getPort(), (webServices == null ? "" : webServices.getPort()));
}
public void serviceStop() throws Exception {
+ if (registry != null) {
+ this.registry.stop();
+ }
super.serviceStop();
ShuffleHandler.shutdown();
shutdown();
@@ -343,6 +364,7 @@ public static void main(String[] args) throws Exception {
int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
int shufflePort = daemonConf
.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
+ int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT);
long executorMemoryBytes = HiveConf.getIntVar(
daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
@@ -350,7 +372,7 @@ public static void main(String[] args) throws Exception {
boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
- isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort);
+ isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort);
LOG.info("Adding shutdown hook for LlapDaemon");
ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index 3a25a66..e99e689 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -19,7 +19,6 @@
import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.protobuf.BlockingService;
@@ -189,11 +188,15 @@ public void serviceStop() {
}
@InterfaceAudience.Private
- @VisibleForTesting
InetSocketAddress getBindAddress() {
return srvAddress.get();
}
+ @InterfaceAudience.Private
+ InetSocketAddress getManagementBindAddress() {
+ return mngAddress.get();
+ }
+
   private RPC.Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
int numHandlers, BlockingService blockingService) throws
IOException {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index afb59c0..e4c622e 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -20,9 +20,9 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hive.http.HttpServer;
@@ -38,13 +38,14 @@
private boolean useSSL = false;
private boolean useSPNEGO = false;
- public LlapWebServices() {
+ public LlapWebServices(int port) {
super("LlapWebServices");
+ this.port = port;
}
@Override
public void serviceInit(Configuration conf) {
- this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+
this.useSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL);
this.useSPNEGO = HiveConf.getBoolVar(conf, ConfVars.LLAP_WEB_AUTO_AUTH);
String bindAddress = "0.0.0.0";
@@ -69,6 +70,11 @@ public void serviceInit(Configuration conf) {
}
}
+ @InterfaceAudience.Private
+ public int getPort() {
+ return this.http.getPort();
+ }
+
@Override
public void serviceStart() throws Exception {
if (this.http != null) {
@@ -76,6 +82,7 @@ public void serviceStart() throws Exception {
}
}
+ @Override
public void serviceStop() throws Exception {
if (this.http != null) {
this.http.stop();
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index c920c24..9871702 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -14,12 +14,11 @@
package org.apache.hadoop.hive.llap.daemon;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.Map;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -41,47 +40,57 @@
private static final Logger LOG = LoggerFactory.getLogger(MiniLlapCluster.class);
private final File testWorkDir;
+ private final String clusterNameTrimmed;
+ private final long numInstances;
private final long execBytesPerService;
private final boolean llapIoEnabled;
private final boolean ioIsDirect;
private final long ioBytesPerService;
private final int numExecutorsPerService;
+ private final File zkWorkDir;
private final String[] localDirs;
private final Configuration clusterSpecificConfiguration = new Configuration(false);
- private LlapDaemon llapDaemon;
+ private final LlapDaemon [] llapDaemons;
+ private MiniZooKeeperCluster miniZooKeeperCluster;
+ private final boolean ownZkCluster;
- public static MiniLlapCluster create(String clusterName, int numExecutorsPerService,
- long execBytePerService, boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService,
- int numLocalDirs) {
- return new MiniLlapCluster(clusterName, numExecutorsPerService, execBytePerService,
+
+ public static MiniLlapCluster create(String clusterName,
+ @Nullable MiniZooKeeperCluster miniZkCluster,
+ int numInstances,
+ int numExecutorsPerService,
+ long execBytePerService, boolean llapIoEnabled,
+ boolean ioIsDirect, long ioBytesPerService,
+ int numLocalDirs) {
+ return new MiniLlapCluster(clusterName, miniZkCluster, numInstances, numExecutorsPerService,
+ execBytePerService,
llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs);
}
- public static MiniLlapCluster createAndLaunch(Configuration conf, String clusterName,
- int numExecutorsPerService, long execBytePerService, boolean llapIoEnabled,
- boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) {
- MiniLlapCluster miniLlapCluster = create(clusterName, numExecutorsPerService,
- execBytePerService, llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs);
- miniLlapCluster.init(conf);
- miniLlapCluster.start();
- Configuration llapConf = miniLlapCluster.getClusterSpecificConfiguration();
-    Iterator<Map.Entry<String, String>> confIter = llapConf.iterator();
- while (confIter.hasNext()) {
-      Map.Entry<String, String> entry = confIter.next();
- conf.set(entry.getKey(), entry.getValue());
- }
- return miniLlapCluster;
+ public static MiniLlapCluster create(String clusterName,
+ @Nullable MiniZooKeeperCluster miniZkCluster,
+ int numExecutorsPerService,
+ long execBytePerService, boolean llapIoEnabled,
+ boolean ioIsDirect, long ioBytesPerService,
+ int numLocalDirs) {
+ return create(clusterName, miniZkCluster, 1, numExecutorsPerService, execBytePerService,
+ llapIoEnabled,
+ ioIsDirect, ioBytesPerService, numLocalDirs);
}
- // TODO Add support for multiple instances
- private MiniLlapCluster(String clusterName, int numExecutorsPerService, long execMemoryPerService,
- boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) {
+ private MiniLlapCluster(String clusterName, @Nullable MiniZooKeeperCluster miniZkCluster,
+ int numInstances, int numExecutorsPerService, long execMemoryPerService,
+ boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService,
+ int numLocalDirs) {
super(clusterName + "_" + MiniLlapCluster.class.getSimpleName());
Preconditions.checkArgument(numExecutorsPerService > 0);
Preconditions.checkArgument(execMemoryPerService > 0);
Preconditions.checkArgument(numLocalDirs > 0);
- String clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName();
+ this.numInstances = numInstances;
+
+ this.clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName();
+ this.llapDaemons = new LlapDaemon[numInstances];
File targetWorkDir = new File("target", clusterNameTrimmed);
try {
FileContext.getLocalFSFileContext().delete(
@@ -123,8 +132,18 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe
this.testWorkDir = link;
} else {
+ targetWorkDir.mkdir();
this.testWorkDir = targetWorkDir;
}
+ if (miniZkCluster == null) {
+ ownZkCluster = true;
+ this.zkWorkDir = new File(testWorkDir, "mini-zk-cluster");
+ zkWorkDir.mkdir();
+ } else {
+ miniZooKeeperCluster = miniZkCluster;
+ ownZkCluster = false;
+ this.zkWorkDir = null;
+ }
this.numExecutorsPerService = numExecutorsPerService;
this.execBytesPerService = execMemoryPerService;
this.ioIsDirect = ioIsDirect;
@@ -142,54 +161,75 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe
}
@Override
- public void serviceInit(Configuration conf) {
+ public void serviceInit(Configuration conf) throws IOException, InterruptedException {
int rpcPort = 0;
int mngPort = 0;
int shufflePort = 0;
+ int webPort = 0;
boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false);
+ LOG.info("MiniLlap configured to use ports from conf: {}", usePortsFromConf);
if (usePortsFromConf) {
rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
+ webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
}
- llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
- ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort);
- llapDaemon.init(conf);
+ if (ownZkCluster) {
+ miniZooKeeperCluster = new MiniZooKeeperCluster();
+ miniZooKeeperCluster.startup(zkWorkDir);
+ } else {
+ // Already setup in the create method
+ }
+
+ conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed);
+ conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, "localhost");
+ conf.setInt(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, miniZooKeeperCluster.getClientPort());
+
+ LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
+ for (int i = 0 ;i < numInstances ; i++) {
+ llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled,
+ ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort);
+ llapDaemons[i].init(new Configuration(conf));
+ }
+ LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
}
@Override
public void serviceStart() {
- llapDaemon.start();
-
- clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
- getServiceAddress().getHostName());
- clusterSpecificConfiguration.setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname,
- getServiceAddress().getPort());
-
- clusterSpecificConfiguration.setInt(
- ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
- numExecutorsPerService);
- clusterSpecificConfiguration.setLong(
- ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, execBytesPerService);
+ LOG.info("Starting {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
+ for (int i = 0 ;i < numInstances ; i++) {
+ llapDaemons[i].start();
+ }
+ LOG.info("Started {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);
+
// Optimize local fetch does not work with LLAP due to different local directories
// used by containers and LLAP
clusterSpecificConfiguration
.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
+ clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed);
}
@Override
- public void serviceStop() {
- if (llapDaemon != null) {
- llapDaemon.stop();
- llapDaemon = null;
+ public void serviceStop() throws IOException {
+ for (int i = 0 ; i < numInstances ; i++) {
+ if (llapDaemons[i] != null) {
+ llapDaemons[i].stop();
+ llapDaemons[i] = null;
+ }
+ }
+ if (ownZkCluster) {
+ if (miniZooKeeperCluster != null) {
+ LOG.info("Stopping MiniZooKeeper cluster");
+ miniZooKeeperCluster.shutdown();
+ miniZooKeeperCluster = null;
+ LOG.info("Stopped MiniZooKeeper cluster");
+ }
+ } else {
+ LOG.info("Not stopping MiniZK cluster since it is now owned by us");
}
}
- private InetSocketAddress getServiceAddress() {
- Preconditions.checkState(getServiceState() == Service.STATE.STARTED);
- return llapDaemon.getListenerAddress();
- }
public Configuration getClusterSpecificConfiguration() {
Preconditions.checkState(getServiceState() == Service.STATE.STARTED);
@@ -198,7 +238,10 @@ public Configuration getClusterSpecificConfiguration() {
// Mainly for verification
public long getNumSubmissions() {
- return llapDaemon.getNumSubmissions();
+ int numSubmissions = 0;
+ for (int i = 0 ; i < numInstances ; i++) {
+ numSubmissions += llapDaemons[i].getNumSubmissions();
+ }
+ return numSubmissions;
}
-
}
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 9a3a31c..e028212 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -370,8 +370,8 @@ public void setupConfiguration(Configuration conf) {
*/
@Override
public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
- String nameNode, boolean isLlap) throws IOException {
- return new MiniTezShim(conf, numberOfTaskTrackers, nameNode, isLlap);
+ String nameNode) throws IOException {
+ return new MiniTezShim(conf, numberOfTaskTrackers, nameNode);
}
/**
@@ -381,11 +381,8 @@ public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers
private final MiniTezCluster mr;
private final Configuration conf;
-    private Class<?> miniLlapKlass;
- private Object miniLlapCluster;
- public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode,
- boolean isLlap) throws IOException {
+ public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode) throws IOException {
mr = new MiniTezCluster("hive", numberOfTaskTrackers);
conf.set("fs.defaultFS", nameNode);
conf.set("tez.am.log.level", "DEBUG");
@@ -393,54 +390,6 @@ public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode
mr.init(conf);
mr.start();
this.conf = mr.getConfig();
- if (isLlap) {
- createAndLaunchLlapDaemon(this.conf);
- } else {
- miniLlapCluster = null;
- }
- }
-
- private void createAndLaunchLlapDaemon(final Configuration conf)
- throws IOException {
- try {
- final String clusterName = "llap";
-        Class<?> llapDaemonKlass =
- Class.forName("org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon",
- false, ShimLoader.class.getClassLoader());
- Method totalMemMethod = llapDaemonKlass.getMethod("getTotalHeapSize");
- final long maxMemory = (long) totalMemMethod.invoke(null);
- // 15% for io cache
- final long memoryForCache = (long) (0.15f * maxMemory);
- // 75% for executors
- final long totalExecutorMemory = (long) (0.75f * maxMemory);
- final int numExecutors = conf.getInt("llap.daemon.num.executors", 4);
- final boolean asyncIOEnabled = true;
- // enabling this will cause test failures in Mac OS X
- final boolean directMemoryEnabled = false;
- final int numLocalDirs = 1;
- LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: " + memoryForCache
- + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors
- + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled
- + " numLocalDirs: " + numLocalDirs);
-
- miniLlapKlass = Class.forName("org.apache.hadoop.hive.llap.daemon.MiniLlapCluster",
- false, ShimLoader.class.getClassLoader());
- Method create = miniLlapKlass.getMethod("createAndLaunch", new Class[]{Configuration.class,
- String.class, Integer.TYPE, Long.TYPE, Boolean.TYPE, Boolean.TYPE,
- Long.TYPE, Integer.TYPE});
- miniLlapCluster = create.invoke(null,
- conf,
- clusterName,
- numExecutors,
- totalExecutorMemory,
- asyncIOEnabled,
- directMemoryEnabled,
- memoryForCache,
- numLocalDirs);
- } catch (Exception e) {
- LOG.error("Unable to create MiniLlapCluster. Exception: " + e.getMessage());
- throw new IOException(e);
- }
}
@Override
@@ -458,15 +407,6 @@ public int getJobTrackerPort() throws UnsupportedOperationException {
@Override
public void shutdown() throws IOException {
mr.stop();
-
- if (miniLlapKlass != null && miniLlapCluster != null) {
- try {
- Method stop = miniLlapKlass.getMethod("stop", new Class[]{});
- stop.invoke(miniLlapCluster);
- } catch (Exception e) {
- LOG.error("Unable to stop llap daemon. Exception: " + e.getMessage());
- }
- }
}
@Override
diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 7a5a9b5..a44d0c0 100644
--- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -90,7 +90,7 @@ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
String nameNode, int numDir) throws IOException;
public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
- String nameNode, boolean isLlap) throws IOException;
+ String nameNode) throws IOException;
public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers,
String nameNode, int numDir) throws IOException;