diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 6f533df..0fa62da 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -34,6 +34,13 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
@@ -71,6 +78,7 @@ import org.json.JSONObject;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class LlapServiceDriver {
 
@@ -152,12 +160,22 @@ static void populateConfWithLlapProperties(Configuration conf, Properties proper
       }
     }
   }
+
+  private static abstract class NamedCallable<T> implements Callable<T> {
+    public final String taskName;
+    public NamedCallable (String name) {
+      this.taskName = name;
+    }
+    public String getName() {
+      return taskName;
+    }
+  }
 
   private void run(String[] args) throws Exception {
     LlapOptionsProcessor optionsProcessor = new LlapOptionsProcessor();
-    LlapOptions options = optionsProcessor.processOptions(args);
+    final LlapOptions options = optionsProcessor.processOptions(args);
 
-    Properties propsDirectOptions = new Properties();
+    final Properties propsDirectOptions = new Properties();
 
     if (options == null) {
       // help
@@ -170,347 +188,419 @@ private void run(String[] args) throws Exception {
     if (conf == null) {
       throw new Exception("Cannot load any configuration to run command");
     }
+
+    final long t0 = System.nanoTime();
 
-    FileSystem fs = FileSystem.get(conf);
-    FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
-
-    // needed so that the file is actually loaded into configuration.
-    for (String f : NEEDED_CONFIGS) {
-      conf.addResource(f);
-      if (conf.getResource(f) == null) {
-        throw new Exception("Unable to find required config file: " + f);
-      }
-    }
-    for (String f : OPTIONAL_CONFIGS) {
-      conf.addResource(f);
-    }
-
-    conf.reloadConfiguration();
-
-    populateConfWithLlapProperties(conf, options.getConfig());
-
+    final FileSystem fs = FileSystem.get(conf);
+    final FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
 
-    if (options.getName() != null) {
-      // update service registry configs - caveat: this has nothing to do with the actual settings
-      // as read by the AM
-      // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between
-      // instances
-      conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
-          "@" + options.getName());
-    }
+    final ExecutorService executor =
+        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2,
+            new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
+    final CompletionService<Void> asyncRunner = new ExecutorCompletionService<Void>(executor);
 
-    if (options.getLogger() != null) {
-      HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
-    }
+    try {
 
-    if (options.getSize() != -1) {
-      if (options.getCache() != -1) {
-        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
-          // direct heap allocations need to be safer
-          Preconditions.checkArgument(options.getCache() < options.getSize(),
-              "Cache size (" + humanReadableByteCount(options.getCache()) + ") has to be smaller" +
-              " than the container sizing (" + humanReadableByteCount(options.getSize()) + ")");
-        } else if (options.getCache() < options.getSize()) {
-          LOG.warn("Note that this might need YARN physical memory monitoring to be turned off "
-              + "(yarn.nodemanager.pmem-check-enabled=false)");
+      // needed so that the file is actually loaded into configuration.
+      for (String f : NEEDED_CONFIGS) {
+        conf.addResource(f);
+        if (conf.getResource(f) == null) {
+          throw new Exception("Unable to find required config file: " + f);
         }
       }
-      if (options.getXmx() != -1) {
-        Preconditions.checkArgument(options.getXmx() < options.getSize(),
-            "Working memory (Xmx=" + humanReadableByteCount(options.getXmx()) + ") has to be" +
-            " smaller than the container sizing (" +
-            humanReadableByteCount(options.getSize()) + ")");
-      }
-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
-          && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
-        // direct and not memory mapped
-        Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
-            "Working memory + cache (Xmx="+ humanReadableByteCount(options.getXmx()) +
-            " + cache=" + humanReadableByteCount(options.getCache()) + ")"
-            + " has to be smaller than the container sizing (" +
-            humanReadableByteCount(options.getSize()) + ")");
+      for (String f : OPTIONAL_CONFIGS) {
+        conf.addResource(f);
       }
-    }
-    // This parameter is read in package.py - and nowhere else. Does not need to be part of HiveConf - that's just confusing.
-    final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
-    long containerSize = -1;
-    if (options.getSize() != -1) {
-      containerSize = options.getSize() / (1024 * 1024);
-      Preconditions.checkArgument(containerSize >= minAlloc,
-          "Container size (" + humanReadableByteCount(options.getSize()) + ") should be greater" +
-          " than minimum allocation(" + humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
-      conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, String.valueOf(containerSize));
-    }
+      conf.reloadConfiguration();
 
-    if (options.getExecutors() != -1) {
-      conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, String.valueOf(options.getExecutors()));
-      // TODO: vcpu settings - possibly when DRFA works right
-    }
+      populateConfWithLlapProperties(conf, options.getConfig());
 
-    if (options.getIoThreads() != -1) {
-      conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, options.getIoThreads());
-      propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
-          String.valueOf(options.getIoThreads()));
-    }
+      if (options.getName() != null) {
+        // update service registry configs - caveat: this has nothing to do with the actual settings
+        // as read by the AM
+        // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between
+        // instances
+        conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
+            "@" + options.getName());
+      }
 
-    long cache = -1, xmx = -1;
-    if (options.getCache() != -1) {
-      cache = options.getCache();
-      conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
-      propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
-          Long.toString(cache));
-    }
+      if (options.getLogger() != null) {
+        HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
+      }
 
-    if (options.getXmx() != -1) {
-      // Needs more explanation here
-      // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
-      // from this, to get actual usable memory before it goes into GC
-      xmx = options.getXmx();
-      long xmxMb = (long)(xmx / (1024 * 1024));
-      conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
-          String.valueOf(xmxMb));
-    }
+      if (options.getSize() != -1) {
+        if (options.getCache() != -1) {
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
+            // direct heap allocations need to be safer
+            Preconditions.checkArgument(options.getCache() < options.getSize(), "Cache size ("
+                + humanReadableByteCount(options.getCache()) + ") has to be smaller"
+                + " than the container sizing (" + humanReadableByteCount(options.getSize()) + ")");
+          } else if (options.getCache() < options.getSize()) {
+            LOG.warn("Note that this might need YARN physical memory monitoring to be turned off "
+                + "(yarn.nodemanager.pmem-check-enabled=false)");
+          }
+        }
+        if (options.getXmx() != -1) {
+          Preconditions.checkArgument(options.getXmx() < options.getSize(), "Working memory (Xmx="
+              + humanReadableByteCount(options.getXmx()) + ") has to be"
+              + " smaller than the container sizing (" + humanReadableByteCount(options.getSize())
+              + ")");
+        }
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
+            && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+          // direct and not memory mapped
+          Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
+              "Working memory + cache (Xmx=" + humanReadableByteCount(options.getXmx())
+                  + " + cache=" + humanReadableByteCount(options.getCache()) + ")"
+                  + " has to be smaller than the container sizing ("
+                  + humanReadableByteCount(options.getSize()) + ")");
+        }
+      }
 
-    if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
-      conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
-      propsDirectOptions
-          .setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
-    }
 
+      // This parameter is read in package.py - and nowhere else. Does not need to be part of
+      // HiveConf - that's just confusing.
+      final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
+      long containerSize = -1;
+      if (options.getSize() != -1) {
+        containerSize = options.getSize() / (1024 * 1024);
+        Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
+            + humanReadableByteCount(options.getSize()) + ") should be greater"
+            + " than minimum allocation(" + humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
+        conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+            String.valueOf(containerSize));
+      }
 
-    URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
+      if (options.getExecutors() != -1) {
+        conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
+            String.valueOf(options.getExecutors()));
+        // TODO: vcpu settings - possibly when DRFA works right
+      }
 
-    if (null == logger) {
-      throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
-    }
+      if (options.getIoThreads() != -1) {
+        conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, options.getIoThreads());
+        propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
+            String.valueOf(options.getIoThreads()));
+      }
 
-    Path home = new Path(System.getenv("HIVE_HOME"));
-    Path scripts = new Path(new Path(new Path(home, "scripts"), "llap"), "bin");
+      long cache = -1, xmx = -1;
+      if (options.getCache() != -1) {
+        cache = options.getCache();
+        conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
+        propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
+            Long.toString(cache));
+      }
 
-    if (!lfs.exists(home)) {
-      throw new Exception("Unable to find HIVE_HOME:" + home);
-    } else if (!lfs.exists(scripts)) {
-      LOG.warn("Unable to find llap scripts:" + scripts);
-    }
 
+      if (options.getXmx() != -1) {
+        // Needs more explanation here
+        // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
+        // from this, to get actual usable memory before it goes into GC
+        xmx = options.getXmx();
+        long xmxMb = (long) (xmx / (1024 * 1024));
+        conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+            String.valueOf(xmxMb));
+      }
 
+      if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
+        conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
+            options.getLlapQueueName());
+      }
 
-    Path libDir = new Path(tmpDir, "lib");
-    Path tezDir = new Path(libDir, "tez");
-    Path udfDir = new Path(libDir, "udfs");
+      final URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
 
-    String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
-    if (tezLibs == null) {
-      LOG.warn("Missing tez.lib.uris in tez-site.xml");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Copying tez libs from " + tezLibs);
-    }
-    lfs.mkdirs(tezDir);
-    fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
-    CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(), true);
-    lfs.delete(new Path(libDir, "tez.tar.gz"), false);
-
-    Class<?>[] dependencies = new Class<?>[] {
-        LlapDaemonProtocolProtos.class, // llap-common
-        LlapTezUtils.class, // llap-tez
-        LlapInputFormat.class, // llap-server
-        HiveInputFormat.class, // hive-exec
-        SslSocketConnector.class, // hive-common (https deps)
-        RegistryUtils.ServiceRecordMarshal.class, // ZK registry
-        // log4j2
-        com.lmax.disruptor.RingBuffer.class, // disruptor
-        org.apache.logging.log4j.Logger.class, // log4j-api
-        org.apache.logging.log4j.core.Appender.class, // log4j-core
-        org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
-        // log4j-1.2-API needed for NDC
-        org.apache.log4j.NDC.class,
-    };
-
-    for (Class<?> c : dependencies) {
-      Path jarPath = new Path(Utilities.jarFinderGetJar(c));
-      lfs.copyFromLocalFile(jarPath, libDir);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Copying " + jarPath + " to " + libDir);
+      if (null == logger) {
+        throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
       }
-    }
-
-    // copy default aux classes (json/hbase)
 
+      Path home = new Path(System.getenv("HIVE_HOME"));
+      Path scripts = new Path(new Path(new Path(home, "scripts"), "llap"), "bin");
 
-    for (String className : DEFAULT_AUX_CLASSES) {
-      localizeJarForClass(lfs, libDir, className, false);
-    }
-    Collection<String> codecs = conf.getStringCollection("io.compression.codecs");
-    if (codecs != null) {
-      for (String codecClassName : codecs) {
-        localizeJarForClass(lfs, libDir, codecClassName, false);
+      if (!lfs.exists(home)) {
+        throw new Exception("Unable to find HIVE_HOME:" + home);
+      } else if (!lfs.exists(scripts)) {
+        LOG.warn("Unable to find llap scripts:" + scripts);
       }
-    }
 
-    if (options.getIsHBase()) {
-      try {
-        localizeJarForClass(lfs, libDir, HBASE_SERDE_CLASS, true);
-        Job fakeJob = new Job(new JobConf()); // HBase API is convoluted.
-        TableMapReduceUtil.addDependencyJars(fakeJob);
-        Collection<String> hbaseJars = fakeJob.getConfiguration().getStringCollection("tmpjars");
-        for (String jarPath : hbaseJars) {
-          if (!jarPath.isEmpty()) {
-            lfs.copyFromLocalFile(new Path(jarPath), libDir);
+      final Path libDir = new Path(tmpDir, "lib");
+      final Path tezDir = new Path(libDir, "tez");
+      final Path udfDir = new Path(libDir, "udfs");
+      final Path confPath = new Path(tmpDir, "conf");
+      lfs.mkdirs(confPath);
+
+      NamedCallable<Void> downloadTez = new NamedCallable<Void>("downloadTez") {
+        @Override
+        public Void call() throws Exception {
+          synchronized (fs) {
+            String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
+            if (tezLibs == null) {
+              LOG.warn("Missing tez.lib.uris in tez-site.xml");
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Copying tez libs from " + tezLibs);
+            }
+            lfs.mkdirs(tezDir);
+            fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
+            CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(),
+                true);
+            lfs.delete(new Path(libDir, "tez.tar.gz"), false);
           }
+          return null;
         }
-      } catch (Throwable t) {
-        String err = "Failed to add HBase jars. Use --auxhbase=false to avoid localizing them";
-        LOG.error(err);
-        System.err.println(err);
-        throw new RuntimeException(t);
-      }
-    }
-
-    String auxJars = options.getAuxJars();
-    if (auxJars != null && !auxJars.isEmpty()) {
-      // TODO: transitive dependencies warning?
-      String[] jarPaths = auxJars.split(",");
-      for (String jarPath : jarPaths) {
-        if (!jarPath.isEmpty()) {
-          lfs.copyFromLocalFile(new Path(jarPath), libDir);
+      };
+
+      NamedCallable<Void> copyLocalJars = new NamedCallable<Void>("copyLocalJars") {
+        @Override
+        public Void call() throws Exception {
+          Class<?>[] dependencies = new Class<?>[] { LlapDaemonProtocolProtos.class, // llap-common
+              LlapTezUtils.class, // llap-tez
+              LlapInputFormat.class, // llap-server
+              HiveInputFormat.class, // hive-exec
+              SslSocketConnector.class, // hive-common (https deps)
+              RegistryUtils.ServiceRecordMarshal.class, // ZK registry
+              // log4j2
+              com.lmax.disruptor.RingBuffer.class, // disruptor
+              org.apache.logging.log4j.Logger.class, // log4j-api
+              org.apache.logging.log4j.core.Appender.class, // log4j-core
+              org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
+              // log4j-1.2-API needed for NDC
+              org.apache.log4j.NDC.class, };
+
+          for (Class<?> c : dependencies) {
+            Path jarPath = new Path(Utilities.jarFinderGetJar(c));
+            lfs.copyFromLocalFile(jarPath, libDir);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Copying " + jarPath + " to " + libDir);
+            }
+          }
+          return null;
         }
-      }
-    }
+      };
 
-    // UDFs
-    final Set<String> allowedUdfs;
-
-    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
-      allowedUdfs = downloadPermanentFunctions(conf, udfDir);
-    } else {
-      allowedUdfs = Collections.emptySet();
-    }
+      // copy default aux classes (json/hbase)
 
-    String java_home;
-    if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
-      java_home = System.getenv("JAVA_HOME");
-      String jre_home = System.getProperty("java.home");
-      if (java_home == null) {
-        java_home = jre_home;
-      } else if (!java_home.equals(jre_home)) {
-        LOG.warn("Java versions might not match : JAVA_HOME=[{}],process jre=[{}]",
-            java_home, jre_home);
-      }
-    } else {
-      java_home = options.getJavaPath();
-    }
-    if (java_home == null || java_home.isEmpty()) {
-      throw new RuntimeException(
-          "Could not determine JAVA_HOME from command line parameters, environment or system properties");
-    }
-    LOG.info("Using [{}] for JAVA_HOME", java_home);
+      NamedCallable<Void> copyAuxJars = new NamedCallable<Void>("copyAuxJars") {
+        @Override
+        public Void call() throws Exception {
+          for (String className : DEFAULT_AUX_CLASSES) {
+            localizeJarForClass(lfs, libDir, className, false);
+          }
+          Collection<String> codecs = conf.getStringCollection("io.compression.codecs");
+          if (codecs != null) {
+            for (String codecClassName : codecs) {
+              localizeJarForClass(lfs, libDir, codecClassName, false);
+            }
+          }
 
-    Path confPath = new Path(tmpDir, "conf");
-    lfs.mkdirs(confPath);
+          if (options.getIsHBase()) {
+            try {
+              localizeJarForClass(lfs, libDir, HBASE_SERDE_CLASS, true);
+              Job fakeJob = new Job(new JobConf()); // HBase API is convoluted.
+              TableMapReduceUtil.addDependencyJars(fakeJob);
+              Collection<String> hbaseJars =
+                  fakeJob.getConfiguration().getStringCollection("tmpjars");
+              for (String jarPath : hbaseJars) {
+                if (!jarPath.isEmpty()) {
+                  lfs.copyFromLocalFile(new Path(jarPath), libDir);
+                }
+              }
+            } catch (Throwable t) {
+              String err =
+                  "Failed to add HBase jars. Use --auxhbase=false to avoid localizing them";
+              LOG.error(err);
+              System.err.println(err);
+              throw new RuntimeException(t);
+            }
+          }
 
-    // Copy over the mandatory configs for the package.
-    for (String f : NEEDED_CONFIGS) {
-      copyConfig(lfs, confPath, f);
-    }
-    for (String f : OPTIONAL_CONFIGS) {
-      try {
-        copyConfig(lfs, confPath, f);
-      } catch (Throwable t) {
-        LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
-      }
-    }
-    createLlapDaemonConfig(lfs, confPath, conf, propsDirectOptions, options.getConfig());
-
-    // logger can be a resource stream or a real file (cannot use copy)
-    InputStream loggerContent = logger.openStream();
-    IOUtils.copyBytes(loggerContent,
-        lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true);
-
-    String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
-    URL metrics2 = conf.getResource(metricsFile);
-    if (metrics2 == null) {
-      LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found." +
-          " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
-      metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
-      metrics2 = conf.getResource(metricsFile);
-    }
-    if (metrics2 != null) {
-      InputStream metrics2FileStream = metrics2.openStream();
-      IOUtils.copyBytes(metrics2FileStream, lfs.create(new Path(confPath, metricsFile), true),
-          conf, true);
-      LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
-    } else {
-      LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " +
-          LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
-    }
 
+          String auxJars = options.getAuxJars();
+          if (auxJars != null && !auxJars.isEmpty()) {
+            // TODO: transitive dependencies warning?
+            String[] jarPaths = auxJars.split(",");
+            for (String jarPath : jarPaths) {
+              if (!jarPath.isEmpty()) {
+                lfs.copyFromLocalFile(new Path(jarPath), libDir);
+              }
+            }
+          }
+          return null;
+        }
+      };
+
+      NamedCallable<Void> copyUdfJars = new NamedCallable<Void>("copyUdfJars") {
+        @Override
+        public Void call() throws Exception {
+          // UDFs
+          final Set<String> allowedUdfs;
+
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
+            synchronized (fs) {
+              allowedUdfs = downloadPermanentFunctions(conf, udfDir);
+            }
+          } else {
+            allowedUdfs = Collections.emptySet();
+          }
 
-    PrintWriter udfStream =
-        new PrintWriter(lfs.create(new Path(confPath, StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
-    for (String udfClass : allowedUdfs) {
-      udfStream.println(udfClass);
-    }
-
-    udfStream.close();
+          PrintWriter udfStream =
+              new PrintWriter(lfs.create(new Path(confPath,
+                  StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
+          for (String udfClass : allowedUdfs) {
+            udfStream.println(udfClass);
+          }
+
+          udfStream.close();
+          return null;
+        }
+      };
+
+      String java_home;
+      if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
+        java_home = System.getenv("JAVA_HOME");
+        String jre_home = System.getProperty("java.home");
+        if (java_home == null) {
+          java_home = jre_home;
+        } else if (!java_home.equals(jre_home)) {
+          LOG.warn("Java versions might not match : JAVA_HOME=[{}],process jre=[{}]", java_home,
+              jre_home);
+        }
+      } else {
+        java_home = options.getJavaPath();
+      }
+      if (java_home == null || java_home.isEmpty()) {
+        throw new RuntimeException(
+            "Could not determine JAVA_HOME from command line parameters, environment or system properties");
+      }
+      LOG.info("Using [{}] for JAVA_HOME", java_home);
+
+      NamedCallable<Void> copyConfigs = new NamedCallable<Void>("copyConfigs") {
+        @Override
+        public Void call() throws Exception {
+          // Copy over the mandatory configs for the package.
+          for (String f : NEEDED_CONFIGS) {
+            copyConfig(lfs, confPath, f);
+          }
+          for (String f : OPTIONAL_CONFIGS) {
+            try {
+              copyConfig(lfs, confPath, f);
+            } catch (Throwable t) {
+              LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
+            }
+          }
+          createLlapDaemonConfig(lfs, confPath, conf, propsDirectOptions, options.getConfig());
+
+          // logger can be a resource stream or a real file (cannot use copy)
+          InputStream loggerContent = logger.openStream();
+          IOUtils.copyBytes(loggerContent,
+              lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true);
+
+          String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
+          URL metrics2 = conf.getResource(metricsFile);
+          if (metrics2 == null) {
+            LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found."
+ + " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE); + metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE; + metrics2 = conf.getResource(metricsFile); + } + if (metrics2 != null) { + InputStream metrics2FileStream = metrics2.openStream(); + IOUtils.copyBytes(metrics2FileStream, + lfs.create(new Path(confPath, metricsFile), true), conf, true); + LOG.info("Copied hadoop metrics2 properties file from " + metrics2); + } else { + LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " + + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath."); + } + return null; + } + }; + + @SuppressWarnings("unchecked") + final NamedCallable[] asyncWork = + new NamedCallable[] { + downloadTez, + copyUdfJars, + copyLocalJars, + copyAuxJars, + copyConfigs }; + @SuppressWarnings("unchecked") + final Future[] asyncResults = new Future[asyncWork.length]; + for (int i = 0; i < asyncWork.length; i++) { + asyncResults[i] = asyncRunner.submit(asyncWork[i]); + } - // extract configs for processing by the python fragments in Slider - JSONObject configs = new JSONObject(); + // extract configs for processing by the python fragments in Slider + JSONObject configs = new JSONObject(); - configs.put("java.home", java_home); + configs.put("java.home", java_home); - configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf, - ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB)); - configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize); + configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, + HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB)); + configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize); - configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, - HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE)); + configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, + HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE)); - configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname, - HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)); + configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname, + HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)); - configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf, - ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); + configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, + HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); - configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname, HiveConf.getIntVar(conf, - ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE)); + configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname, + HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE)); - configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, HiveConf.getIntVar(conf, - ConfVars.LLAP_DAEMON_NUM_EXECUTORS)); + configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, + HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS)); - // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line - if (HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) { - configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, - HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME)); - } + // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line + if (HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) { + 
+        configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
+            HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
+      }
 
-    // Propagate the cluster name to the script.
-    String clusterHosts = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
-    if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@") &&
-        clusterHosts.length() > 1) {
-      configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
-    }
+      // Propagate the cluster name to the script.
+      String clusterHosts = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+      if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@")
+          && clusterHosts.length() > 1) {
+        configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
+      }
 
-    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
+      configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
 
-    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
+      configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+          conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
 
-    long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long)(cache * 1.25) : -1;
-    configs.put("max_direct_memory", Long.toString(maxDirect));
+      long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long) (cache * 1.25) : -1;
+      configs.put("max_direct_memory", Long.toString(maxDirect));
 
-    FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
-    OutputStreamWriter w = new OutputStreamWriter(os);
-    configs.write(w);
-    w.close();
-    os.close();
+      FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
+      OutputStreamWriter w = new OutputStreamWriter(os);
+      configs.write(w);
+      w.close();
+      os.close();
 
-    lfs.close();
-    fs.close();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Config generation took " + (System.nanoTime() - t0) + " ns");
+      }
+      for (int i = 0; i < asyncWork.length; i++) {
+        final long t1 = System.nanoTime();
+        asyncResults[i].get();
+        final long t2 = System.nanoTime();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(asyncWork[i].getName() + " waited for " + (t2 - t1) + " ns");
+        }
+      }
+    } finally {
+      executor.shutdown();
+      lfs.close();
+      fs.close();
+    }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Exiting successfully");
@@ -520,7 +610,12 @@ private void run(String[] args) throws Exception {
   private Set<String> downloadPermanentFunctions(Configuration conf, Path udfDir) throws HiveException,
       URISyntaxException, IOException {
     Map<String, String> udfs = new HashMap<String, String>();
-    Hive hive = Hive.get(false);
+    HiveConf hiveConf = new HiveConf();
+    // disable expensive operations on the metastore
+    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_INIT_METADATA_COUNT_ENABLED, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, false);
+    // performance problem: ObjectStore does its own new HiveConf()
+    Hive hive = Hive.getWithFastCheck(hiveConf, false);
     ResourceDownloader resourceDownloader =
         new ResourceDownloader(conf, udfDir.toUri().normalize().getPath());
    List<Function> fns = hive.getAllFunctions();
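The core pattern the patch introduces (a named Callable submitted to an ExecutorCompletionService, with the driver later blocking on each Future in submission order) can be exercised in isolation. The following is a minimal standalone sketch of that pattern, not code from the patch; the class name ParallelPackagingSketch, the placeholder sleeps, and the Math.max(1, ...) guard are illustrative assumptions only.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelPackagingSketch {

  /** Stand-in for the patch's NamedCallable: a Callable that remembers its task name. */
  abstract static class NamedCallable<T> implements Callable<T> {
    final String taskName;
    NamedCallable(String name) { this.taskName = name; }
    String getName() { return taskName; }
  }

  public static void main(String[] args) throws Exception {
    // Bounded pool, sized like the patch's pool (guarded here against single-core machines).
    ExecutorService executor = Executors.newFixedThreadPool(
        Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
    CompletionService<Void> asyncRunner = new ExecutorCompletionService<>(executor);
    try {
      List<NamedCallable<Void>> asyncWork = new ArrayList<>();
      asyncWork.add(new NamedCallable<Void>("downloadTez") {
        @Override public Void call() throws Exception {
          Thread.sleep(100); // placeholder for the real download work
          return null;
        }
      });
      asyncWork.add(new NamedCallable<Void>("copyConfigs") {
        @Override public Void call() throws Exception {
          Thread.sleep(50); // placeholder for the real config copying
          return null;
        }
      });

      // Submit everything, let the main thread keep working, then block on each result.
      List<Future<Void>> asyncResults = new ArrayList<>();
      for (NamedCallable<Void> task : asyncWork) {
        asyncResults.add(asyncRunner.submit(task));
      }
      for (int i = 0; i < asyncWork.size(); i++) {
        long t1 = System.nanoTime();
        asyncResults.get(i).get(); // surfaces any task failure as an ExecutionException
        System.out.println(asyncWork.get(i).getName() + " waited for "
            + (System.nanoTime() - t1) + " ns");
      }
    } finally {
      executor.shutdown();
    }
  }
}

The patch above follows the same shape: run() keeps generating config.json on the calling thread while the five packaging tasks run in the pool, then waits on each Future in order, so a failure in any task propagates out of run() and the finally block shuts the pool down and closes the file systems.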