diff --git a/bin/ext/llap.sh b/bin/ext/llap.sh
index 91a54b3..3eb1573 100644
--- a/bin/ext/llap.sh
+++ b/bin/ext/llap.sh
@@ -18,7 +18,7 @@ export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
 
 llap () {
   TMPDIR=$(mktemp -d /tmp/staging-yarn-XXXXXX)
-  CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver;
+  CLASS=org.apache.hadoop.hive.llap.cli.service.LlapServiceDriver;
   if [ ! -f ${HIVE_LIB}/hive-cli-*.jar ]; then
     echo "Missing Hive CLI Jar"
     exit 3;
@@ -44,7 +44,7 @@ llap () {
 }
 
 llap_help () {
-  CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver;
+  CLASS=org.apache.hadoop.hive.llap.cli.service.LlapServiceDriver;
   execHiveCmd $CLASS "--help"
 }
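Both hunks above only repoint the launcher at the relocated driver class. As a reference for where that symbol now lives, here is a minimal sketch of the relocated entry point, assuming it keeps the deleted driver's main()/run() contract; the new class body is not part of this diff, so the run() stub below is illustrative only:

    // Sketch only: assumes the relocated driver keeps the deleted class's entry-point shape.
    package org.apache.hadoop.hive.llap.cli.service;

    public class LlapServiceDriver {
      public static void main(String[] args) throws Exception {
        int ret;
        try {
          ret = new LlapServiceDriver().run(args);
        } catch (Throwable t) {
          t.printStackTrace();
          ret = 3; // mirrors the "exit 3" convention used by bin/ext/llap.sh
        }
        System.exit(ret);
      }

      private int run(String[] args) throws Exception {
        return 0; // placeholder; the real packaging work is split across the AsyncTask* classes below
      }
    }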
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
deleted file mode 100644
index 2445075..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cli;
-
-import com.google.common.base.Preconditions;
-import jline.TerminalFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import javax.annotation.Nonnull;
-
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.log.LogHelpers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.util.StringUtils;
-
-public class LlapOptionsProcessor {
-
-  public static final String OPTION_INSTANCES = "instances"; //forward as arg
-  public static final String OPTION_NAME = "name"; // forward as arg
-  public static final String OPTION_DIRECTORY = "directory"; // work-dir
-  public static final String OPTION_EXECUTORS = "executors"; // llap-daemon-site
-  public static final String OPTION_CACHE = "cache"; // llap-daemon-site
-  public static final String OPTION_SIZE = "size"; // forward via config.json
-  public static final String OPTION_XMX = "xmx"; // forward as arg
-  public static final String OPTION_AUXJARS = "auxjars"; // used to localize jars
-  public static final String OPTION_AUXHIVE = "auxhive"; // used to localize jars
-  public static final String OPTION_AUXHBASE = "auxhbase"; // used to localize jars
-  public static final String OPTION_JAVA_HOME = "javaHome"; // forward via config.json
-  public static final String OPTION_HIVECONF = "hiveconf"; // llap-daemon-site if relevant parameter
-  public static final String OPTION_SERVICE_AM_CONTAINER_MB = "service-am-container-mb"; // forward as arg
-  public static final String OPTION_SERVICE_APPCONFIG_GLOBAL = "service-appconfig-global"; // forward as arg
-  public static final String OPTION_LLAP_QUEUE = "queue"; // forward via config.json
-  public static final String OPTION_IO_THREADS = "iothreads"; // llap-daemon-site
-
-  // Options for the python script that are here because our option parser cannot ignore the unknown ones
-  public static final String OPTION_ARGS = "args"; // forward as arg
-  public static final String OPTION_LOGLEVEL = "loglevel"; // forward as arg
-  public static final String OPTION_LOGGER = "logger"; // forward as arg
-  public static final String OPTION_SERVICE_KEYTAB_DIR = "service-keytab-dir";
-  public static final String OPTION_SERVICE_KEYTAB = "service-keytab";
-  public static final String OPTION_SERVICE_PRINCIPAL = "service-principal";
-  public static final String OPTION_SERVICE_PLACEMENT = "service-placement";
-  public static final String OPTION_SERVICE_DEFAULT_KEYTAB = "service-default-keytab";
-  public static final String OPTION_OUTPUT_DIR = "output";
-  public static final String OPTION_START = "startImmediately";
-  public static final String OPTION_HEALTH_PERCENT = "health-percent";
-  public static final String OPTION_HEALTH_TIME_WINDOW_SECS = "health-time-window-secs";
-  public static final String OPTION_HEALTH_INIT_DELAY_SECS = "health-init-delay-secs";
-
-  public static class LlapOptions {
-    private final int instances;
-    private final String directory;
-    private final String name;
-    private final int executors;
-    private final int ioThreads;
-    private final long cache;
-    private final long size;
-    private final long xmx;
-    private final String jars;
-    private final boolean isHbase;
-    private final Properties conf;
-    private final String javaPath;
-    private final String llapQueueName;
-    private final String logger;
-    private final boolean isStarting;
-    private final String output;
-    private final boolean isHiveAux;
-
-    public LlapOptions(String name, int instances, String directory, int executors, int ioThreads,
-        long cache, long size, long xmx, String jars, boolean isHbase,
-        @Nonnull Properties hiveconf, String javaPath, String llapQueueName, String logger,
-        boolean isStarting, String output, boolean isHiveAux) throws ParseException {
-      if (instances <= 0) {
-        throw new ParseException("Invalid configuration: " + instances
-            + " (should be greater than 0)");
-      }
-      this.instances = instances;
-      this.directory = directory;
-      this.name = name;
-      this.executors = executors;
-      this.ioThreads = ioThreads;
-      this.cache = cache;
-      this.size = size;
-      this.xmx = xmx;
-      this.jars = jars;
-      this.isHbase = isHbase;
-      this.isHiveAux = isHiveAux;
-      this.conf = hiveconf;
-      this.javaPath = javaPath;
-      this.llapQueueName = llapQueueName;
-      this.logger = logger;
-      this.isStarting = isStarting;
-      this.output = output;
-    }
-
-    public String getOutput() {
-      return output;
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public int getInstances() {
-      return instances;
-    }
-
-    public String getDirectory() {
-      return directory;
-    }
-
-    public int getExecutors() {
-      return executors;
-    }
-
-    public int getIoThreads() {
-      return ioThreads;
-    }
-
-    public long getCache() {
-      return cache;
-    }
-
-    public long getSize() {
-      return size;
-    }
-
-    public long getXmx() {
-      return xmx;
-    }
-
-    public String getAuxJars() {
-      return jars;
-    }
-
-    public boolean getIsHBase() {
-      return isHbase;
-    }
-
-    public boolean getIsHiveAux() {
-      return isHiveAux;
-    }
-
-    public Properties getConfig() {
-      return conf;
-    }
-
-    public String getJavaPath() {
-      return javaPath;
-    }
-
-    public String getLlapQueueName() {
-      return llapQueueName;
-    }
-
-    public String getLogger() {
-      return logger;
-    }
-
-    public boolean isStarting() {
-      return isStarting;
-    }
-  }
-
-  protected static final Logger l4j = LoggerFactory.getLogger(LlapOptionsProcessor.class.getName());
-  private final Options options = new Options();
-  private org.apache.commons.cli.CommandLine commandLine;
-
-  @SuppressWarnings("static-access")
-  public LlapOptionsProcessor() {
-
-    // set the number of instances on which llap should run
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_INSTANCES).withLongOpt(OPTION_INSTANCES)
-        .withDescription("Specify the number of instances to run this on").create('i'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_NAME).withLongOpt(OPTION_NAME)
-        .withDescription("Cluster name for YARN registry").create('n'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_DIRECTORY).withLongOpt(OPTION_DIRECTORY)
-        .withDescription("Temp directory for jars etc.").create('d'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_ARGS).withLongOpt(OPTION_ARGS)
-        .withDescription("java arguments to the llap instance").create('a'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_LOGLEVEL).withLongOpt(OPTION_LOGLEVEL)
-        .withDescription("log levels for the llap instance").create('l'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_LOGGER).withLongOpt(OPTION_LOGGER)
-        .withDescription(
-            "logger for llap instance ([" + LogHelpers.LLAP_LOGGER_NAME_RFA + "], " +
-                LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING + ", " + LogHelpers.LLAP_LOGGER_NAME_CONSOLE)
-        .create());
-
-    options.addOption(OptionBuilder.hasArg(false).withArgName(OPTION_SERVICE_DEFAULT_KEYTAB).withLongOpt(OPTION_SERVICE_DEFAULT_KEYTAB)
-        .withDescription("try to set default settings for Service AM keytab; mostly for dev testing").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_KEYTAB_DIR).withLongOpt(OPTION_SERVICE_KEYTAB_DIR)
-        .withDescription("Service AM keytab directory on HDFS (where the headless user keytab is stored by Service keytab installation, e.g. .yarn/keytabs/llap)").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_KEYTAB).withLongOpt(OPTION_SERVICE_KEYTAB)
-        .withDescription("Service AM keytab file name inside " + OPTION_SERVICE_KEYTAB_DIR).create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_PRINCIPAL).withLongOpt(OPTION_SERVICE_PRINCIPAL)
-        .withDescription("Service AM principal; should be the user running the cluster, e.g. hive@EXAMPLE.COM").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_PLACEMENT).withLongOpt(OPTION_SERVICE_PLACEMENT)
-        .withDescription("Service placement policy; see YARN documentation at https://issues.apache.org/jira/browse/YARN-1042."
-            + " This is unnecessary if LLAP is going to take more than half of the YARN capacity of a node.").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_HEALTH_PERCENT).withLongOpt(OPTION_HEALTH_PERCENT)
-        .withDescription("Percentage of running containers after which LLAP application is considered healthy" +
-            " (Default: 80)").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_HEALTH_INIT_DELAY_SECS)
-        .withLongOpt(OPTION_HEALTH_INIT_DELAY_SECS)
-        .withDescription("Delay in seconds after which health percentage is monitored (Default: 400)").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_HEALTH_TIME_WINDOW_SECS)
-        .withLongOpt(OPTION_HEALTH_TIME_WINDOW_SECS)
-        .withDescription("Time window in seconds (after initial delay) for which LLAP application is allowed to be in " +
-            "unhealthy state before being killed (Default: 300)").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_EXECUTORS).withLongOpt(OPTION_EXECUTORS)
-        .withDescription("executor per instance").create('e'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_CACHE).withLongOpt(OPTION_CACHE)
-        .withDescription("cache size per instance").create('c'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SIZE).withLongOpt(OPTION_SIZE)
-        .withDescription("container size per instance").create('s'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_XMX).withLongOpt(OPTION_XMX)
-        .withDescription("working memory size").create('w'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_LLAP_QUEUE)
-        .withLongOpt(OPTION_LLAP_QUEUE)
-        .withDescription("The queue within which LLAP will be started").create('q'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_OUTPUT_DIR)
-        .withLongOpt(OPTION_OUTPUT_DIR)
-        .withDescription("Output directory for the generated scripts").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_AUXJARS).withLongOpt(OPTION_AUXJARS)
-        .withDescription("additional jars to package (by default, JSON SerDe jar is packaged"
-            + " if available)").create('j'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_AUXHBASE).withLongOpt(OPTION_AUXHBASE)
-        .withDescription("whether to package the HBase jars (true by default)").create('h'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_AUXHIVE).withLongOpt(OPTION_AUXHIVE)
-        .withDescription("whether to package the Hive aux jars (true by default)").create(OPTION_AUXHIVE));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_JAVA_HOME).withLongOpt(OPTION_JAVA_HOME)
-        .withDescription(
-            "Path to the JRE/JDK. This should be installed at the same location on all cluster nodes ($JAVA_HOME, java.home by default)")
-        .create());
-
-    // -hiveconf x=y
-    options.addOption(OptionBuilder.withValueSeparator().hasArgs(2).withArgName("property=value")
-        .withLongOpt(OPTION_HIVECONF)
-        .withDescription("Use value for given property. Overridden by explicit parameters")
-        .create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName("b")
-        .withLongOpt(OPTION_SERVICE_AM_CONTAINER_MB)
-        .withDescription("The size of the service AppMaster container in MB").create('b'));
-
-    options.addOption(OptionBuilder.withValueSeparator().hasArgs(2).withArgName("property=value")
-        .withLongOpt(OPTION_SERVICE_APPCONFIG_GLOBAL)
-        .withDescription("Property (key=value) to be set in the global section of the Service appConfig")
-        .create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_IO_THREADS)
-        .withLongOpt(OPTION_IO_THREADS).withDescription("executor per instance").create('t'));
-
-    options.addOption(OptionBuilder.hasArg(false).withArgName(OPTION_START)
-        .withLongOpt(OPTION_START).withDescription("immediately start the cluster")
-        .create('z'));
-
-    // [-H|--help]
-    options.addOption(new Option("H", "help", false, "Print help information"));
-  }
-
-  private static long parseSuffixed(String value) {
-    return StringUtils.TraditionalBinaryPrefix.string2long(value);
-  }
-
-  public LlapOptions processOptions(String argv[]) throws ParseException, IOException {
-    commandLine = new GnuParser().parse(options, argv);
-    if (commandLine.hasOption('H') || false == commandLine.hasOption("instances")) {
-      // needs at least --instances
-      printUsage();
-      return null;
-    }
-
-    int instances = Integer.parseInt(commandLine.getOptionValue(OPTION_INSTANCES));
-    String directory = commandLine.getOptionValue(OPTION_DIRECTORY);
-    String jars = commandLine.getOptionValue(OPTION_AUXJARS);
-
-    String name = commandLine.getOptionValue(OPTION_NAME, null);
-
-    final int executors = Integer.parseInt(commandLine.getOptionValue(OPTION_EXECUTORS, "-1"));
-    final int ioThreads = Integer.parseInt(
-        commandLine.getOptionValue(OPTION_IO_THREADS, Integer.toString(executors)));
-    final long cache = parseSuffixed(commandLine.getOptionValue(OPTION_CACHE, "-1"));
-    final long size = parseSuffixed(commandLine.getOptionValue(OPTION_SIZE, "-1"));
-    final long xmx = parseSuffixed(commandLine.getOptionValue(OPTION_XMX, "-1"));
-    final boolean isHbase = Boolean.parseBoolean(
-        commandLine.getOptionValue(OPTION_AUXHBASE, "true"));
-    final boolean isHiveAux = Boolean.parseBoolean(
-        commandLine.getOptionValue(OPTION_AUXHIVE, "true"));
-    final boolean doStart = commandLine.hasOption(OPTION_START);
-    final String output = commandLine.getOptionValue(OPTION_OUTPUT_DIR, null);
-
-    final String queueName = commandLine.getOptionValue(OPTION_LLAP_QUEUE,
-        HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.getDefaultValue());
-
-    final Properties hiveconf;
-
-    if (commandLine.hasOption(OPTION_HIVECONF)) {
-      hiveconf = commandLine.getOptionProperties(OPTION_HIVECONF);
-    } else {
-      hiveconf = new Properties();
-    }
-
-    String javaHome = null;
-    if (commandLine.hasOption(OPTION_JAVA_HOME)) {
-      javaHome = commandLine.getOptionValue(OPTION_JAVA_HOME);
-    }
-
-    String logger = null;
-    if (commandLine.hasOption(OPTION_LOGGER)) {
-      logger = commandLine.getOptionValue(OPTION_LOGGER);
-      Preconditions.checkArgument(
-          logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING) ||
-              logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_CONSOLE) ||
-              logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_RFA));
-    }
-
-    // loglevel & args are parsed by the python processor
-
-    return new LlapOptions(name, instances, directory, executors, ioThreads, cache, size, xmx,
-        jars, isHbase, hiveconf, javaHome, queueName, logger, doStart, output, isHiveAux);
-  }
-
-  private void printUsage() {
-    HelpFormatter hf = new HelpFormatter();
-    try {
-      int width = hf.getWidth();
-      int jlineWidth = TerminalFactory.get().getWidth();
-      width = Math.min(160, Math.max(jlineWidth, width)); // Ignore potentially incorrect values
-      hf.setWidth(width);
-    } catch (Throwable t) { // Ignore
-    }
-    hf.printHelp("llap", options);
-  }
-}
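The deleted processor is built on commons-cli's OptionBuilder, which is deprecated because it accumulates state in static fields (hence the @SuppressWarnings("static-access") above). For comparison, a minimal sketch of the same --instances/-i option using the Option.builder() API available since commons-cli 1.3 — illustrative only, not the replacement parser introduced by this change:

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.DefaultParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    class LlapOptionsSketch {
      static CommandLine parse(String[] argv) throws ParseException {
        Options options = new Options();
        // Same option the deleted code built with OptionBuilder...create('i')
        options.addOption(Option.builder("i")
            .longOpt("instances")
            .hasArg()
            .argName("instances")
            .desc("Specify the number of instances to run this on")
            .build());
        return new DefaultParser().parse(options, argv); // GnuParser is likewise deprecated
      }
    }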
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
deleted file mode 100644
index ffdd340..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ /dev/null
@@ -1,839 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cli;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hive.common.CompressionUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapUtil;
-import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;
-import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
-import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
-import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.util.ResourceDownloader;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.eclipse.jetty.rewrite.handler.Rule;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-public class LlapServiceDriver {
-  protected static final Logger LOG = LoggerFactory.getLogger(LlapServiceDriver.class.getName());
-
-  private static final String[]
-      DEFAULT_AUX_CLASSES =
-      new String[] { "org.apache.hive.hcatalog.data.JsonSerDe", "org.apache.hadoop.hive.druid.DruidStorageHandler",
-          "org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory",
-          "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler" };
-  private static final String HBASE_SERDE_CLASS = "org.apache.hadoop.hive.hbase.HBaseSerDe";
-  private static final String[] NEEDED_CONFIGS = LlapDaemonConfiguration.DAEMON_CONFIGS;
-  private static final String[] OPTIONAL_CONFIGS = LlapDaemonConfiguration.SSL_DAEMON_CONFIGS;
-  private static final String OUTPUT_DIR_PREFIX = "llap-yarn-";
-
-  // This is not a config that users set in hive-site. It's only use is to share information
-  // between the java component of the service driver and the python component.
-  private static final String CONFIG_CLUSTER_NAME = "private.hive.llap.servicedriver.cluster.name";
-
-  /**
-   * This is a working configuration for the instance to merge various variables.
-   * It is not written out for llap server usage
-   */
-  private final HiveConf conf;
-
-  public LlapServiceDriver() {
-    SessionState ss = SessionState.get();
-    conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class);
-  }
-
-  public static void main(String[] args) throws Exception {
-    LOG.info("LLAP service driver invoked with arguments={}", args);
-    int ret = 0;
-    try {
-      ret = new LlapServiceDriver().run(args);
-    } catch (Throwable t) {
-      System.err.println("Failed: " + t.getMessage());
-      t.printStackTrace();
-      ret = 3;
-    } finally {
-      LOG.info("LLAP service driver finished");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Completed processing - exiting with " + ret);
-    }
-    System.exit(ret);
-  }
-
-
-  private static Configuration resolve(Configuration configured, Properties direct,
-      Properties hiveconf) {
-    Configuration conf = new Configuration(false);
-
-    populateConf(configured, conf, hiveconf, "CLI hiveconf");
-    populateConf(configured, conf, direct, "CLI direct");
-
-    return conf;
-  }
-
-  private static void populateConf(Configuration configured, Configuration target,
-      Properties properties, String source) {
-    for (Entry<Object, Object> entry : properties.entrySet()) {
-      String key = (String) entry.getKey();
-      String val = configured.get(key);
-      if (val != null) {
-        target.set(key, val, source);
-      }
-    }
-  }
-
-  static void populateConfWithLlapProperties(Configuration conf, Properties properties) {
-    for(Entry<Object, Object> props : properties.entrySet()) {
-      String key = (String) props.getKey();
-      if (HiveConf.getLlapDaemonConfVars().contains(key)) {
-        conf.set(key, (String) props.getValue());
-      } else {
-        if (key.startsWith(HiveConf.PREFIX_LLAP) || key.startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
-          LOG.warn("Adding key [{}] even though it is not in the set of known llap-server keys");
-          conf.set(key, (String) props.getValue());
-        } else {
-          LOG.warn("Ignoring unknown llap server parameter: [{}]", key);
-        }
-      }
-    }
-  }
-
-  private static abstract class NamedCallable<T> implements Callable<T> {
-    public final String taskName;
-    public NamedCallable (String name) {
-      this.taskName = name;
-    }
-    public String getName() {
-      return taskName;
-    }
-  }
-
-  private int run(String[] args) throws Exception {
-    LlapOptionsProcessor optionsProcessor = new LlapOptionsProcessor();
-    final LlapOptions options = optionsProcessor.processOptions(args);
-
-    final Properties propsDirectOptions = new Properties();
-
-    if (options == null) {
-      // help
-      return 1;
-    }
-
-    // Working directory.
-    Path tmpDir = new Path(options.getDirectory());
-
-    if (conf == null) {
-      throw new Exception("Cannot load any configuration to run command");
-    }
-
-    final long t0 = System.nanoTime();
-
-    final FileSystem fs = FileSystem.get(conf);
-    final FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
-
-    int threadCount = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
-    final ExecutorService executor = Executors.newFixedThreadPool(threadCount,
-        new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
-    final CompletionService<Void> asyncRunner = new ExecutorCompletionService<Void>(executor);
-
-    int rc = 0;
-    try {
-
-      // needed so that the file is actually loaded into configuration.
-      for (String f : NEEDED_CONFIGS) {
-        conf.addResource(f);
-        if (conf.getResource(f) == null) {
-          throw new Exception("Unable to find required config file: " + f);
-        }
-      }
-      for (String f : OPTIONAL_CONFIGS) {
-        conf.addResource(f);
-      }
-
-      conf.reloadConfiguration();
-
-      populateConfWithLlapProperties(conf, options.getConfig());
-
-      if (options.getName() != null) {
-        // update service registry configs - caveat: this has nothing to do with the actual settings
-        // as read by the AM
-        // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between
-        // instances
-        conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
-            "@" + options.getName());
-      }
-
-      if (options.getLogger() != null) {
-        HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
-      }
-      boolean isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
-
-      if (options.getSize() != -1) {
-        if (options.getCache() != -1) {
-          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
-            // direct heap allocations need to be safer
-            Preconditions.checkArgument(options.getCache() < options.getSize(), "Cache size ("
-                + LlapUtil.humanReadableByteCount(options.getCache()) + ") has to be smaller"
-                + " than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize()) + ")");
-          } else if (options.getCache() < options.getSize()) {
-            LOG.warn("Note that this might need YARN physical memory monitoring to be turned off "
-                + "(yarn.nodemanager.pmem-check-enabled=false)");
-          }
-        }
-        if (options.getXmx() != -1) {
-          Preconditions.checkArgument(options.getXmx() < options.getSize(), "Working memory (Xmx="
-              + LlapUtil.humanReadableByteCount(options.getXmx()) + ") has to be"
-              + " smaller than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize())
-              + ")");
-        }
-        if (isDirect && !HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
-          // direct and not memory mapped
-          Preconditions.checkArgument(options.getXmx() + options.getCache() <= options.getSize(),
-              "Working memory (Xmx=" + LlapUtil.humanReadableByteCount(options.getXmx()) + ") + cache size ("
-                  + LlapUtil.humanReadableByteCount(options.getCache()) + ") has to be smaller than the container sizing ("
-                  + LlapUtil.humanReadableByteCount(options.getSize()) + ")");
-        }
-      }
-
-
-      if (options.getExecutors() != -1) {
-        conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
-            String.valueOf(options.getExecutors()));
-        // TODO: vcpu settings - possibly when DRFA works right
-      }
-
-      if (options.getIoThreads() != -1) {
-        conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, options.getIoThreads());
-        propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
-            String.valueOf(options.getIoThreads()));
-      }
-
-      long cache = -1, xmx = -1;
-      if (options.getCache() != -1) {
-        cache = options.getCache();
-        conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
-        propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
-            Long.toString(cache));
-      }
-
-      if (options.getXmx() != -1) {
-        // Needs more explanation here
-        // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
-        // from this, to get actual usable memory before it goes into GC
-        xmx = options.getXmx();
-        long xmxMb = (xmx / (1024L * 1024L));
-        conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
-            String.valueOf(xmxMb));
-      }
-
-      long size = options.getSize();
-      if (size == -1) {
-        long heapSize = xmx;
-        if (!isDirect) {
-          heapSize += cache;
-        }
-        size = Math.min((long)(heapSize * 1.2), heapSize + 1024L*1024*1024);
-        if (isDirect) {
-          size += cache;
-        }
-      }
-      long containerSize = size / (1024 * 1024);
-      final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
-      Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
-          + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be greater"
-          + " than minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
-      conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
-          String.valueOf(containerSize));
-
-      LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {}",
-          LlapUtil.humanReadableByteCount(options.getSize()),
-          LlapUtil.humanReadableByteCount(options.getXmx()),
-          LlapUtil.humanReadableByteCount(options.getCache()));
-
-      if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
-        conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
-            options.getLlapQueueName());
-      }
-
-      final URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
-
-      if (null == logger) {
-        throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
-      }
-
-      Path home = new Path(System.getenv("HIVE_HOME"));
-      Path scriptParent = new Path(new Path(home, "scripts"), "llap");
-      Path scripts = new Path(scriptParent, "bin");
-
-      if (!lfs.exists(home)) {
-        throw new Exception("Unable to find HIVE_HOME:" + home);
-      } else if (!lfs.exists(scripts)) {
-        LOG.warn("Unable to find llap scripts:" + scripts);
-      }
-
-      final Path libDir = new Path(tmpDir, "lib");
-      final Path tezDir = new Path(libDir, "tez");
-      final Path udfDir = new Path(libDir, "udfs");
-      final Path confPath = new Path(tmpDir, "conf");
-      if (!lfs.mkdirs(confPath)) {
-        LOG.warn("mkdirs for " + confPath + " returned false");
-      }
-      if (!lfs.mkdirs(tezDir)) {
-        LOG.warn("mkdirs for " + tezDir + " returned false");
-      }
-      if (!lfs.mkdirs(udfDir)) {
-        LOG.warn("mkdirs for " + udfDir + " returned false");
-      }
-
-      NamedCallable<Void> downloadTez = new NamedCallable<Void>("downloadTez") {
-        @Override
-        public Void call() throws Exception {
-          synchronized (fs) {
-            String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
-            if (tezLibs == null) {
-              LOG.warn("Missing tez.lib.uris in tez-site.xml");
-            }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Copying tez libs from " + tezLibs);
-            }
-            lfs.mkdirs(tezDir);
-            fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
-            CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(),
-                true);
-            lfs.delete(new Path(libDir, "tez.tar.gz"), false);
-          }
-          return null;
-        }
-      };
-
-      NamedCallable<Void> copyLocalJars = new NamedCallable<Void>("copyLocalJars") {
-        @Override
-        public Void call() throws Exception {
-          Class<?>[] dependencies = new Class<?>[] { LlapDaemonProtocolProtos.class, // llap-common
-              LlapTezUtils.class, // llap-tez
-              LlapInputFormat.class, // llap-server
-              HiveInputFormat.class, // hive-exec
-              SslContextFactory.class, // hive-common (https deps)
-              Rule.class, // Jetty rewrite class
-              RegistryUtils.ServiceRecordMarshal.class, // ZK registry
-              // log4j2
-              com.lmax.disruptor.RingBuffer.class, // disruptor
-              org.apache.logging.log4j.Logger.class, // log4j-api
-              org.apache.logging.log4j.core.Appender.class, // log4j-core
-              org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
-              // log4j-1.2-API needed for NDC
-              org.apache.log4j.config.Log4j1ConfigurationFactory.class,
-              io.netty.util.NetUtil.class, // netty4
-              org.jboss.netty.util.NetUtil.class, //netty3
-              org.apache.arrow.vector.types.pojo.ArrowType.class, //arrow-vector
-              org.apache.arrow.memory.BaseAllocator.class, //arrow-memory
-              org.apache.arrow.flatbuf.Schema.class, //arrow-format
-              com.google.flatbuffers.Table.class, //flatbuffers
-              com.carrotsearch.hppc.ByteArrayDeque.class //hppc
-          };
-
-          for (Class<?> c : dependencies) {
-            Path jarPath = new Path(Utilities.jarFinderGetJar(c));
-            lfs.copyFromLocalFile(jarPath, libDir);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Copying " + jarPath + " to " + libDir);
-            }
-          }
-          return null;
-        }
-      };
-
-      // copy default aux classes (json/hbase)
-
-      NamedCallable<Void> copyAuxJars = new NamedCallable<Void>("copyAuxJars") {
-        @Override
-        public Void call() throws Exception {
-          for (String className : DEFAULT_AUX_CLASSES) {
-            localizeJarForClass(lfs, libDir, className, false);
-          }
-          Collection<String> codecs = conf.getStringCollection("io.compression.codecs");
-          if (codecs != null) {
-            for (String codecClassName : codecs) {
-              localizeJarForClass(lfs, libDir, codecClassName, false);
-            }
-          }
-          for (String className : getDbSpecificJdbcJars()) {
-            localizeJarForClass(lfs, libDir, className, false);
-          }
-          if (options.getIsHBase()) {
-            try {
-              localizeJarForClass(lfs, libDir, HBASE_SERDE_CLASS, true);
-              Job fakeJob = new Job(new JobConf()); // HBase API is convoluted.
-              TableMapReduceUtil.addDependencyJars(fakeJob);
-              Collection<String> hbaseJars =
-                  fakeJob.getConfiguration().getStringCollection("tmpjars");
-              for (String jarPath : hbaseJars) {
-                if (!jarPath.isEmpty()) {
-                  lfs.copyFromLocalFile(new Path(jarPath), libDir);
-                }
-              }
-            } catch (Throwable t) {
-              String err =
-                  "Failed to add HBase jars. Use --auxhbase=false to avoid localizing them";
-              LOG.error(err);
-              System.err.println(err);
-              throw new RuntimeException(t);
-            }
-          }
-
-          HashSet<String> auxJars = new HashSet<>();
-          // There are many ways to have AUX jars in Hive... sigh
-          if (options.getIsHiveAux()) {
-            // Note: we don't add ADDED jars, RELOADABLE jars, etc. That is by design; there are too many ways
-            // to add jars in Hive, some of which are session/etc. specific. Env + conf + arg should be enough.
-            addAuxJarsToSet(auxJars, conf.getAuxJars(), ",");
-            addAuxJarsToSet(auxJars, System.getenv("HIVE_AUX_JARS_PATH"), ":");
-            LOG.info("Adding the following aux jars from the environment and configs: " + auxJars);
-          }
-
-          addAuxJarsToSet(auxJars, options.getAuxJars(), ",");
-          for (String jarPath : auxJars) {
-            lfs.copyFromLocalFile(new Path(jarPath), libDir);
-          }
-          return null;
-        }
-
-        private void addAuxJarsToSet(HashSet<String> auxJarSet, String auxJars, String delimiter) {
-          if (auxJars != null && !auxJars.isEmpty()) {
-            // TODO: transitive dependencies warning?
-            String[] jarPaths = auxJars.split(delimiter);
-            for (String jarPath : jarPaths) {
-              if (!jarPath.isEmpty()) {
-                auxJarSet.add(jarPath);
-              }
-            }
-          }
-        }
-      };
-
-      NamedCallable<Void> copyUdfJars = new NamedCallable<Void>("copyUdfJars") {
-        @Override
-        public Void call() throws Exception {
-          // UDFs
-          final Set<String> allowedUdfs;
-
-          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
-            synchronized (fs) {
-              allowedUdfs = downloadPermanentFunctions(conf, udfDir);
-            }
-          } else {
-            allowedUdfs = Collections.emptySet();
-          }
-
-          PrintWriter udfStream =
-              new PrintWriter(lfs.create(new Path(confPath,
-                  StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
-          for (String udfClass : allowedUdfs) {
-            udfStream.println(udfClass);
-          }
-
-          udfStream.close();
-          return null;
-        }
-      };
-
-      String java_home;
-      if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
-        java_home = System.getenv("JAVA_HOME");
-        String jre_home = System.getProperty("java.home");
-        if (java_home == null) {
-          java_home = jre_home;
-        } else if (!java_home.equals(jre_home)) {
-          LOG.warn("Java versions might not match : JAVA_HOME=[{}],process jre=[{}]", java_home,
-              jre_home);
-        }
-      } else {
-        java_home = options.getJavaPath();
-      }
-      if (java_home == null || java_home.isEmpty()) {
-        throw new RuntimeException(
-            "Could not determine JAVA_HOME from command line parameters, environment or system properties");
-      }
-      LOG.info("Using [{}] for JAVA_HOME", java_home);
-
-      NamedCallable<Void> copyConfigs = new NamedCallable<Void>("copyConfigs") {
-        @Override
-        public Void call() throws Exception {
-          // Copy over the mandatory configs for the package.
-          for (String f : NEEDED_CONFIGS) {
-            copyConfig(lfs, confPath, f);
-          }
-          for (String f : OPTIONAL_CONFIGS) {
-            try {
-              copyConfig(lfs, confPath, f);
-            } catch (Throwable t) {
-              LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
-            }
-          }
-          createLlapDaemonConfig(lfs, confPath, conf, propsDirectOptions, options.getConfig());
-          setUpLogAndMetricConfigs(lfs, logger, confPath);
-          return null;
-        }
-      };
-
-      @SuppressWarnings("unchecked")
-      final NamedCallable<Void>[] asyncWork =
-          new NamedCallable[] {
-          downloadTez,
-          copyUdfJars,
-          copyLocalJars,
-          copyAuxJars,
-          copyConfigs };
-      @SuppressWarnings("unchecked")
-      final Future<Void>[] asyncResults = new Future[asyncWork.length];
-      for (int i = 0; i < asyncWork.length; i++) {
-        asyncResults[i] = asyncRunner.submit(asyncWork[i]);
-      }
-
-      // TODO: need to move from Python to Java for the rest of the script.
-      JSONObject configs = createConfigJson(containerSize, cache, xmx, java_home);
-      writeConfigJson(tmpDir, lfs, configs);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Config generation took " + (System.nanoTime() - t0) + " ns");
-      }
-      for (int i = 0; i < asyncWork.length; i++) {
-        final long t1 = System.nanoTime();
-        asyncResults[i].get();
-        final long t2 = System.nanoTime();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(asyncWork[i].getName() + " waited for " + (t2 - t1) + " ns");
-        }
-      }
-      if (options.isStarting()) {
-        String version = System.getenv("HIVE_VERSION");
-        if (version == null || version.isEmpty()) {
-          version = DateTime.now().toString("ddMMMyyyy");
-        }
-
-        String outputDir = options.getOutput();
-        Path packageDir = null;
-        if (outputDir == null) {
-          outputDir = OUTPUT_DIR_PREFIX + version;
-          packageDir = new Path(Paths.get(".").toAbsolutePath().toString(),
-              OUTPUT_DIR_PREFIX + version);
-        } else {
-          packageDir = new Path(outputDir);
-        }
-        rc = runPackagePy(args, tmpDir, scriptParent, version, outputDir);
-        if (rc == 0) {
-          LlapSliderUtils.startCluster(conf, options.getName(),
-              "llap-" + version + ".tar.gz", packageDir,
-              HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
-        }
-      } else {
-        rc = 0;
-      }
-    } finally {
-      executor.shutdown();
-      lfs.close();
-      fs.close();
-    }
-
-    if (rc == 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Exiting successfully");
-      }
-    } else {
-      LOG.info("Exiting with rc = " + rc);
-    }
-    return rc;
-  }
-
-  private int runPackagePy(String[] args, Path tmpDir, Path scriptParent,
-      String version, String outputDir) throws IOException, InterruptedException {
-    Path scriptPath = new Path(new Path(scriptParent, "yarn"), "package.py");
-    List<String> scriptArgs = new ArrayList<>(args.length + 7);
-    scriptArgs.add("python");
-    scriptArgs.add(scriptPath.toString());
-    scriptArgs.add("--input");
-    scriptArgs.add(tmpDir.toString());
-    scriptArgs.add("--output");
-    scriptArgs.add(outputDir);
-    scriptArgs.add("--javaChild");
-    for (String arg : args) {
-      scriptArgs.add(arg);
-    }
-    LOG.debug("Calling package.py via: " + scriptArgs);
-    ProcessBuilder builder = new ProcessBuilder(scriptArgs);
-    builder.redirectError(ProcessBuilder.Redirect.INHERIT);
-    builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
-    builder.environment().put("HIVE_VERSION", version);
-    return builder.start().waitFor();
-  }
-
-  private void writeConfigJson(Path tmpDir, final FileSystem lfs,
-      JSONObject configs) throws IOException, JSONException {
-    FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
-    OutputStreamWriter w = new OutputStreamWriter(os);
-    configs.write(w);
-    w.close();
-    os.close();
-  }
-
-  private JSONObject createConfigJson(long containerSize, long cache, long xmx,
-      String java_home) throws JSONException {
-    // extract configs for processing by the python fragments in YARN Service
-    JSONObject configs = new JSONObject();
-
-    configs.put("java.home", java_home);
-
-    configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
-        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
-    configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
-
-    configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
-        HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
-
-    configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
-        HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
-
-    configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
-        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
-
-    configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
-        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
-
-    configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
-        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
-
-    // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line
-    if (HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
-      configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
-          HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
-    }
-
-    // Propagate the cluster name to the script.
-    String clusterHosts = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
-    if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@")
-        && clusterHosts.length() > 1) {
-      configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
-    }
-
-    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
-
-    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
-
-    long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long) (cache * 1.25) : -1;
-    configs.put("max_direct_memory", Long.toString(maxDirect));
-    return configs;
-  }
-
-  private Set<String> downloadPermanentFunctions(Configuration conf, Path udfDir) throws HiveException,
-      URISyntaxException, IOException {
-    Map<String, String> udfs = new HashMap<String, String>();
-    HiveConf hiveConf = new HiveConf();
-    // disable expensive operations on the metastore
-    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_INIT_METADATA_COUNT_ENABLED, false);
-    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, false);
-    // performance problem: ObjectStore does its own new HiveConf()
-    Hive hive = Hive.getWithFastCheck(hiveConf, false);
-    ResourceDownloader resourceDownloader =
-        new ResourceDownloader(conf, udfDir.toUri().normalize().getPath());
-    List<Function> fns = hive.getAllFunctions();
-    Set<URI> srcUris = new HashSet<>();
-    for (Function fn : fns) {
-      String fqfn = fn.getDbName() + "." + fn.getFunctionName();
-      if (udfs.containsKey(fn.getClassName())) {
-        LOG.warn("Duplicate function names found for " + fn.getClassName() + " with " + fqfn
-            + " and " + udfs.get(fn.getClassName()));
-      }
-      udfs.put(fn.getClassName(), fqfn);
-      List<ResourceUri> resources = fn.getResourceUris();
-      if (resources == null || resources.isEmpty()) {
-        LOG.warn("Missing resources for " + fqfn);
-        continue;
-      }
-      for (ResourceUri resource : resources) {
-        srcUris.add(ResourceDownloader.createURI(resource.getUri()));
-      }
-    }
-    for (URI srcUri : srcUris) {
-      List<URI> localUris = resourceDownloader.downloadExternal(srcUri, null, false);
-      for(URI dst : localUris) {
-        LOG.warn("Downloaded " + dst + " from " + srcUri);
-      }
-    }
-    return udfs.keySet();
-  }
-
-  private void addJarForClassToListIfExists(String cls, List<String> jarList) {
-    try {
-      Class.forName(cls);
-      jarList.add(cls);
-    } catch (Exception e) {
-    }
-  }
-
-  private List<String> getDbSpecificJdbcJars() {
-    List<String> jdbcJars = new ArrayList<String>();
-    addJarForClassToListIfExists("com.mysql.jdbc.Driver", jdbcJars); // add mysql jdbc driver
-    addJarForClassToListIfExists("org.postgresql.Driver", jdbcJars); // add postgresql jdbc driver
-    addJarForClassToListIfExists("oracle.jdbc.OracleDriver", jdbcJars); // add oracle jdbc driver
-    addJarForClassToListIfExists("com.microsoft.sqlserver.jdbc.SQLServerDriver", jdbcJars); // add mssql jdbc driver
-    return jdbcJars;
-  }
-
-  private void localizeJarForClass(FileSystem lfs, Path libDir, String className, boolean doThrow)
-      throws IOException {
-    String jarPath = null;
-    boolean hasException = false;
-    try {
-      Class<?> auxClass = Class.forName(className);
-      jarPath = Utilities.jarFinderGetJar(auxClass);
-    } catch (Throwable t) {
-      if (doThrow) {
-        throw (t instanceof IOException) ? (IOException)t : new IOException(t);
-      }
-      hasException = true;
-      String err = "Cannot find a jar for [" + className + "] due to an exception ("
-          + t.getMessage() + "); not packaging the jar";
-      LOG.error(err);
-      System.err.println(err);
-    }
-    if (jarPath != null) {
-      lfs.copyFromLocalFile(new Path(jarPath), libDir);
-    } else if (!hasException) {
-      String err = "Cannot find a jar for [" + className + "]; not packaging the jar";
-      if (doThrow) {
-        throw new IOException(err);
-      }
-      LOG.error(err);
-      System.err.println(err);
-    }
-  }
-
-  /**
-   *
-   * @param lfs filesystem on which file will be generated
-   * @param confPath path wher the config will be generated
-   * @param configured the base configuration instances
-   * @param direct properties specified directly - i.e. using the properties exact option
-   * @param hiveconf properties specifried via --hiveconf
-   * @throws IOException
-   */
-  private void createLlapDaemonConfig(FileSystem lfs, Path confPath, Configuration configured,
-      Properties direct, Properties hiveconf) throws IOException {
-    FSDataOutputStream confStream =
-        lfs.create(new Path(confPath, LlapDaemonConfiguration.LLAP_DAEMON_SITE));
-
-    Configuration llapDaemonConf = resolve(configured, direct, hiveconf);
-
-    llapDaemonConf.writeXml(confStream);
-    confStream.close();
-  }
-
-  private void copyConfig(FileSystem lfs, Path confPath, String f) throws IOException {
-    HiveConf.getBoolVar(new Configuration(false), ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
-    // they will be file:// URLs
-    lfs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confPath);
-  }
-
-  private void setUpLogAndMetricConfigs(final FileSystem lfs, final URL logger,
-      final Path confPath) throws IOException {
-    // logger can be a resource stream or a real file (cannot use copy)
-    InputStream loggerContent = logger.openStream();
-    IOUtils.copyBytes(loggerContent,
-        lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true);
-
-    String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
-    URL metrics2 = conf.getResource(metricsFile);
-    if (metrics2 == null) {
-      LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found."
-          + " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
-      metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
-      metrics2 = conf.getResource(metricsFile);
-    }
-    if (metrics2 != null) {
-      InputStream metrics2FileStream = metrics2.openStream();
-      IOUtils.copyBytes(metrics2FileStream,
-          lfs.create(new Path(confPath, metricsFile), true), conf, true);
-      LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
-    } else {
-      LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or "
-          + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
-    }
-  }
-}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
index bdec1c1..a9a216c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
@@ -18,102 +18,14 @@ package org.apache.hadoop.hive.llap.cli;
 
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.client.ServiceClient;
-import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class LlapSliderUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(LlapSliderUtils.class);
-  private static final String LLAP_PACKAGE_DIR = ".yarn/package/LLAP/";
-
   public static ServiceClient createServiceClient(Configuration conf) throws Exception {
     ServiceClient serviceClient = new ServiceClient();
     serviceClient.init(conf);
     serviceClient.start();
     return serviceClient;
   }
-
-  public static Service getService(Configuration conf, String name) {
-    LOG.info("Get service details for " + name);
-    ServiceClient sc;
-    try {
-      sc = createServiceClient(conf);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    Service service = null;
-    try {
-      service = sc.getStatus(name);
-    } catch (YarnException | IOException e) {
-      // Probably the app does not exist
-      LOG.info(e.getLocalizedMessage());
-      throw new RuntimeException(e);
-    } finally {
-      try {
-        sc.close();
-      } catch (IOException e) {
-        LOG.info("Failed to close service client", e);
-      }
-    }
-    return service;
-  }
-
-  public static void startCluster(Configuration conf, String name, String packageName, Path packageDir, String queue) {
-    LOG.info("Starting cluster with " + name + ", " + packageName + ", " + queue + ", " + packageDir);
-    ServiceClient sc;
-    try {
-      sc = createServiceClient(conf);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    try {
-      try {
-        LOG.info("Executing the stop command");
-        sc.actionStop(name, true);
-      } catch (Exception ex) {
-        // Ignore exceptions from stop
-        LOG.info(ex.getLocalizedMessage());
-      }
-      try {
-        LOG.info("Executing the destroy command");
-        sc.actionDestroy(name);
-      } catch (Exception ex) {
-        // Ignore exceptions from destroy
-        LOG.info(ex.getLocalizedMessage());
-      }
-      LOG.info("Uploading the app tarball");
-      CoreFileSystem fs = new CoreFileSystem(conf);
-      fs.createWithPermissions(new Path(LLAP_PACKAGE_DIR),
-          FsPermission.getDirDefault());
-      fs.copyLocalFileToHdfs(new File(packageDir.toString(), packageName),
-          new Path(LLAP_PACKAGE_DIR), new FsPermission("755"));
-
-      LOG.info("Executing the launch command");
-      File yarnfile = new File(new Path(packageDir, "Yarnfile").toString());
-      Long lifetime = null; // unlimited lifetime
-      try {
-        sc.actionLaunch(yarnfile.getAbsolutePath(), name, lifetime, queue);
-      } finally {
-      }
-      LOG.debug("Started the cluster via service API");
-    } catch (YarnException | IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      try {
-        sc.close();
-      } catch (IOException e) {
-        LOG.info("Failed to close service client", e);
-      }
-    }
-  }
-
 }
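The removed startCluster() bundled a stop, destroy, upload, launch sequence around YARN's ServiceClient. A condensed sketch of that control flow, using only calls visible in the deleted code (logging and the tarball upload trimmed; stop/destroy failures are tolerated because the application may not exist yet):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.service.client.ServiceClient;

    class StartClusterSketch {
      static void relaunch(Configuration conf, String name, String yarnfilePath, String queue)
          throws Exception {
        ServiceClient sc = LlapSliderUtils.createServiceClient(conf);
        try {
          try { sc.actionStop(name, true); } catch (Exception ignore) { }  // app may not be running
          try { sc.actionDestroy(name); } catch (Exception ignore) { }     // app may not exist
          sc.actionLaunch(yarnfilePath, name, null, queue);                // null lifetime = unlimited
        } finally {
          sc.close();
        }
      }
    }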
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
new file mode 100644
index 0000000..7b2e32b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copy auxiliary jars for the tarball. */
+class AsyncTaskCopyAuxJars implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskCopyAuxJars.class.getName());
+
+  private static final String[] DEFAULT_AUX_CLASSES =
+      new String[] {"org.apache.hive.hcatalog.data.JsonSerDe", "org.apache.hadoop.hive.druid.DruidStorageHandler",
+          "org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory",
+          "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler"};
+  private static final String HBASE_SERDE_CLASS = "org.apache.hadoop.hive.hbase.HBaseSerDe";
+
+  private final LlapServiceCommandLine cl;
+  private final HiveConf conf;
+  private final FileSystem rawFs;
+  private final Path libDir;
+
+  AsyncTaskCopyAuxJars(LlapServiceCommandLine cl, HiveConf conf, FileSystem rawFs, Path libDir) {
+    this.cl = cl;
+    this.conf = conf;
+    this.rawFs = rawFs;
+    this.libDir = libDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    localizeJarForClass(Arrays.asList(DEFAULT_AUX_CLASSES), false);
+    localizeJarForClass(conf.getStringCollection("io.compression.codecs"), false);
+    localizeJarForClass(getDbSpecificJdbcJars(), false);
+
+    if (cl.getIsHBase()) {
+      try {
+        localizeJarForClass(Arrays.asList(HBASE_SERDE_CLASS), true);
+        Job fakeJob = Job.getInstance(new JobConf()); // HBase API is convoluted.
+        TableMapReduceUtil.addDependencyJars(fakeJob);
+        Collection<String> hbaseJars = fakeJob.getConfiguration().getStringCollection("tmpjars");
+        for (String jarPath : hbaseJars) {
+          if (!jarPath.isEmpty()) {
+            rawFs.copyFromLocalFile(new Path(jarPath), libDir);
+          }
+        }
+      } catch (Throwable t) {
+        String err = "Failed to add HBase jars. Use --auxhbase=false to avoid localizing them";
+        LOG.error(err);
+        System.err.println(err);
+        throw new RuntimeException(t);
+      }
+    }
+
+    Set<String> auxJars = new HashSet<>();
+    // There are many ways to have AUX jars in Hive... sigh
+    if (cl.getIsHiveAux()) {
+      // Note: we don't add ADDED jars, RELOADABLE jars, etc. That is by design; there are too many ways
+      // to add jars in Hive, some of which are session/etc. specific. Env + conf + arg should be enough.
+      addAuxJarsToSet(auxJars, conf.getAuxJars(), ",");
+      addAuxJarsToSet(auxJars, System.getenv("HIVE_AUX_JARS_PATH"), ":");
+      LOG.info("Adding the following aux jars from the environment and configs: " + auxJars);
+    }
+
+    addAuxJarsToSet(auxJars, cl.getAuxJars(), ",");
+    for (String jarPath : auxJars) {
+      rawFs.copyFromLocalFile(new Path(jarPath), libDir);
+    }
+    return null;
+  }
+
+  private void localizeJarForClass(Collection<String> classNames, boolean doThrow) throws IOException {
+    if (CollectionUtils.isEmpty(classNames)) {
+      return;
+    }
+
+    for (String className : classNames) {
+      String jarPath = null;
+      boolean hasException = false;
+      try {
+        Class<?> clazz = Class.forName(className);
+        jarPath = Utilities.jarFinderGetJar(clazz);
+      } catch (Throwable t) {
+        if (doThrow) {
+          throw (t instanceof IOException) ? (IOException)t : new IOException(t);
+        }
+        hasException = true;
+        String err = "Cannot find a jar for [" + className + "] due to an exception ("
+            + t.getMessage() + "); not packaging the jar";
+        LOG.error(err);
+        System.err.println(err);
+      }
+
+      if (jarPath != null) {
+        rawFs.copyFromLocalFile(new Path(jarPath), libDir);
+      } else if (!hasException) {
+        String err = "Cannot find a jar for [" + className + "]; not packaging the jar";
+        if (doThrow) {
+          throw new IOException(err);
+        }
+        LOG.error(err);
+        System.err.println(err);
+      }
+    }
+  }
+
+  private List<String> getDbSpecificJdbcJars() {
+    List<String> jdbcJars = new ArrayList<String>();
+    addJarForClassToListIfExists("com.mysql.jdbc.Driver", jdbcJars); // add mysql jdbc driver
+    addJarForClassToListIfExists("org.postgresql.Driver", jdbcJars); // add postgresql jdbc driver
+    addJarForClassToListIfExists("oracle.jdbc.OracleDriver", jdbcJars); // add oracle jdbc driver
+    addJarForClassToListIfExists("com.microsoft.sqlserver.jdbc.SQLServerDriver", jdbcJars); // add mssql jdbc driver
+    return jdbcJars;
+  }
+
+  private void addJarForClassToListIfExists(String cls, List<String> jarList) {
+    try {
+      Class.forName(cls);
+      jarList.add(cls);
+    } catch (Exception e) {
+    }
+  }
+
+  private void addAuxJarsToSet(Set<String> auxJarSet, String auxJars, String delimiter) {
+    if (StringUtils.isNotEmpty(auxJars)) {
+      // TODO: transitive dependencies warning?
+      String[] jarPaths = auxJars.split(delimiter);
+      for (String jarPath : jarPaths) {
+        if (!jarPath.isEmpty()) {
+          auxJarSet.add(jarPath);
+        }
+      }
+    }
+  }
+}
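Like the other AsyncTask* classes in this change, AsyncTaskCopyAuxJars is a Callable<Void> so the packaging steps can run concurrently. The deleted driver fanned such tasks out through an ExecutorCompletionService; here is a self-contained sketch of that pattern (the orchestrating class in the new package is outside this diff, so the class and method names below are illustrative):

    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class AsyncTaskRunnerSketch {
      static void runAll(List<Callable<Void>> tasks) throws Exception {
        // Same pool sizing the deleted driver used: half the cores, at least one thread.
        ExecutorService executor = Executors.newFixedThreadPool(
            Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
        try {
          CompletionService<Void> runner = new ExecutorCompletionService<>(executor);
          for (Callable<Void> task : tasks) {
            runner.submit(task);
          }
          for (int i = 0; i < tasks.size(); i++) {
            runner.take().get(); // rethrows the first task failure as ExecutionException
          }
        } finally {
          executor.shutdown();
        }
      }
    }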
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Properties;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copy config files for the tarball. */
+class AsyncTaskCopyConfigs implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskCopyConfigs.class.getName());
+
+  private final LlapServiceCommandLine cl;
+  private final HiveConf conf;
+  private final Properties directProperties;
+  private final FileSystem rawFs;
+  private final Path confDir;
+
+  AsyncTaskCopyConfigs(LlapServiceCommandLine cl, HiveConf conf, Properties directProperties, FileSystem rawFs,
+      Path confDir) {
+    this.cl = cl;
+    this.conf = conf;
+    this.directProperties = directProperties;
+    this.rawFs = rawFs;
+    this.confDir = confDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    // Copy over the mandatory configs for the package.
+    for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) {
+      copyConfig(f);
+    }
+    for (String f : LlapDaemonConfiguration.SSL_DAEMON_CONFIGS) {
+      try {
+        copyConfig(f);
+      } catch (Throwable t) {
+        LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
+      }
+    }
+    createLlapDaemonConfig();
+    setUpLoggerConfig();
+    setUpMetricsConfig();
+    return null;
+  }
+
+  private void copyConfig(String f) throws IOException {
+    HiveConf.getBoolVar(new Configuration(false), ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
+    // they will be file:// URLs
+    rawFs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confDir);
+  }
+
+  private void createLlapDaemonConfig() throws IOException {
+    FSDataOutputStream confStream = rawFs.create(new Path(confDir, LlapDaemonConfiguration.LLAP_DAEMON_SITE));
+
+    Configuration llapDaemonConf = resolve();
+
+    llapDaemonConf.writeXml(confStream);
+    confStream.close();
+  }
+
+  private Configuration resolve() {
+    Configuration target = new Configuration(false);
+
+    populateConf(target, cl.getConfig(), "CLI hiveconf");
+    populateConf(target, directProperties, "CLI direct");
+
+    return target;
+  }
+
+  private void populateConf(Configuration target, Properties properties, String source) {
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String key = (String) entry.getKey();
+      String val = conf.get(key);
+      if (val != null) {
+        target.set(key, val, source);
+      }
+    }
+  }
+
+  private void setUpLoggerConfig() throws Exception {
+    // logger can be a resource stream or a real file (cannot use copy)
+    URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
+    if (null == logger) {
+      throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
+    }
+    InputStream loggerContent = logger.openStream();
+    IOUtils.copyBytes(loggerContent,
+        rawFs.create(new Path(confDir, "llap-daemon-log4j2.properties"), true), conf, true);
+  }
+
+  private void setUpMetricsConfig() throws IOException {
+    String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
+    URL metrics2 = conf.getResource(metricsFile);
+    if (metrics2 == null) {
+      LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found." +
+          " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
+      metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
+      metrics2 = conf.getResource(metricsFile);
+    }
+    if (metrics2 != null) {
+      InputStream metrics2FileStream = metrics2.openStream();
+      IOUtils.copyBytes(metrics2FileStream, rawFs.create(new Path(confDir, metricsFile), true), conf, true);
+      LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
+    } else {
+      LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " +
+          LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
+    }
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java
new file mode 100644
index 0000000..90f9b2c
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.eclipse.jetty.rewrite.handler.Rule;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copy local jars for the tarball. */
+class AsyncTaskCopyLocalJars implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskCopyLocalJars.class.getName());
+
+  private final FileSystem rawFs;
+  private final Path libDir;
+
+  AsyncTaskCopyLocalJars(FileSystem rawFs, Path libDir) {
+    this.rawFs = rawFs;
+    this.libDir = libDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    Class<?>[] dependencies = new Class<?>[] {
+        LlapDaemonProtocolProtos.class, // llap-common
+        LlapTezUtils.class, // llap-tez
+        LlapInputFormat.class, // llap-server
+        HiveInputFormat.class, // hive-exec
+        SslContextFactory.class, // hive-common (https deps)
+        Rule.class, // Jetty rewrite class
+        RegistryUtils.ServiceRecordMarshal.class, // ZK registry
+        // log4j2
+        com.lmax.disruptor.RingBuffer.class, // disruptor
+        org.apache.logging.log4j.Logger.class, // log4j-api
+        org.apache.logging.log4j.core.Appender.class, // log4j-core
+        org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
+        // log4j-1.2-API needed for NDC
+        org.apache.log4j.config.Log4j1ConfigurationFactory.class,
+        io.netty.util.NetUtil.class, // netty4
+        org.jboss.netty.util.NetUtil.class, // netty3
+        org.apache.arrow.vector.types.pojo.ArrowType.class, // arrow-vector
+        org.apache.arrow.memory.BaseAllocator.class, // arrow-memory
+        org.apache.arrow.flatbuf.Schema.class, // arrow-format
+        com.google.flatbuffers.Table.class, // flatbuffers
+        com.carrotsearch.hppc.ByteArrayDeque.class // hppc
+    };
+
+    for (Class<?> c : dependencies) {
+      Path jarPath = new Path(Utilities.jarFinderGetJar(c));
+      rawFs.copyFromLocalFile(jarPath, libDir);
+      LOG.debug("Copying " + jarPath + " to " + libDir);
+    }
+    return null;
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCreateUdfFile.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCreateUdfFile.java
new file mode 100644
index 0000000..430471e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCreateUdfFile.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.ResourceDownloader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Create the list of allowed UDFs for the tarball. */
+class AsyncTaskCreateUdfFile implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskCreateUdfFile.class.getName());
+
+  private final HiveConf conf;
+  private final FileSystem fs;
+  private final FileSystem rawFs;
+  private final Path udfDir;
+  private final Path confDir;
+
+  AsyncTaskCreateUdfFile(HiveConf conf, FileSystem fs, FileSystem rawFs, Path udfDir, Path confDir) {
+    this.conf = conf;
+    this.fs = fs;
+    this.rawFs = rawFs;
+    this.udfDir = udfDir;
+    this.confDir = confDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    // UDFs
+    final Set<String> allowedUdfs;
+
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
+      synchronized (fs) {
+        allowedUdfs = downloadPermanentFunctions();
+      }
+    } else {
+      allowedUdfs = Collections.emptySet();
+    }
+
+    OutputStream os = rawFs.create(new Path(confDir, StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST));
+    OutputStreamWriter osw = new OutputStreamWriter(os, Charset.defaultCharset());
+    PrintWriter udfStream = new PrintWriter(osw);
+    for (String udfClass : allowedUdfs) {
+      udfStream.println(udfClass);
+    }
+
+    udfStream.close();
+    return null;
+  }
+
+  private Set<String> downloadPermanentFunctions() throws HiveException, URISyntaxException, IOException {
+    Map<String, String> udfs = new HashMap<>();
+    HiveConf hiveConf = new HiveConf();
+    // disable expensive operations on the metastore
+    hiveConf.setBoolean(MetastoreConf.ConfVars.INIT_METADATA_COUNT_ENABLED.getVarname(), false);
+    hiveConf.setBoolean(MetastoreConf.ConfVars.METRICS_ENABLED.getVarname(), false);
+    // performance problem: ObjectStore does its own new HiveConf()
+    Hive hive = Hive.getWithFastCheck(hiveConf, false);
+    ResourceDownloader resourceDownloader = new ResourceDownloader(conf, udfDir.toUri().normalize().getPath());
+    List<Function> fns = hive.getAllFunctions();
+    Set<URI> srcUris = new HashSet<>();
+    for (Function fn : fns) {
+      String fqfn = fn.getDbName() + "." + fn.getFunctionName();
+      if (udfs.containsKey(fn.getClassName())) {
+        LOG.warn("Duplicate function names found for " + fn.getClassName() + " with " + fqfn + " and " +
+            udfs.get(fn.getClassName()));
+      }
+      udfs.put(fn.getClassName(), fqfn);
+      List<ResourceUri> resources = fn.getResourceUris();
+      if (resources == null || resources.isEmpty()) {
+        LOG.warn("Missing resources for " + fqfn);
+        continue;
+      }
+      for (ResourceUri resource : resources) {
+        srcUris.add(ResourceDownloader.createURI(resource.getUri()));
+      }
+    }
+    for (URI srcUri : srcUris) {
+      List<URI> localUris = resourceDownloader.downloadExternal(srcUri, null, false);
+      for (URI dst : localUris) {
+        LOG.warn("Downloaded " + dst + " from " + srcUri);
+      }
+    }
+    return udfs.keySet();
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskDownloadTezJars.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskDownloadTezJars.java
new file mode 100644
index 0000000..29b05a6
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskDownloadTezJars.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.CompressionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Download tez related jars for the tarball. */
+class AsyncTaskDownloadTezJars implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskDownloadTezJars.class.getName());
+
+  private final HiveConf conf;
+  private final FileSystem fs;
+  private final FileSystem rawFs;
+  private final Path libDir;
+  private final Path tezDir;
+
+  AsyncTaskDownloadTezJars(HiveConf conf, FileSystem fs, FileSystem rawFs, Path libDir, Path tezDir) {
+    this.conf = conf;
+    this.fs = fs;
+    this.rawFs = rawFs;
+    this.libDir = libDir;
+    this.tezDir = tezDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    synchronized (fs) {
+      String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
+      if (tezLibs == null) {
+        LOG.warn("Missing tez.lib.uris in tez-site.xml");
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Copying tez libs from " + tezLibs);
+      }
+      rawFs.mkdirs(tezDir);
+      fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
+      CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(), true);
+      rawFs.delete(new Path(libDir, "tez.tar.gz"), false);
+    }
+    return null;
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapConfigJsonCreator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapConfigJsonCreator.java
new file mode 100644
index 0000000..8e9b939
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapConfigJsonCreator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Creates the config json for llap start.
+ */
+class LlapConfigJsonCreator {
+  // This is not a config that users set in hive-site. Its only use is to share information
+  // between the java component of the service driver and the python component.
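+  // For example (hypothetical value): with hive.llap.daemon.service.hosts set to "@llap0", the python side
+  // receives private.hive.llap.servicedriver.cluster.name=llap0 (the leading '@' is stripped in
+  // createConfigJson() below).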
+  private static final String CONFIG_CLUSTER_NAME = "private.hive.llap.servicedriver.cluster.name";
+
+  private final HiveConf conf;
+  private final FileSystem fs;
+  private final Path tmpDir;
+
+  private final long cache;
+  private final long xmx;
+  private final String javaHome;
+
+  LlapConfigJsonCreator(HiveConf conf, FileSystem fs, Path tmpDir, long cache, long xmx, String javaHome) {
+    this.conf = conf;
+    this.fs = fs;
+    this.tmpDir = tmpDir;
+    this.cache = cache;
+    this.xmx = xmx;
+    this.javaHome = javaHome;
+  }
+
+  void createLlapConfigJson() throws Exception {
+    JSONObject configs = createConfigJson();
+    writeConfigJson(configs);
+  }
+
+  private JSONObject createConfigJson() throws JSONException {
+    // extract configs for processing by the python fragments in YARN Service
+    JSONObject configs = new JSONObject();
+
+    configs.put("java.home", javaHome);
+
+    configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+        conf.getLongVar(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
+
+    configs.put(ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, conf.getSizeVar(ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
+
+    configs.put(ConfVars.LLAP_ALLOCATOR_DIRECT.varname, conf.getBoolVar(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
+
+    configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+        conf.getIntVar(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
+
+    configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
+        conf.getIntVar(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
+
+    configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
+
+    // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line
+    if (conf.getVar(ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
+      configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, conf.getVar(ConfVars.LLAP_DAEMON_QUEUE_NAME));
+    }
+
+    // Propagate the cluster name to the script.
+    String clusterHosts = conf.getVar(ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+    if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@") && clusterHosts.length() > 1) {
+      configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
+    }
+
+    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
+
+    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
+
+    long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long) (cache * 1.25) : -1;
+    configs.put("max_direct_memory", Long.toString(maxDirect));
+
+    return configs;
+  }
+
+  private void writeConfigJson(JSONObject configs) throws Exception {
+    try (FSDataOutputStream fsdos = fs.create(new Path(tmpDir, "config.json"));
+        OutputStreamWriter w = new OutputStreamWriter(fsdos, Charset.defaultCharset())) {
+      configs.write(w);
+    }
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceCommandLine.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceCommandLine.java
new file mode 100644
index 0000000..5323102
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceCommandLine.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import jline.TerminalFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.log.LogHelpers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+
+@SuppressWarnings("static-access")
+class LlapServiceCommandLine {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapServiceCommandLine.class.getName());
+
+  private static final Option DIRECTORY = OptionBuilder
+      .withLongOpt("directory")
+      .withDescription("Temp directory for jars etc.")
+      .withArgName("directory")
+      .hasArg()
+      .create('d');
+
+  private static final Option NAME = OptionBuilder
+      .withLongOpt("name")
+      .withDescription("Cluster name for YARN registry")
+      .withArgName("name")
+      .hasArg()
+      .create('n');
+
+  private static final Option EXECUTORS = OptionBuilder
+      .withLongOpt("executors")
+      .withDescription("executor per instance")
+      .withArgName("executors")
+      .hasArg()
+      .create('e');
+
+  private static final Option IO_THREADS = OptionBuilder
+      .withLongOpt("iothreads")
+      .withDescription("iothreads per instance")
+      .withArgName("iothreads")
+      .hasArg()
+      .create('t');
+
+  private static final Option CACHE = OptionBuilder
+      .withLongOpt("cache")
+      .withDescription("cache size per instance")
+      .withArgName("cache")
+      .hasArg()
+      .create('c');
+
+  private static final Option SIZE = OptionBuilder
+      .withLongOpt("size")
+      .withDescription("container size per instance")
+      .withArgName("size")
+      .hasArg()
+      .create('s');
+
+  private static final Option XMX = OptionBuilder
+      .withLongOpt("xmx")
+      .withDescription("working memory size")
+      .withArgName("xmx")
+      .hasArg()
+      .create('w');
+
+  private static final Option AUXJARS = OptionBuilder
+      .withLongOpt("auxjars")
+      .withDescription("additional jars to package (by default, JSON SerDe jar is packaged if available)")
+      .withArgName("auxjars")
+      .hasArg()
+      .create('j');
+
+  private static final Option AUXHBASE = OptionBuilder
+      .withLongOpt("auxhbase")
+      .withDescription("whether to package the HBase jars (true by default)")
+      .withArgName("auxhbase")
+      .hasArg()
+      .create('h');
+
+  private static final Option HIVECONF = OptionBuilder
+      .withLongOpt("hiveconf")
+      .withDescription("Use value for given property. Overridden by explicit parameters")
+      .withArgName("property=value")
+      .hasArgs(2)
+      .withValueSeparator()
+      .create();
+
+  private static final Option JAVAHOME = OptionBuilder
+      .withLongOpt("javaHome")
+      .withDescription("Path to the JRE/JDK. This should be installed at the same location on all cluster nodes " +
+          "($JAVA_HOME, java.home by default)")
+      .withArgName("javaHome")
+      .hasArg()
+      .create();
+
+  private static final Option QUEUE = OptionBuilder
+      .withLongOpt("queue")
+      .withDescription("The queue within which LLAP will be started")
+      .withArgName("queue")
+      .hasArg()
+      .create('q');
+
+  private static final Set<String> VALID_LOGGERS = ImmutableSet.of(LogHelpers.LLAP_LOGGER_NAME_RFA.toLowerCase(),
+      LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING.toLowerCase(), LogHelpers.LLAP_LOGGER_NAME_CONSOLE.toLowerCase());
+
+  private static final Option LOGGER = OptionBuilder
+      .withLongOpt("logger")
+      .withDescription("logger for llap instance (" + VALID_LOGGERS + ")")
+      .withArgName("logger")
+      .hasArg()
+      .create();
+
+  private static final Option START = OptionBuilder
+      .withLongOpt("startImmediately")
+      .withDescription("immediately start the cluster")
+      .withArgName("startImmediately")
+      .hasArg(false)
+      .create('z');
+
+  private static final Option OUTPUT = OptionBuilder
+      .withLongOpt("output")
+      .withDescription("Output directory for the generated scripts")
+      .withArgName("output")
+      .hasArg()
+      .create();
+
+  private static final Option AUXHIVE = OptionBuilder
+      .withLongOpt("auxhive")
+      .withDescription("whether to package the Hive aux jars (true by default)")
+      .withArgName("auxhive")
+      .hasArg()
+      .create("auxhive");
+
+  private static final Option HELP = OptionBuilder
+      .withLongOpt("help")
+      .withDescription("Print help information")
+      .withArgName("help")
+      .hasArg(false)
+      .create('H');
+
+  // Options for the python script that are here because our option parser cannot ignore the unknown ones
+  private static final String OPTION_INSTANCES = "instances";
+  private static final String OPTION_ARGS = "args";
+  private static final String OPTION_LOGLEVEL = "loglevel";
+  private static final String OPTION_SERVICE_KEYTAB_DIR = "service-keytab-dir";
+  private static final String OPTION_SERVICE_KEYTAB = "service-keytab";
+  private static final String OPTION_SERVICE_PRINCIPAL = "service-principal";
+  private static final String OPTION_SERVICE_PLACEMENT = "service-placement";
+  private static final String OPTION_SERVICE_DEFAULT_KEYTAB = "service-default-keytab";
+  private static final String OPTION_HEALTH_PERCENT = "health-percent";
+  private static final String OPTION_HEALTH_TIME_WINDOW_SECS = "health-time-window-secs";
+  private static final String OPTION_HEALTH_INIT_DELAY_SECS = "health-init-delay-secs";
+  private static final String OPTION_SERVICE_AM_CONTAINER_MB = "service-am-container-mb";
+  private static final String OPTION_SERVICE_APPCONFIG_GLOBAL = "service-appconfig-global";
+
+  private static final Options OPTIONS = new Options();
+  static {
+    OPTIONS.addOption(DIRECTORY);
+    OPTIONS.addOption(NAME);
+    OPTIONS.addOption(EXECUTORS);
+    OPTIONS.addOption(IO_THREADS);
+    OPTIONS.addOption(CACHE);
+    OPTIONS.addOption(SIZE);
+    OPTIONS.addOption(XMX);
+    OPTIONS.addOption(AUXJARS);
+    OPTIONS.addOption(AUXHBASE);
+    OPTIONS.addOption(HIVECONF);
+    OPTIONS.addOption(JAVAHOME);
+    OPTIONS.addOption(QUEUE);
+    OPTIONS.addOption(LOGGER);
+    OPTIONS.addOption(START);
+    OPTIONS.addOption(OUTPUT);
+    OPTIONS.addOption(AUXHIVE);
+    OPTIONS.addOption(HELP);
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_INSTANCES)
+        .withDescription("Specify the number of instances to run this on")
+        .withArgName(OPTION_INSTANCES)
+        .hasArg()
+        .create('i'));
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_ARGS)
+        .withDescription("java arguments to the llap instance")
+        .withArgName(OPTION_ARGS)
+        .hasArg()
+        .create('a'));
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_LOGLEVEL)
+        .withDescription("log levels for the llap instance")
+        .withArgName(OPTION_LOGLEVEL)
+        .hasArg()
+        .create('l'));
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_KEYTAB_DIR)
+        .withDescription("Service AM keytab directory on HDFS (where the headless user keytab is stored by Service " +
+            "keytab installation, e.g. .yarn/keytabs/llap)")
+        .withArgName(OPTION_SERVICE_KEYTAB_DIR)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_KEYTAB)
+        .withDescription("Service AM keytab file name inside " + OPTION_SERVICE_KEYTAB_DIR)
+        .withArgName(OPTION_SERVICE_KEYTAB)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_PRINCIPAL)
+        .withDescription("Service AM principal; should be the user running the cluster, e.g. hive@EXAMPLE.COM")
+        .withArgName(OPTION_SERVICE_PRINCIPAL)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_PLACEMENT)
+        .withDescription("Service placement policy; see YARN documentation at " +
+            "https://issues.apache.org/jira/browse/YARN-1042. This is unnecessary if LLAP is going to take more " +
+            "than half of the YARN capacity of a node.")
+        .withArgName(OPTION_SERVICE_PLACEMENT)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_DEFAULT_KEYTAB)
+        .withDescription("try to set default settings for Service AM keytab; mostly for dev testing")
+        .withArgName(OPTION_SERVICE_DEFAULT_KEYTAB)
+        .hasArg(false)
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_HEALTH_PERCENT)
+        .withDescription("Percentage of running containers after which LLAP application is considered healthy" +
+            " (Default: 80)")
+        .withArgName(OPTION_HEALTH_PERCENT)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_HEALTH_TIME_WINDOW_SECS)
+        .withDescription("Time window in seconds (after initial delay) for which LLAP application is allowed to be " +
+            "in unhealthy state before being killed (Default: 300)")
+        .withArgName(OPTION_HEALTH_TIME_WINDOW_SECS)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_HEALTH_INIT_DELAY_SECS)
+        .withDescription("Delay in seconds after which health percentage is monitored (Default: 400)")
+        .withArgName(OPTION_HEALTH_INIT_DELAY_SECS)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_AM_CONTAINER_MB)
+        .withDescription("The size of the service AppMaster container in MB")
+        .withArgName("b")
+        .hasArg()
+        .create('b'));
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_APPCONFIG_GLOBAL)
+        .withDescription("Property (key=value) to be set in the global section of the Service appConfig")
+        .withArgName("property=value")
+        .hasArgs(2)
+        .withValueSeparator()
+        .create());
+  }
+
+  private String[] args;
+
+  private String directory;
+  private String name;
+  private int executors;
+  private int ioThreads;
+  private long cache;
+  private long size;
+  private long xmx;
+  private String jars;
+  private boolean isHbase;
+  private Properties conf = new Properties();
+  private String javaPath = null;
+  private String llapQueueName;
+  private String logger = null;
+  private boolean isStarting;
+  private String output;
+  private boolean isHiveAux;
+  private boolean isHelp;
+
+  static LlapServiceCommandLine parseArguments(String[] args) {
+    LlapServiceCommandLine cl = null;
+    try {
+      cl = new LlapServiceCommandLine(args);
+    } catch (Exception e) {
+      LOG.error("Parsing the command line arguments failed", e);
+      printUsage();
+      System.exit(1);
+    }
+
+    if (cl.isHelp) {
+      printUsage();
+      System.exit(0);
+    }
+
+    return cl;
+  }
+
+  LlapServiceCommandLine(String[] args) throws ParseException {
+    LOG.info("LLAP invoked with arguments = {}", Arrays.toString(args));
+    this.args = args;
+    parseCommandLine(args);
+  }
+
+  private void parseCommandLine(String[] args) throws ParseException {
+    CommandLine cl = new GnuParser().parse(OPTIONS, args);
+    if (cl.hasOption(HELP.getOpt())) {
+      isHelp = true;
+      return;
+    }
+
+    if (!cl.hasOption(OPTION_INSTANCES)) {
+      printUsage();
+      throw new ParseException("instance must be set");
+    }
+
+    int instances = Integer.parseInt(cl.getOptionValue(OPTION_INSTANCES));
+    if (instances <= 0) {
+      throw new ParseException("Invalid configuration: " + instances + " (should be greater than 0)");
+    }
+
+    directory = cl.getOptionValue(DIRECTORY.getOpt());
+    name = cl.getOptionValue(NAME.getOpt());
+    executors = Integer.parseInt(cl.getOptionValue(EXECUTORS.getOpt(), "-1"));
+    ioThreads = Integer.parseInt(cl.getOptionValue(IO_THREADS.getOpt(), Integer.toString(executors)));
+    cache = TraditionalBinaryPrefix.string2long(cl.getOptionValue(CACHE.getOpt(), "-1"));
+    size = TraditionalBinaryPrefix.string2long(cl.getOptionValue(SIZE.getOpt(), "-1"));
+    xmx = TraditionalBinaryPrefix.string2long(cl.getOptionValue(XMX.getOpt(), "-1"));
+    jars = cl.getOptionValue(AUXJARS.getOpt());
+    isHbase = Boolean.parseBoolean(cl.getOptionValue(AUXHBASE.getOpt(), "true"));
+    if (cl.hasOption(HIVECONF.getLongOpt())) {
+      conf = cl.getOptionProperties(HIVECONF.getLongOpt());
+    }
+    if (cl.hasOption(JAVAHOME.getLongOpt())) {
+      javaPath = cl.getOptionValue(JAVAHOME.getLongOpt());
+    }
+    llapQueueName = cl.getOptionValue(QUEUE.getOpt(), ConfVars.LLAP_DAEMON_QUEUE_NAME.getDefaultValue());
+    if (cl.hasOption(LOGGER.getLongOpt())) {
+      logger = cl.getOptionValue(LOGGER.getLongOpt());
+      Preconditions.checkArgument(VALID_LOGGERS.contains(logger.toLowerCase()));
+    }
+    isStarting = cl.hasOption(START.getOpt());
+    output = cl.getOptionValue(OUTPUT.getLongOpt());
+    isHiveAux = Boolean.parseBoolean(cl.getOptionValue(AUXHIVE.getOpt(), "true"));
+  }
+
+  private static void printUsage() {
+    HelpFormatter hf = new HelpFormatter();
+    try {
+      int width = hf.getWidth();
+      int jlineWidth = TerminalFactory.get().getWidth();
+      width = Math.min(160, Math.max(jlineWidth, width));
+      hf.setWidth(width);
+    } catch (Throwable t) { // Ignore
+    }
+
+    hf.printHelp("llap", OPTIONS);
+  }
+
+  String[] getArgs() {
+    return args;
+  }
+
+  String getDirectory() {
+    return directory;
+  }
+
+  String getName() {
+    return name;
+  }
+
+  int getExecutors() {
+    return executors;
+  }
+
+  int getIoThreads() {
+    return ioThreads;
+  }
+
+  long getCache() {
+    return cache;
+  }
+
+  long getSize() {
+    return size;
+  }
+
+  long getXmx() {
+    return xmx;
+  }
+
+  String getAuxJars() {
+    return jars;
+  }
+
+  boolean getIsHBase() {
+    return isHbase;
+  }
+
+  boolean getIsHiveAux() {
+    return isHiveAux;
+  }
+
+  Properties getConfig() {
+    return conf;
+  }
+
+  String getJavaPath() {
+    return javaPath;
+  }
+
+  String getLlapQueueName() {
+    return llapQueueName;
+  }
+
+  String getLogger() {
+    return logger;
+  }
+
+  boolean isStarting() {
+    return isStarting;
+  }
+
+  String getOutput() {
+    return output;
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceDriver.java
new file mode 100644
index 0000000..82753a1
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceDriver.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.cli.LlapSliderUtils;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Starts the llap daemon. */
+public class LlapServiceDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapServiceDriver.class.getName());
+
+  private static final String LLAP_PACKAGE_DIR = ".yarn/package/LLAP/";
+  private static final String OUTPUT_DIR_PREFIX = "llap-yarn-";
+
+  /**
+   * This is a working configuration for the instance to merge various variables.
+   * It is not written out for llap server usage.
+   */
+  private final HiveConf conf;
+  private final LlapServiceCommandLine cl;
+
+  public LlapServiceDriver(LlapServiceCommandLine cl) throws Exception {
+    this.cl = cl;
+
+    SessionState ss = SessionState.get();
+    this.conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class);
+    if (conf == null) {
+      throw new Exception("Cannot load any configuration to run command");
+    }
+  }
+
+  private int run() throws Exception {
+    Properties propsDirectOptions = new Properties();
+
+    // Working directory.
+    Path tmpDir = new Path(cl.getDirectory());
+
+    long t0 = System.nanoTime();
+
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem rawFs = FileSystem.getLocal(conf).getRawFileSystem();
+
+    int threadCount = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount,
+        new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
+
+    int rc = 0;
+    try {
+      setupConf(propsDirectOptions);
+
+      URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
+      if (logger == null) {
+        throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
+      }
+
+      Path home = new Path(System.getenv("HIVE_HOME"));
+      Path scriptParent = new Path(new Path(home, "scripts"), "llap");
+      Path scripts = new Path(scriptParent, "bin");
+
+      if (!rawFs.exists(home)) {
+        throw new Exception("Unable to find HIVE_HOME: " + home);
+      } else if (!rawFs.exists(scripts)) {
+        LOG.warn("Unable to find llap scripts: " + scripts);
+      }
+
+      String javaHome = getJavaHome();
+
+      LlapTarComponentGatherer tarComponentGatherer = new LlapTarComponentGatherer(cl, conf, propsDirectOptions,
+          fs, rawFs, executor, tmpDir);
+      tarComponentGatherer.createDirs();
+      tarComponentGatherer.submitTarComponentGatherTasks();
+
+      // TODO: need to move from Python to Java for the rest of the script.
+      LlapConfigJsonCreator lcjCreator = new LlapConfigJsonCreator(conf, rawFs, tmpDir, cl.getCache(), cl.getXmx(),
+          javaHome);
+      lcjCreator.createLlapConfigJson();
+
+      LOG.debug("Config Json generation took " + (System.nanoTime() - t0) + " ns");
+
+      tarComponentGatherer.waitForFinish();
+
+      if (cl.isStarting()) {
+        rc = startLlap(tmpDir, scriptParent);
+      } else {
+        rc = 0;
+      }
+    } finally {
+      executor.shutdown();
+      rawFs.close();
+      fs.close();
+    }
+
+    if (rc == 0) {
+      LOG.debug("Exiting successfully");
+    } else {
+      LOG.info("Exiting with rc = " + rc);
+    }
+    return rc;
+  }
+
+  private void setupConf(Properties propsDirectOptions) throws Exception {
+    // needed so that the file is actually loaded into configuration.
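+    // (For example, llap-daemon-site.xml is among DAEMON_CONFIGS; conf.addResource() only records the resource
+    // name, so the getResource() null-check below is what actually verifies the file is on the classpath.)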
+    for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) {
+      conf.addResource(f);
+      if (conf.getResource(f) == null) {
+        throw new Exception("Unable to find required config file: " + f);
+      }
+    }
+    for (String f : LlapDaemonConfiguration.SSL_DAEMON_CONFIGS) {
+      conf.addResource(f);
+    }
+
+    conf.reloadConfiguration();
+
+    populateConfWithLlapProperties(conf, cl.getConfig());
+
+    if (cl.getName() != null) {
+      // update service registry configs - caveat: this has nothing to do with the actual settings as read by the AM
+      // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between instances
+      conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + cl.getName());
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + cl.getName());
+    }
+
+    if (cl.getLogger() != null) {
+      HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, cl.getLogger());
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, cl.getLogger());
+    }
+
+    boolean isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
+
+    String cacheStr = LlapUtil.humanReadableByteCount(cl.getCache());
+    String sizeStr = LlapUtil.humanReadableByteCount(cl.getSize());
+    String xmxStr = LlapUtil.humanReadableByteCount(cl.getXmx());
+
+    if (cl.getSize() != -1) {
+      if (cl.getCache() != -1) {
+        if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+          // direct heap allocations need to be safer
+          Preconditions.checkArgument(cl.getCache() < cl.getSize(), "Cache size (" + cacheStr + ") has to be smaller" +
+              " than the container sizing (" + sizeStr + ")");
+        } else if (cl.getCache() < cl.getSize()) {
+          LOG.warn("Note that this might need YARN physical memory monitoring to be turned off " +
+              "(yarn.nodemanager.pmem-check-enabled=false)");
+        }
+      }
+      if (cl.getXmx() != -1) {
+        Preconditions.checkArgument(cl.getXmx() < cl.getSize(), "Working memory (Xmx=" + xmxStr + ") has to be" +
+            " smaller than the container sizing (" + sizeStr + ")");
+      }
+      if (isDirect && !HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+        // direct and not memory mapped
+        Preconditions.checkArgument(cl.getXmx() + cl.getCache() <= cl.getSize(), "Working memory (Xmx=" +
+            xmxStr + ") + cache size (" + cacheStr + ") has to be smaller than the container sizing (" + sizeStr +
+            ")");
+      }
+    }
+
+    if (cl.getExecutors() != -1) {
+      conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, cl.getExecutors());
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, String.valueOf(cl.getExecutors()));
+      // TODO: vcpu settings - possibly when DRFA works right
+    }
+
+    if (cl.getIoThreads() != -1) {
+      conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, cl.getIoThreads());
+      propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, String.valueOf(cl.getIoThreads()));
+    }
+
+    long cache = cl.getCache();
+    if (cache != -1) {
+      conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
+      propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
+    }
+
+    long xmx = cl.getXmx();
+    if (xmx != -1) {
+      // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
+      // from this, to get actual usable memory before it goes into GC
+      long xmxMb = (xmx / (1024L * 1024L));
+      conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, String.valueOf(xmxMb));
+    }
+
+    long containerSize = cl.getSize();
+    if (containerSize == -1) {
+      long heapSize = xmx;
+      if (!isDirect) {
+        heapSize += cache;
+      }
+      containerSize = Math.min((long) (heapSize * 1.2), heapSize + 1024L * 1024 * 1024);
+      if (isDirect) {
+        containerSize += cache;
+      }
+    }
+    long containerSizeMB = containerSize / (1024 * 1024);
+    long minAllocMB = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
+    String containerSizeStr = LlapUtil.humanReadableByteCount(containerSize);
+    Preconditions.checkArgument(containerSizeMB >= minAllocMB, "Container size (" + containerSizeStr + ") should be " +
+        "greater than minimum allocation (" + LlapUtil.humanReadableByteCount(minAllocMB * 1024L * 1024L) + ")");
+    conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSizeMB);
+    propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, String.valueOf(containerSizeMB));
+
+    LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {}", containerSizeStr, xmxStr,
+        cacheStr);
+
+    if (!StringUtils.isEmpty(cl.getLlapQueueName())) {
+      conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, cl.getLlapQueueName());
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, cl.getLlapQueueName());
+    }
+  }
+
+  private String getJavaHome() {
+    String javaHome = cl.getJavaPath();
+    if (StringUtils.isEmpty(javaHome)) {
+      javaHome = System.getenv("JAVA_HOME");
+      String jreHome = System.getProperty("java.home");
+      if (javaHome == null) {
+        javaHome = jreHome;
+      } else if (!javaHome.equals(jreHome)) {
+        LOG.warn("Java versions might not match: JAVA_HOME=[{}], process jre=[{}]", javaHome, jreHome);
+      }
+    }
+    if (StringUtils.isEmpty(javaHome)) {
+      throw new RuntimeException(
+          "Could not determine JAVA_HOME from command line parameters, environment or system properties");
+    }
+    LOG.info("Using [{}] for JAVA_HOME", javaHome);
+    return javaHome;
+  }
+
+  private static void populateConfWithLlapProperties(Configuration conf, Properties properties) {
+    for (Entry<Object, Object> props : properties.entrySet()) {
+      String key = (String) props.getKey();
+      if (HiveConf.getLlapDaemonConfVars().contains(key)) {
+        conf.set(key, (String) props.getValue());
+      } else {
+        if (key.startsWith(HiveConf.PREFIX_LLAP) || key.startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
+          LOG.warn("Adding key [{}] even though it is not in the set of known llap-server keys", key);
+          conf.set(key, (String) props.getValue());
+        } else {
+          LOG.warn("Ignoring unknown llap server parameter: [{}]", key);
+        }
+      }
+    }
+  }
+
+  private int startLlap(Path tmpDir, Path scriptParent) throws IOException, InterruptedException {
+    int rc;
+    String version = System.getenv("HIVE_VERSION");
+    if (StringUtils.isEmpty(version)) {
+      version = DateTime.now().toString("ddMMMyyyy");
+    }
+
+    String outputDir = cl.getOutput();
+    Path packageDir = null;
+    if (outputDir == null) {
+      outputDir = OUTPUT_DIR_PREFIX + version;
+      packageDir = new Path(Paths.get(".").toAbsolutePath().toString(), OUTPUT_DIR_PREFIX + version);
+    } else {
+      packageDir = new Path(outputDir);
+    }
+
+    rc = runPackagePy(tmpDir, scriptParent, version, outputDir);
+    if (rc == 0) {
+      String tarballName = "llap-" + version + ".tar.gz";
+      startCluster(conf, cl.getName(), tarballName, packageDir, conf.getVar(ConfVars.LLAP_DAEMON_QUEUE_NAME));
+    }
+    return rc;
+  }
+
+  private int runPackagePy(Path tmpDir, Path scriptParent, String version, String outputDir)
+      throws IOException, InterruptedException {
+    Path scriptPath = new Path(new Path(scriptParent, "yarn"), "package.py");
+    List<String> scriptArgs = new ArrayList<>(cl.getArgs().length + 7);
+    scriptArgs.addAll(Arrays.asList("python", scriptPath.toString(), "--input", tmpDir.toString(), "--output",
+        outputDir, "--javaChild"));
+    scriptArgs.addAll(Arrays.asList(cl.getArgs()));
+
+    LOG.debug("Calling package.py via: " + scriptArgs);
+    ProcessBuilder builder = new ProcessBuilder(scriptArgs);
+    builder.redirectError(ProcessBuilder.Redirect.INHERIT);
+    builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
+    builder.environment().put("HIVE_VERSION", version);
+    return builder.start().waitFor();
+  }
+
+  private void startCluster(Configuration conf, String name, String packageName, Path packageDir, String queue) {
+    LOG.info("Starting cluster with " + name + ", " + packageName + ", " + queue + ", " + packageDir);
+    ServiceClient sc;
+    try {
+      sc = LlapSliderUtils.createServiceClient(conf);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    try {
+      try {
+        LOG.info("Executing the stop command");
+        sc.actionStop(name, true);
+      } catch (Exception ex) { // Ignore exceptions from stop
+        LOG.info(ex.getLocalizedMessage());
+      }
+      try {
+        LOG.info("Executing the destroy command");
+        sc.actionDestroy(name);
+      } catch (Exception ex) { // Ignore exceptions from destroy
+        LOG.info(ex.getLocalizedMessage());
+      }
+      LOG.info("Uploading the app tarball");
+      CoreFileSystem fs = new CoreFileSystem(conf);
+      fs.createWithPermissions(new Path(LLAP_PACKAGE_DIR), FsPermission.getDirDefault());
+      fs.copyLocalFileToHdfs(new File(packageDir.toString(), packageName), new Path(LLAP_PACKAGE_DIR),
+          new FsPermission("755"));
+
+      LOG.info("Executing the launch command");
+      File yarnfile = new File(new Path(packageDir, "Yarnfile").toString());
+      Long lifetime = null; // unlimited lifetime
+      sc.actionLaunch(yarnfile.getAbsolutePath(), name, lifetime, queue);
+      LOG.debug("Started the cluster via service API");
+    } catch (YarnException | IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        sc.close();
+      } catch (IOException e) {
+        LOG.info("Failed to close service client", e);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    LlapServiceCommandLine cl = LlapServiceCommandLine.parseArguments(args);
+    int ret = 0;
+    try {
+      ret = new LlapServiceDriver(cl).run();
+    } catch (Throwable t) {
+      System.err.println("Failed: " + t.getMessage());
+      t.printStackTrace();
+      ret = 3;
+    } finally {
+      LOG.info("LLAP service driver finished");
+    }
+    LOG.debug("Completed processing - exiting with " + ret);
+    System.exit(ret);
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapTarComponentGatherer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapTarComponentGatherer.java
new file mode 100644
index 0000000..a83647b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapTarComponentGatherer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Gathers all the jar files necessary to start llap.
+ */
+class LlapTarComponentGatherer {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTarComponentGatherer.class.getName());
+
+  // using Callable instead of Runnable to be able to throw Exception
+  private final Map<String, Future<Void>> tasks = new HashMap<>();
+
+  private final LlapServiceCommandLine cl;
+  private final HiveConf conf;
+  private final Properties directProperties;
+  private final FileSystem fs;
+  private final FileSystem rawFs;
+  private final ExecutorService executor;
+  private final Path libDir;
+  private final Path tezDir;
+  private final Path udfDir;
+  private final Path confDir;
+
+  LlapTarComponentGatherer(LlapServiceCommandLine cl, HiveConf conf, Properties directProperties, FileSystem fs,
+      FileSystem rawFs, ExecutorService executor, Path tmpDir) {
+    this.cl = cl;
+    this.conf = conf;
+    this.directProperties = directProperties;
+    this.fs = fs;
+    this.rawFs = rawFs;
+    this.executor = executor;
+    this.libDir = new Path(tmpDir, "lib");
+    this.tezDir = new Path(libDir, "tez");
+    this.udfDir = new Path(libDir, "udfs");
+    this.confDir = new Path(tmpDir, "conf");
+  }
+
+  void createDirs() throws Exception {
+    if (!rawFs.mkdirs(tezDir)) {
+      LOG.warn("mkdirs for " + tezDir + " returned false");
+    }
+    if (!rawFs.mkdirs(udfDir)) {
+      LOG.warn("mkdirs for " + udfDir + " returned false");
+    }
+    if (!rawFs.mkdirs(confDir)) {
+      LOG.warn("mkdirs for " + confDir + " returned false");
+    }
+  }
+
+  void submitTarComponentGatherTasks() {
+    CompletionService<Void> asyncRunner = new ExecutorCompletionService<>(executor);
+
+    tasks.put("downloadTezJars", asyncRunner.submit(new AsyncTaskDownloadTezJars(conf, fs, rawFs, libDir, tezDir)));
+    tasks.put("copyLocalJars", asyncRunner.submit(new AsyncTaskCopyLocalJars(rawFs, libDir)));
+    tasks.put("copyAuxJars", asyncRunner.submit(new AsyncTaskCopyAuxJars(cl, conf, rawFs, libDir)));
+    tasks.put("createUdfFile", asyncRunner.submit(new AsyncTaskCreateUdfFile(conf, fs, rawFs, udfDir, confDir)));
+    tasks.put("copyConfigs", asyncRunner.submit(new AsyncTaskCopyConfigs(cl, conf, directProperties, rawFs,
+        confDir)));
+  }
+
+  void waitForFinish() throws Exception {
+    for (Map.Entry<String, Future<Void>> task : tasks.entrySet()) {
+      long t1 = System.nanoTime();
+      task.getValue().get();
+      long t2 = System.nanoTime();
+      LOG.debug(task.getKey() + " waited for " + (t2 - t1) + " ns");
+    }
+  }
+}
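For context, the call sequence in LlapServiceDriver.run() above drives this class roughly as follows (a condensed
sketch of code already present in this patch, not an addition to it):

  LlapTarComponentGatherer gatherer =
      new LlapTarComponentGatherer(cl, conf, propsDirectOptions, fs, rawFs, executor, tmpDir);
  gatherer.createDirs();                    // lib/, lib/tez/, lib/udfs/ and conf/ under tmpDir
  gatherer.submitTarComponentGatherTasks(); // the five Callable<Void> tasks start running concurrently
  // ... config.json generation overlaps with the copy tasks ...
  gatherer.waitForFinish();                 // Future.get() rethrows the first task failure, if any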
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/package-info.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/package-info.java
new file mode 100644
index 0000000..46aacf8
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing the LlapServiceDriver program (and the other classes it uses), which starts up the llap
+ * daemon.
+ */
+package org.apache.hadoop.hive.llap.cli.service;
+
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/TestLlapServiceCommandLine.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/TestLlapServiceCommandLine.java
new file mode 100644
index 0000000..bb2a99b
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/TestLlapServiceCommandLine.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import static junit.framework.TestCase.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration2.ConfigurationConverter;
+import org.apache.commons.configuration2.MapConfiguration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.google.common.collect.ImmutableMap;
+
+/** Tests for LlapServiceCommandLine. */
+public class TestLlapServiceCommandLine {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testArgumentParsingEmpty() throws Exception {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage("instance must be set");
+
+    new LlapServiceCommandLine(new String[] {});
+  }
+
+  @Test
+  public void testArgumentParsingDefault() throws Exception {
+    LlapServiceCommandLine cl = new LlapServiceCommandLine(new String[] {"--instances", "1"});
+    assertEquals(null, cl.getAuxJars());
+    assertEquals(-1, cl.getCache());
+    assertEquals(new Properties(), cl.getConfig());
+    assertEquals(null, cl.getDirectory());
+    assertEquals(-1, cl.getExecutors());
+    assertEquals(-1, cl.getIoThreads());
+    assertEquals(true, cl.getIsHBase());
+    assertEquals(true, cl.getIsHiveAux());
+    assertEquals(null, cl.getJavaPath());
+    assertEquals(null, cl.getLlapQueueName());
+    assertEquals(null, cl.getLogger());
+    assertEquals(null, cl.getName());
+    assertEquals(null, cl.getOutput());
+    assertEquals(-1, cl.getSize());
+    assertEquals(-1, cl.getXmx());
+    assertEquals(false, cl.isStarting());
+  }
+
+  @Test
+  public void testParsingArguments() throws Exception {
+    LlapServiceCommandLine cl = new LlapServiceCommandLine(new String[] {"--instances", "2", "--auxjars", "auxjarsVal",
+        "--cache", "10k", "--hiveconf", "a=b", "--directory", "directoryVal", "--executors", "4", "--iothreads", "5",
+        "--auxhbase", "false", "--auxhive", "false", "--javaHome", "javaHomeVal", "--queue", "queueVal",
+        "--logger", "console", "--name", "nameVal", "--output", "outputVal", "--size", "10m", "--xmx", "10g",
+        "--startImmediately"});
+    assertEquals("auxjarsVal", cl.getAuxJars());
+    assertEquals(10L * 1024, cl.getCache());
+    assertEquals(ConfigurationConverter.getProperties(new MapConfiguration(ImmutableMap.of("a", "b"))), cl.getConfig());
+    assertEquals("directoryVal", cl.getDirectory());
+    assertEquals(4, cl.getExecutors());
+    assertEquals(5, cl.getIoThreads());
+    assertEquals(false, cl.getIsHBase());
+    assertEquals(false, cl.getIsHiveAux());
+    assertEquals("javaHomeVal", cl.getJavaPath());
+    assertEquals("queueVal", cl.getLlapQueueName());
+    assertEquals("console", cl.getLogger());
+    assertEquals("nameVal", cl.getName());
+    assertEquals("outputVal", cl.getOutput());
+    assertEquals(10L * 1024 * 1024, cl.getSize());
+    assertEquals(10L * 1024 * 1024 * 1024, cl.getXmx());
+    assertEquals(true, cl.isStarting());
+  }
+
+  @Test
+  public void testIllegalLogger() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--logger", "someValue"});
+  }
+
+  @Test
+  public void testIllegalInstances() throws Exception {
+    thrown.expect(NumberFormatException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "a"});
+  }
+
+  @Test
+  public void testIllegalCache() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--cache", "a"});
+  }
+
+  @Test
+  public void testIllegalExecutors() throws Exception {
+    thrown.expect(NumberFormatException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--executors", "a"});
+  }
+
+  @Test
+  public void testIllegalIoThreads() throws Exception {
+    thrown.expect(NumberFormatException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--iothreads", "a"});
+  }
+
+  @Test
+  public void testIllegalSize() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--size", "a"});
+  }
+
+  @Test
+  public void testIllegalXmx() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--xmx", "a"});
+  }
+}
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/package-info.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/package-info.java
new file mode 100644
index 0000000..e8746d2
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing the tests for the LlapServiceDriver program and the classes it uses.
+ */
+package org.apache.hadoop.hive.llap.cli.service;
+
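A closing note on the size arguments exercised by the tests: they pin down binary-suffix semantics ("10k" → 10 * 1024 bytes, "10m" → 10 * 1024^2, "10g" → 10 * 1024^3) and expect IllegalArgumentException for a non-numeric value. The sketch below reproduces that contract for illustration only; it is not the parser LlapServiceCommandLine actually uses (the deleted LlapOptionsProcessor imported org.apache.hadoop.util.StringUtils, which offers this kind of conversion). It also shows why a plain parse failure satisfies the expected-exception tests: NumberFormatException extends IllegalArgumentException.

// Illustrative re-implementation of the suffix arithmetic asserted by the tests;
// the class and method names are hypothetical.
final class SizeParserSketch {
  static long parseSuffixedSize(String value) {
    char last = Character.toLowerCase(value.charAt(value.length() - 1));
    long multiplier;
    switch (last) {
      case 'k': multiplier = 1024L; break;
      case 'm': multiplier = 1024L * 1024; break;
      case 'g': multiplier = 1024L * 1024 * 1024; break;
      default:
        // no recognized suffix: the whole string must parse as a number;
        // "a" fails here with NumberFormatException (an IllegalArgumentException)
        return Long.parseLong(value);
    }
    return Long.parseLong(value.substring(0, value.length() - 1)) * multiplier;
  }
}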