diff --git data/conf/llap/hive-site.xml data/conf/llap/hive-site.xml index c2bef58..72bdcfb 100644 --- data/conf/llap/hive-site.xml +++ data/conf/llap/hive-site.xml @@ -273,4 +273,48 @@ false + + + + hive.llap.daemon.service.hosts + localhost + + + + hive.llap.daemon.service.port + 0 + + + + hive.llap.daemon.num.executors + 4 + + + + hive.llap.daemon.task.scheduler.wait.queue.size + 4 + + + + hive.llap.cache.allow.synthetic.fileid + true + + + + + ipc.client.low-latency + true + + + + ipc.client.tcpnodelay + true + + + + ipc.clients-per-factory + 4 + + + diff --git data/conf/llap/llap-daemon-site.xml data/conf/llap/llap-daemon-site.xml deleted file mode 100644 index 98c0f2b..0000000 --- data/conf/llap/llap-daemon-site.xml +++ /dev/null @@ -1,61 +0,0 @@ - - - - - - hive.llap.daemon.service.hosts - localhost - - - - hive.llap.daemon.service.port - 0 - - - - hive.llap.daemon.num.executors - 4 - - - - hive.llap.daemon.task.scheduler.wait.queue.size - 4 - - - - hive.llap.cache.allow.synthetic.fileid - true - - - - - ipc.client.low-latency - true - - - - ipc.client.tcpnodelay - true - - - - ipc.clients-per-factory - 4 - - - diff --git itests/hive-unit/pom.xml itests/hive-unit/pom.xml index 97786d9..ae231de 100644 --- itests/hive-unit/pom.xml +++ itests/hive-unit/pom.xml @@ -75,6 +75,11 @@ hive-hcatalog-streaming ${project.version} + + org.apache.hive + hive-it-util + ${project.version} + org.apache.hadoop @@ -123,12 +128,6 @@ org.apache.hive - hive-it-util - ${project.version} - test - - - org.apache.hive hive-jdbc ${project.version} test @@ -211,14 +210,12 @@ org.apache.hbase hbase-server ${hbase.version} - test org.apache.hbase hbase-server ${hbase.version} test-jar - test org.apache.hbase diff --git itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 751d8ea..8f5377f 100644 --- itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapItUtils; +import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.WindowsPathUtil; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -59,6 +61,7 @@ private static final AtomicLong hs2Counter = new AtomicLong(); private MiniMrShim mr; private MiniDFSShim dfs; + private MiniLlapCluster llapCluster = null; private final FileSystem localFS; private boolean useMiniKdc = false; private final String serverPrincipal; @@ -186,13 +189,15 @@ private MiniHS2(HiveConf hiveConf, MiniClusterType miniClusterType, boolean useM // Initialize the execution engine based on cluster type switch (miniClusterType) { case TEZ: - mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, false); + mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString); break; case LLAP: if (usePortsFromConf) { hiveConf.setBoolean("minillap.usePortsFromConf", true); } - mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString, true); + llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null, null); + + mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString); break; case MR: mr = ShimLoader.getHadoopShims().getMiniMrCluster(hiveConf, 4, uriString, 1); @@ -284,6 +289,9 @@ public void stop() { hiveServer2.stop(); setStarted(false); try { + if (llapCluster != null) { + llapCluster.stop(); + } if (mr != null) { mr.shutdown(); mr = null; diff --git itests/util/pom.xml itests/util/pom.xml index aaafc0a..4789586 100644 --- itests/util/pom.xml +++ itests/util/pom.xml @@ -97,6 +97,17 @@ tests + org.apache.hive + hive-llap-server + ${project.version} + + + org.apache.hive + hive-llap-server + ${project.version} + test-jar + + org.apache.hadoop hadoop-common ${hadoop.version} diff --git itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java new file mode 100644 index 0000000..c1a32c9 --- /dev/null +++ itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; +import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; +import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LlapItUtils { + + private static final Logger LOG = LoggerFactory.getLogger(LlapItUtils.class); + + public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf, + MiniZooKeeperCluster miniZkCluster, + String confDir) throws + IOException { + MiniLlapCluster llapCluster; + LOG.info("Using conf dir: {}", confDir); + if (confDir != null && !confDir.isEmpty()) { + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + } + + Configuration daemonConf = new LlapDaemonConfiguration(conf); + final String clusterName = "llap"; + final long maxMemory = LlapDaemon.getTotalHeapSize(); + // 15% for io cache + final long memoryForCache = (long) (0.15f * maxMemory); + // 75% for 4 executors + final long totalExecutorMemory = (long) (0.75f * maxMemory); + final int numExecutors = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS); + final boolean asyncIOEnabled = true; + // enabling this will cause test failures in Mac OS X + final boolean directMemoryEnabled = false; + final int numLocalDirs = 1; + LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + + " memoryForCache: " + memoryForCache + + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors + + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled + + " numLocalDirs: " + numLocalDirs); + llapCluster = MiniLlapCluster.create(clusterName, + miniZkCluster, + 1, + numExecutors, + totalExecutorMemory, + asyncIOEnabled, + directMemoryEnabled, + memoryForCache, + numLocalDirs); + llapCluster.init(daemonConf); + llapCluster.start(); + + // Augment conf with the settings from the started llap configuration. + Configuration llapConf = llapCluster.getClusterSpecificConfiguration(); + Iterator> confIter = llapConf.iterator(); + while (confIter.hasNext()) { + Map.Entry entry = confIter.next(); + conf.set(entry.getKey(), entry.getValue()); + } + return llapCluster; + } + +} diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 5e81e98..dddc6ca 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -51,6 +51,7 @@ import java.util.Comparator; import java.util.Deque; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -82,6 +83,10 @@ import org.apache.hadoop.hive.common.io.SortPrintStream; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapItUtils; +import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; +import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; +import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.Index; @@ -166,6 +171,7 @@ private HadoopShims.MiniMrShim mr = null; private HadoopShims.MiniDFSShim dfs = null; private HadoopShims.HdfsEncryptionShim hes = null; + private MiniLlapCluster llapCluster = null; private String hadoopVer = null; private QTestSetup setup = null; private TezSessionState tezSessionState = null; @@ -423,21 +429,19 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, fs = dfs.getFileSystem(); } + setup = new QTestSetup(); + setup.preTest(conf); + String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString()); if (clusterType == MiniClusterType.tez) { if (confDir != null && !confDir.isEmpty()) { conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); } - mr = shims.getMiniTezCluster(conf, 4, uriString, false); + mr = shims.getMiniTezCluster(conf, 4, uriString); } else if (clusterType == MiniClusterType.llap) { - if (confDir != null && !confDir.isEmpty()) { - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/tez-site.xml")); - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/llap-daemon-site.xml")); - } - mr = shims.getMiniTezCluster(conf, 2, uriString, true); + llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, setup.zooKeeperCluster, confDir); + mr = shims.getMiniTezCluster(conf, 2, uriString); } else if (clusterType == MiniClusterType.miniSparkOnYarn) { mr = shims.getMiniSparkCluster(conf, 4, uriString, 1); } else { @@ -470,8 +474,6 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, overWrite = "true".equalsIgnoreCase(System.getProperty("test.output.overwrite")); - setup = new QTestSetup(); - setup.preTest(conf); init(); } diff --git llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index c611d1a..ba38fb8 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -68,8 +67,6 @@ import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.data.ACL; @@ -285,8 +282,10 @@ public void register() throws IOException { // No node exists, throw exception throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper."); } - LOG.info("Created a znode on ZooKeeper for LLAP instance: {} znodePath: {}", rpcEndpoint, - znodePath); + LOG.info( + "Registered node. Created a znode on ZooKeeper for LLAP instance: rpc: {}, shuffle: {}," + + " webui: {}, mgmt: {}, znodePath: {} ", + rpcEndpoint, getShuffleEndpoint(), getServicesEndpoint(), getMngEndpoint(), znodePath); } catch (Exception e) { LOG.error("Unable to create a znode for this server instance", e); CloseableUtils.closeQuietly(znode); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java index 51e8509..88f3b19 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/configuration/LlapDaemonConfiguration.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.configuration; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; @@ -21,6 +22,7 @@ /** * Configuration for LLAP daemon processes only. This should not be used by any clients. */ +@InterfaceAudience.Private public class LlapDaemonConfiguration extends Configuration { @InterfaceAudience.Private @@ -46,4 +48,10 @@ public LlapDaemonConfiguration() { } addResource(LLAP_DAEMON_SITE); } + + @VisibleForTesting + public LlapDaemonConfiguration(Configuration conf) { + this(); + addResource(conf); + } } \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 22d7eec..57edffa 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -99,7 +99,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort, - int mngPort, int shufflePort) { + int mngPort, int shufflePort, int webPort) { super("LlapDaemon"); initializeLogging(); @@ -139,6 +139,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor "numExecutors=" + numExecutors + ", rpcListenerPort=" + srvPort + ", mngListenerPort=" + mngPort + + ", webPort=" + webPort + ", workDirs=" + Arrays.toString(localDirs) + ", shufflePort=" + shufflePort + ", executorMemory=" + executorMemoryBytes + @@ -205,12 +206,11 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor amReporter, executorClassLoader); addIfService(containerRunner); - this.registry = new LlapRegistryService(true); - addIfService(registry); + if (HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.HIVE_IN_TEST)) { this.webServices = null; } else { - this.webServices = new LlapWebServices(); + this.webServices = new LlapWebServices(webPort); addIfService(webServices); } // Bring up the server only after all other components have started. @@ -218,6 +218,9 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor // AMReporter after the server so that it gets the correct address. It knows how to deal with // requests before it is started. addIfService(amReporter); + + // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties. + this.registry = new LlapRegistryService(true); } private void initializeLogging() { @@ -288,11 +291,29 @@ public void serviceStart() throws Exception { ShuffleHandler.initializeAndStart(shuffleHandlerConf); LOG.info("Setting shuffle port to: " + ShuffleHandler.get().getPort()); this.shufflePort.set(ShuffleHandler.get().getPort()); + getConfig() + .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort()); super.serviceStart(); - LOG.info("LlapDaemon serviceStart complete"); + + // Setup the actual ports in the configuration. + getConfig().setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname, server.getBindAddress().getPort()); + getConfig().setInt(ConfVars.LLAP_MANAGEMENT_RPC_PORT.varname, server.getManagementBindAddress().getPort()); + if (webServices != null) { + getConfig().setInt(ConfVars.LLAP_DAEMON_WEB_PORT.varname, webServices.getPort()); + } + + this.registry.init(getConfig()); + this.registry.start(); + LOG.info( + "LlapDaemon serviceStart complete. RPC Port={}, ManagementPort={}, ShuflePort={}, WebPort={}", + server.getBindAddress().getPort(), server.getManagementBindAddress().getPort(), + ShuffleHandler.get().getPort(), (webServices == null ? "" : webServices.getPort())); } public void serviceStop() throws Exception { + if (registry != null) { + this.registry.stop(); + } super.serviceStop(); ShuffleHandler.shutdown(); shutdown(); @@ -343,6 +364,7 @@ public static void main(String[] args) throws Exception { int mngPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_MANAGEMENT_RPC_PORT); int shufflePort = daemonConf .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); + int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT); long executorMemoryBytes = HiveConf.getIntVar( daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; @@ -350,7 +372,7 @@ public static void main(String[] args) throws Exception { boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT); boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true); llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, - isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort); + isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort); LOG.info("Adding shutdown hook for LlapDaemon"); ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index 3a25a66..e99e689 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -19,7 +19,6 @@ import java.security.PrivilegedAction; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; import com.google.protobuf.BlockingService; @@ -189,11 +188,15 @@ public void serviceStop() { } @InterfaceAudience.Private - @VisibleForTesting InetSocketAddress getBindAddress() { return srvAddress.get(); } + @InterfaceAudience.Private + InetSocketAddress getManagementBindAddress() { + return mngAddress.get(); + } + private RPC.Server createServer(Class pbProtocol, InetSocketAddress addr, Configuration conf, int numHandlers, BlockingService blockingService) throws IOException { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index afb59c0..e4c622e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -20,9 +20,9 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hive.http.HttpServer; @@ -38,13 +38,14 @@ private boolean useSSL = false; private boolean useSPNEGO = false; - public LlapWebServices() { + public LlapWebServices(int port) { super("LlapWebServices"); + this.port = port; } @Override public void serviceInit(Configuration conf) { - this.port = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); + this.useSSL = HiveConf.getBoolVar(conf, ConfVars.LLAP_DAEMON_WEB_SSL); this.useSPNEGO = HiveConf.getBoolVar(conf, ConfVars.LLAP_WEB_AUTO_AUTH); String bindAddress = "0.0.0.0"; @@ -69,6 +70,11 @@ public void serviceInit(Configuration conf) { } } + @InterfaceAudience.Private + public int getPort() { + return this.http.getPort(); + } + @Override public void serviceStart() throws Exception { if (this.http != null) { @@ -76,6 +82,7 @@ public void serviceStart() throws Exception { } } + @Override public void serviceStop() throws Exception { if (this.http != null) { this.http.stop(); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index c920c24..9871702 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -14,12 +14,11 @@ package org.apache.hadoop.hive.llap.daemon; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Iterator; -import java.util.Map; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -41,47 +40,57 @@ private static final Logger LOG = LoggerFactory.getLogger(MiniLlapCluster.class); private final File testWorkDir; + private final String clusterNameTrimmed; + private final long numInstances; private final long execBytesPerService; private final boolean llapIoEnabled; private final boolean ioIsDirect; private final long ioBytesPerService; private final int numExecutorsPerService; + private final File zkWorkDir; private final String[] localDirs; private final Configuration clusterSpecificConfiguration = new Configuration(false); - private LlapDaemon llapDaemon; + private final LlapDaemon [] llapDaemons; + private MiniZooKeeperCluster miniZooKeeperCluster; + private final boolean ownZkCluster; - public static MiniLlapCluster create(String clusterName, int numExecutorsPerService, - long execBytePerService, boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, - int numLocalDirs) { - return new MiniLlapCluster(clusterName, numExecutorsPerService, execBytePerService, + + public static MiniLlapCluster create(String clusterName, + @Nullable MiniZooKeeperCluster miniZkCluster, + int numInstances, + int numExecutorsPerService, + long execBytePerService, boolean llapIoEnabled, + boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { + return new MiniLlapCluster(clusterName, miniZkCluster, numInstances, numExecutorsPerService, + execBytePerService, llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs); } - public static MiniLlapCluster createAndLaunch(Configuration conf, String clusterName, - int numExecutorsPerService, long execBytePerService, boolean llapIoEnabled, - boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) { - MiniLlapCluster miniLlapCluster = create(clusterName, numExecutorsPerService, - execBytePerService, llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs); - miniLlapCluster.init(conf); - miniLlapCluster.start(); - Configuration llapConf = miniLlapCluster.getClusterSpecificConfiguration(); - Iterator> confIter = llapConf.iterator(); - while (confIter.hasNext()) { - Map.Entry entry = confIter.next(); - conf.set(entry.getKey(), entry.getValue()); - } - return miniLlapCluster; + public static MiniLlapCluster create(String clusterName, + @Nullable MiniZooKeeperCluster miniZkCluster, + int numExecutorsPerService, + long execBytePerService, boolean llapIoEnabled, + boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { + return create(clusterName, miniZkCluster, 1, numExecutorsPerService, execBytePerService, + llapIoEnabled, + ioIsDirect, ioBytesPerService, numLocalDirs); } - // TODO Add support for multiple instances - private MiniLlapCluster(String clusterName, int numExecutorsPerService, long execMemoryPerService, - boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) { + private MiniLlapCluster(String clusterName, @Nullable MiniZooKeeperCluster miniZkCluster, + int numInstances, int numExecutorsPerService, long execMemoryPerService, + boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { super(clusterName + "_" + MiniLlapCluster.class.getSimpleName()); Preconditions.checkArgument(numExecutorsPerService > 0); Preconditions.checkArgument(execMemoryPerService > 0); Preconditions.checkArgument(numLocalDirs > 0); - String clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName(); + this.numInstances = numInstances; + + this.clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName(); + this.llapDaemons = new LlapDaemon[numInstances]; File targetWorkDir = new File("target", clusterNameTrimmed); try { FileContext.getLocalFSFileContext().delete( @@ -123,8 +132,18 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe this.testWorkDir = link; } else { + targetWorkDir.mkdir(); this.testWorkDir = targetWorkDir; } + if (miniZkCluster == null) { + ownZkCluster = true; + this.zkWorkDir = new File(testWorkDir, "mini-zk-cluster"); + zkWorkDir.mkdir(); + } else { + miniZooKeeperCluster = miniZkCluster; + ownZkCluster = false; + this.zkWorkDir = null; + } this.numExecutorsPerService = numExecutorsPerService; this.execBytesPerService = execMemoryPerService; this.ioIsDirect = ioIsDirect; @@ -142,54 +161,75 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe } @Override - public void serviceInit(Configuration conf) { + public void serviceInit(Configuration conf) throws IOException, InterruptedException { int rpcPort = 0; int mngPort = 0; int shufflePort = 0; + int webPort = 0; boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false); + LOG.info("MiniLlap configured to use ports from conf: {}", usePortsFromConf); if (usePortsFromConf) { rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT); mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT); shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); + webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); } - llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort); - llapDaemon.init(conf); + if (ownZkCluster) { + miniZooKeeperCluster = new MiniZooKeeperCluster(); + miniZooKeeperCluster.startup(zkWorkDir); + } else { + // Already setup in the create method + } + + conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed); + conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, "localhost"); + conf.setInt(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, miniZooKeeperCluster.getClientPort()); + + LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); + for (int i = 0 ;i < numInstances ; i++) { + llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, + ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort); + llapDaemons[i].init(new Configuration(conf)); + } + LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); } @Override public void serviceStart() { - llapDaemon.start(); - - clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, - getServiceAddress().getHostName()); - clusterSpecificConfiguration.setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname, - getServiceAddress().getPort()); - - clusterSpecificConfiguration.setInt( - ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, - numExecutorsPerService); - clusterSpecificConfiguration.setLong( - ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, execBytesPerService); + LOG.info("Starting {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); + for (int i = 0 ;i < numInstances ; i++) { + llapDaemons[i].start(); + } + LOG.info("Started {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); + // Optimize local fetch does not work with LLAP due to different local directories // used by containers and LLAP clusterSpecificConfiguration .setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); + clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed); } @Override - public void serviceStop() { - if (llapDaemon != null) { - llapDaemon.stop(); - llapDaemon = null; + public void serviceStop() throws IOException { + for (int i = 0 ; i < numInstances ; i++) { + if (llapDaemons[i] != null) { + llapDaemons[i].stop(); + llapDaemons[i] = null; + } + } + if (ownZkCluster) { + if (miniZooKeeperCluster != null) { + LOG.info("Stopping MiniZooKeeper cluster"); + miniZooKeeperCluster.shutdown(); + miniZooKeeperCluster = null; + LOG.info("Stopped MiniZooKeeper cluster"); + } + } else { + LOG.info("Not stopping MiniZK cluster since it is now owned by us"); } } - private InetSocketAddress getServiceAddress() { - Preconditions.checkState(getServiceState() == Service.STATE.STARTED); - return llapDaemon.getListenerAddress(); - } public Configuration getClusterSpecificConfiguration() { Preconditions.checkState(getServiceState() == Service.STATE.STARTED); @@ -198,7 +238,10 @@ public Configuration getClusterSpecificConfiguration() { // Mainly for verification public long getNumSubmissions() { - return llapDaemon.getNumSubmissions(); + int numSubmissions = 0; + for (int i = 0 ; i < numInstances ; i++) { + numSubmissions += llapDaemons[i].getNumSubmissions(); + } + return numSubmissions; } - } diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 9a3a31c..e028212 100644 --- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -370,8 +370,8 @@ public void setupConfiguration(Configuration conf) { */ @Override public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode, boolean isLlap) throws IOException { - return new MiniTezShim(conf, numberOfTaskTrackers, nameNode, isLlap); + String nameNode) throws IOException { + return new MiniTezShim(conf, numberOfTaskTrackers, nameNode); } /** @@ -381,11 +381,8 @@ public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers private final MiniTezCluster mr; private final Configuration conf; - private Class miniLlapKlass; - private Object miniLlapCluster; - public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode, - boolean isLlap) throws IOException { + public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode) throws IOException { mr = new MiniTezCluster("hive", numberOfTaskTrackers); conf.set("fs.defaultFS", nameNode); conf.set("tez.am.log.level", "DEBUG"); @@ -393,54 +390,6 @@ public MiniTezShim(Configuration conf, int numberOfTaskTrackers, String nameNode mr.init(conf); mr.start(); this.conf = mr.getConfig(); - if (isLlap) { - createAndLaunchLlapDaemon(this.conf); - } else { - miniLlapCluster = null; - } - } - - private void createAndLaunchLlapDaemon(final Configuration conf) - throws IOException { - try { - final String clusterName = "llap"; - Class llapDaemonKlass = - Class.forName("org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon", - false, ShimLoader.class.getClassLoader()); - Method totalMemMethod = llapDaemonKlass.getMethod("getTotalHeapSize"); - final long maxMemory = (long) totalMemMethod.invoke(null); - // 15% for io cache - final long memoryForCache = (long) (0.15f * maxMemory); - // 75% for executors - final long totalExecutorMemory = (long) (0.75f * maxMemory); - final int numExecutors = conf.getInt("llap.daemon.num.executors", 4); - final boolean asyncIOEnabled = true; - // enabling this will cause test failures in Mac OS X - final boolean directMemoryEnabled = false; - final int numLocalDirs = 1; - LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: " + memoryForCache - + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors - + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled - + " numLocalDirs: " + numLocalDirs); - - miniLlapKlass = Class.forName("org.apache.hadoop.hive.llap.daemon.MiniLlapCluster", - false, ShimLoader.class.getClassLoader()); - Method create = miniLlapKlass.getMethod("createAndLaunch", new Class[]{Configuration.class, - String.class, Integer.TYPE, Long.TYPE, Boolean.TYPE, Boolean.TYPE, - Long.TYPE, Integer.TYPE}); - miniLlapCluster = create.invoke(null, - conf, - clusterName, - numExecutors, - totalExecutorMemory, - asyncIOEnabled, - directMemoryEnabled, - memoryForCache, - numLocalDirs); - } catch (Exception e) { - LOG.error("Unable to create MiniLlapCluster. Exception: " + e.getMessage()); - throw new IOException(e); - } } @Override @@ -458,15 +407,6 @@ public int getJobTrackerPort() throws UnsupportedOperationException { @Override public void shutdown() throws IOException { mr.stop(); - - if (miniLlapKlass != null && miniLlapCluster != null) { - try { - Method stop = miniLlapKlass.getMethod("stop", new Class[]{}); - stop.invoke(miniLlapCluster); - } catch (Exception e) { - LOG.error("Unable to stop llap daemon. Exception: " + e.getMessage()); - } - } } @Override diff --git shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 7a5a9b5..a44d0c0 100644 --- shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -90,7 +90,7 @@ public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers, String nameNode, int numDir) throws IOException; public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers, - String nameNode, boolean isLlap) throws IOException; + String nameNode) throws IOException; public MiniMrShim getMiniSparkCluster(Configuration conf, int numberOfTaskTrackers, String nameNode, int numDir) throws IOException;