diff --git a/bin/ext/llap.sh b/bin/ext/llap.sh
new file mode 100644
index 0000000..93cd753
--- /dev/null
+++ b/bin/ext/llap.sh
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+THISSERVICE=llap
+export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
+
+llap () {
+  TMPDIR=$(mktemp -d /tmp/staging-slider-XXXXXX)
+  CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver;
+  if [ ! -f ${HIVE_LIB}/hive-cli-*.jar ]; then
+    echo "Missing Hive CLI Jar"
+    exit 3;
+  fi
+
+  if $cygwin; then
+    HIVE_LIB=`cygpath -w "$HIVE_LIB"`
+  fi
+
+  set -e;
+
+  # hadoop 20 or newer - skip the aux_jars option. picked up from hiveconf
+  $HADOOP $CLASS $HIVE_OPTS -directory $TMPDIR "$@"
+
+  # check for config files
+  test -f $TMPDIR/config.json
+
+  python $HIVE_HOME/scripts/llap/slider/package.py --input $TMPDIR "$@"
+}
+
+llap_help () {
+  CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver
+  execHiveCmd $CLASS "--help"
+}
+
diff --git a/cli/pom.xml b/cli/pom.xml
index fc9a400..b55e2e6 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -64,6 +64,11 @@
       <artifactId>hive-exec</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
diff --git a/common/src/java/org/apache/hadoop/hive/common/CompressionUtils.java b/common/src/java/org/apache/hadoop/hive/common/CompressionUtils.java
index 0e0d538..521a35a 100644
--- a/common/src/java/org/apache/hadoop/hive/common/CompressionUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/CompressionUtils.java
@@ -21,13 +21,26 @@
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
 
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
 import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tools.zip.ZipEntry;
+import org.apache.tools.zip.ZipOutputStream;
 
 /**
  * This class contains methods used for the purposes of compression, this class
@@ -35,6 +48,8 @@
  */
 public class CompressionUtils {
 
+  static final Log LOG = LogFactory.getLog(CompressionUtils.class);
+
   /**
    * Archive all the files in the inputFiles into outputFile
    *
@@ -70,4 +85,120 @@ public static void tar(String parentDir, String[] inputFiles, String outputFile)
       org.apache.hadoop.io.IOUtils.closeStream(out);
     }
   }
+
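+  /**
+   * Archive the named files under parentDir into a zip file, also created
+   * under parentDir as outputFile. Entry names are the relative paths given
+   * in inputFiles.
+   */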
+  public static void zip(String parentDir, String[] inputFiles, String outputFile)
+      throws IOException {
+    ZipOutputStream output = null;
+    try {
+      output = new ZipOutputStream(new FileOutputStream(new File(parentDir, outputFile)));
+      for (int i = 0; i < inputFiles.length; i++) {
+        File f = new File(parentDir, inputFiles[i]);
+        FileInputStream input = new FileInputStream(f);
+        output.putNextEntry(new ZipEntry(inputFiles[i]));
+        try {
+          IOUtils.copy(input, output);
+        } finally {
+          input.close();
+        }
+      }
+    } finally {
+      org.apache.hadoop.io.IOUtils.closeStream(output);
+    }
+  }
+
+  /**
+   * Untar an input file into an output directory.
+   *
+   * @param inputFileName the input .tar or .tar.gz file
+   * @param outputDirName the output directory
+   * @return The {@link List} of {@link File}s with the untared content.
+   * @throws FileNotFoundException
+   * @throws IOException
+   * @throws ArchiveException
+   */
+  public static List<File> unTar(final String inputFileName, final String outputDirName)
+      throws FileNotFoundException, IOException, ArchiveException {
+    return unTar(inputFileName, outputDirName, false);
+  }
+
+  /**
+   * Untar an input file into an output directory, optionally flattening any
+   * directory structure recorded in the archive.
+   *
+   * @param inputFileName the input .tar or .tar.gz file
+   * @param outputDirName the output directory
+   * @param flatten whether to ignore directories inside the archive
+   * @return The {@link List} of {@link File}s with the untared content.
+   * @throws FileNotFoundException
+   * @throws IOException
+   * @throws ArchiveException
+   */
+  public static List<File> unTar(final String inputFileName, final String outputDirName,
+      boolean flatten) throws FileNotFoundException, IOException, ArchiveException {
+
+    File inputFile = new File(inputFileName);
+    File outputDir = new File(outputDirName);
+
+    final List<File> untaredFiles = new LinkedList<File>();
+    final InputStream is;
+
+    if (inputFileName.endsWith(".gz")) {
+      is = new GzipCompressorInputStream(new FileInputStream(inputFile));
+    } else {
+      is = new FileInputStream(inputFile);
+    }
+
+    final TarArchiveInputStream debInputStream =
+        (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar", is);
+    TarArchiveEntry entry = null;
+    while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
+      final File outputFile = new File(outputDir, entry.getName());
+      if (entry.isDirectory()) {
+        if (flatten) {
+          // no sub-directories
+          continue;
+        }
+        LOG.debug(String.format("Attempting to write output directory %s.",
+            outputFile.getAbsolutePath()));
+        if (!outputFile.exists()) {
+          LOG.debug(String.format("Attempting to create output directory %s.",
+              outputFile.getAbsolutePath()));
+          if (!outputFile.mkdirs()) {
+            throw new IllegalStateException(String.format("Couldn't create directory %s.",
+                outputFile.getAbsolutePath()));
+          }
+        }
+      } else {
+        final OutputStream outputFileStream;
+        if (flatten) {
+          File flatOutputFile = new File(outputDir, outputFile.getName());
+          LOG.debug(String.format("Creating flat output file %s.",
+              flatOutputFile.getAbsolutePath()));
+          outputFileStream = new FileOutputStream(flatOutputFile);
+        } else if (!outputFile.getParentFile().exists()) {
+          LOG.debug(String.format("Attempting to create output directory %s.",
+              outputFile.getParentFile().getAbsoluteFile()));
+          if (!outputFile.getParentFile().getAbsoluteFile().mkdirs()) {
+            throw new IllegalStateException(String.format("Couldn't create directory %s.",
+                outputFile.getParentFile().getAbsolutePath()));
+          }
+          LOG.debug(String.format("Creating output file %s.", outputFile.getAbsolutePath()));
+          outputFileStream = new FileOutputStream(outputFile);
+        } else {
+          outputFileStream = new FileOutputStream(outputFile);
+        }
+        IOUtils.copy(debInputStream, outputFileStream);
+        outputFileStream.close();
+      }
+      untaredFiles.add(outputFile);
+    }
+    debInputStream.close();
+
+    return untaredFiles;
+  }
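+
+  // Example: unTar("/tmp/lib/tez.tar.gz", "/tmp/lib", true) extracts every
+  // regular file directly into /tmp/lib, discarding the directory layout
+  // recorded in the archive.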
+}
diff --git a/llap-server/bin/llapDaemon.sh b/llap-server/bin/llapDaemon.sh
index a02d35d..010e1e6 100755
--- a/llap-server/bin/llapDaemon.sh
+++ b/llap-server/bin/llapDaemon.sh
@@ -80,7 +80,7 @@ if [ ! -w "$LLAP_DAEMON_LOG_DIR" ] ; then
 fi
 
 if [ "$LLAP_DAEMON_PID_DIR" = "" ]; then
-  LLAP_DAEMON_PID_DIR=/tmp
+  LLAP_DAEMON_PID_DIR=/tmp/$USER
 fi
 
 # some variables
@@ -92,7 +92,7 @@ if [ ! -n "${LLAP_DAEMON_LOGGER}" ]; then
 fi
 logLog=$LLAP_DAEMON_LOG_DIR/$LLAP_DAEMON_LOG_BASE.log
 logOut=$LLAP_DAEMON_LOG_DIR/$LLAP_DAEMON_LOG_BASE.out
-pid=$LLAP_DAEMON_PID_DIR/llap-daemon-$USER.pid
+pid=$LLAP_DAEMON_PID_DIR/llap-daemon.pid
 LLAP_DAEMON_STOP_TIMEOUT=${LLAP_DAEMON_STOP_TIMEOUT:-2}
 
 # Set default scheduling priority
diff --git a/llap-server/bin/runLlapDaemon.sh b/llap-server/bin/runLlapDaemon.sh
index 4bde4eb..7bc7444 100755
--- a/llap-server/bin/runLlapDaemon.sh
+++ b/llap-server/bin/runLlapDaemon.sh
@@ -1,4 +1,6 @@
-#!/usr/bin/env bash
+#!/usr/bin/env bash
+
+set -x
 
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -72,7 +74,7 @@ if [ ! -n "${LLAP_DAEMON_LOGGER}" ]; then
   LLAP_DAEMON_LOGGER=${LOG_LEVEL_DEFAULT}
 fi
 
-CLASSPATH=${LLAP_DAEMON_CONF_DIR}:${LLAP_DAEMON_HOME}/hive-llap-server-1.2.0-SNAPSHOT.jar:${LLAP_DAEMON_HOME}/hive-exec-1.2.0-SNAPSHOT.jar:`${HADOOP_PREFIX}/bin/hadoop classpath`:.
+CLASSPATH=${LLAP_DAEMON_CONF_DIR}:${LLAP_DAEMON_HOME}/lib/*:`${HADOOP_PREFIX}/bin/hadoop classpath`:.
 
 if [ -n "LLAP_DAEMON_USER_CLASSPATH" ]; then
   CLASSPATH=${CLASSPATH}:${LLAP_DAEMON_USER_CLASSPATH}
diff --git a/llap-server/pom.xml b/llap-server/pom.xml
index 300ba8c..7b17f17 100644
--- a/llap-server/pom.xml
+++ b/llap-server/pom.xml
@@ -85,6 +85,11 @@
       <artifactId>libthrift</artifactId>
       <version>${libthrift.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.json</groupId>
+      <artifactId>json</artifactId>
+      <version>${json.version}</version>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
new file mode 100644
index 0000000..825771d
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class LlapOptionsProcessor {
+
+  public class LlapOptions {
+    private int instances = 0;
+    private String directory = null;
+
+    public LlapOptions(int instances, String directory) throws ParseException {
+      if (instances <= 0) {
+        throw new ParseException("Invalid configuration: " + instances
+            + " (should be greater than 0)");
+      }
+      this.instances = instances;
+      this.directory = directory;
+    }
+
+    public int getNumInstances() {
+      return instances;
+    }
+
+    public String getDirectory() {
+      return directory;
+    }
+  }
+
+  protected static final Log l4j = LogFactory.getLog(LlapOptionsProcessor.class.getName());
+  private final Options options = new Options();
+  Map<String, String> hiveVariables = new HashMap<String, String>();
+  private org.apache.commons.cli.CommandLine commandLine;
+
+  @SuppressWarnings("static-access")
+  public LlapOptionsProcessor() {
+
+    // set the number of instances on which llap should run
+    options.addOption(OptionBuilder.hasArg().withArgName("instances").withLongOpt("instances")
+        .withDescription("Specify the number of instances to run this on").create('i'));
+
+    // [-H|--help]
+    options.addOption(new Option("H", "help", false, "Print help information"));
+
+    options.addOption(OptionBuilder.hasArg().withArgName("directory").withLongOpt("directory")
+        .withDescription("Temp directory for jars etc.").create('d'));
+  }
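+
+  // Illustrative invocation (wired up through "hive --service llap", which
+  // supplies -directory; see bin/ext/llap.sh):
+  //   hive --service llap --instances 2 --directory /tmp/staging-slider-XXXXXX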
+
+  public LlapOptions processOptions(String argv[]) throws ParseException {
+    commandLine = new GnuParser().parse(options, argv);
+    if (commandLine.hasOption('H')) {
+      printUsage();
+      return null;
+    }
+
+    int instances = Integer.parseInt(commandLine.getOptionValue("instances"));
+    String directory = commandLine.getOptionValue("directory");
+
+    return new LlapOptions(instances, directory);
+  }
+
+  private void printUsage() {
+    new HelpFormatter().printHelp("hive", options);
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
new file mode 100644
index 0000000..2f7259c
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli;
+
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.CompressionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+public class LlapServiceDriver {
+
+  protected static final Log LOG = LogFactory.getLog(LlapServiceDriver.class.getName());
+  private final Configuration conf;
+
+  public LlapServiceDriver() {
+    SessionState ss = SessionState.get();
+    conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class);
+  }
+
+  public static void main(String[] args) throws Exception {
+    int ret = 0;
+    ret = new LlapServiceDriver().run(args);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Completed processing - exiting with " + ret);
+    }
+    System.exit(ret);
+  }
+
+  private int run(String[] args) throws Exception {
+    LlapOptionsProcessor optionsProcessor = new LlapOptionsProcessor();
+    LlapOptions options = optionsProcessor.processOptions(args);
+    Path tmpDir = new Path(options.getDirectory());
+
+    if (conf == null) {
+      LOG.warn("Cannot load any configuration to run command");
+      return 1;
+    }
+
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
+
+    String[] neededConfig =
+        { "tez-site.xml", "hive-site.xml", "llap-daemon-site.xml", "core-site.xml" };
+
+    // needed so that the file is actually loaded into configuration.
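+    // (Configuration.addResource is lazy: the getResource() null-check below
+    // confirms each file is actually present on the classpath.)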
+    for (String f : neededConfig) {
+      conf.addResource(f);
+      if (conf.getResource(f) == null) {
+        LOG.warn("Unable to find required config file: " + f);
+        return 2;
+      }
+    }
+
+    conf.reloadConfiguration();
+
+    URL logger = conf.getResource("llap-daemon-log4j.properties");
+
+    if (null == logger) {
+      LOG.warn("Unable to find required config file: llap-daemon-log4j.properties");
+      return 3;
+    }
+
+    Path home = new Path(System.getenv("HIVE_HOME"));
+    Path scripts = new Path(new Path(new Path(home, "scripts"), "llap"), "bin");
+
+    if (!lfs.exists(home)) {
+      LOG.warn("Unable to find HIVE_HOME:" + home);
+      return 3;
+    } else if (!lfs.exists(scripts)) {
+      LOG.warn("Unable to find llap scripts:" + scripts);
+    }
+
+    Path libDir = new Path(tmpDir, "lib");
+
+    String tezLibs = conf.get("tez.lib.uris");
+    if (tezLibs == null) {
+      LOG.warn("Missing tez.lib.uris in tez-site.xml");
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Copying tez libs from " + tezLibs);
+    }
+    lfs.mkdirs(libDir);
+    fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
+    CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(),
+        libDir.toString(), true);
+    lfs.delete(new Path(libDir, "tez.tar.gz"), false);
+
+    // TODO: aux jars (like compression libs)
+
+    lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapInputFormat.class)), libDir);
+    lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(HiveInputFormat.class)), libDir);
+
+    Path confPath = new Path(tmpDir, "conf");
+    lfs.mkdirs(confPath);
+
+    for (String f : neededConfig) {
+      // they will be file:// URLs
+      lfs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confPath);
+    }
+
+    lfs.copyFromLocalFile(new Path(logger.toString()), confPath);
+
+    // extract configs for processing by the python fragments in Slider
+    JSONObject configs = new JSONObject();
+
+    configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname,
+        HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
+
+    configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT.varname,
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT));
+
+    configs.put(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, conf.getInt(
+        LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
+        LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT));
+
+    configs.put(LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE, conf.getInt(
+        LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
+        LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT));
+
+    configs.put(LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS, conf.getInt(
+        LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+        LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT));
+
+    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
+
+    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
+
+    FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
+    OutputStreamWriter w = new OutputStreamWriter(os);
+    configs.write(w);
+    w.close();
+    os.close();
+
+    lfs.close();
+    fs.close();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Exiting successfully");
+    }
+
+    return 0;
+  }
+}
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
index 47c9472..34c04e7 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
@@ -14,6 +14,7 @@
 
 package org.apache.tez.dag.app.rm;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -29,6 +30,7 @@
 
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,8 +39,12 @@
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
 import org.apache.tez.dag.app.AppContext;
@@ -70,6 +76,8 @@
   // Per Executor Thread
   private final Resource resourcePerExecutor;
 
+  // TODO: replace with service registry
+  private final YarnClient yc = YarnClient.createYarnClient();
 
   public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext,
       String clientHostname, int clientPort, String trackingUrl,
@@ -102,27 +110,69 @@ public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext
     String[] hosts = conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_AM_SERVICE_HOSTS);
     if (hosts == null || hosts.length == 0) {
       hosts = new String[]{"localhost"};
-    }
-    for (String host : hosts) {
-      serviceHosts.add(host);
-      serviceHostSet.add(host);
+      serviceHosts.add("localhost");
+      serviceHostSet.add("localhost");
+    } else if (!hosts[0].equals("*")) {
+      for (String host : hosts) {
+        serviceHosts.add(host);
+        serviceHostSet.add(host);
+      }
     }
 
-    LOG.info("Running with configuration: " +
-        "memoryPerInstance=" + memoryPerInstance +
-        ", vcoresPerInstance=" + coresPerInstance +
-        ", executorsPerInstance=" + executorsPerInstance +
-        ", resourcePerInstanceInferred=" + resourcePerExecutor +
-        ", hosts=" + serviceHosts.toString());
+    if (serviceHosts.size() > 0) {
+      LOG.info("Running with configuration: " +
+          "memoryPerInstance=" + memoryPerInstance +
+          ", vcoresPerInstance=" + coresPerInstance +
+          ", executorsPerInstance=" + executorsPerInstance +
+          ", resourcePerInstanceInferred=" + resourcePerExecutor +
+          ", hosts=" + serviceHosts.toString());
+    } else {
+      LOG.info("Running with configuration: " +
+          "memoryPerInstance=" + memoryPerInstance +
+          ", vcoresPerInstance=" + coresPerInstance +
+          ", executorsPerInstance=" + executorsPerInstance +
+          ", resourcePerInstanceInferred=" + resourcePerExecutor +
+          ", hosts=");
+    }
   }
 
   @Override
   public void serviceInit(Configuration conf) {
+    yc.init(conf);
   }
+
   @Override
   public void serviceStart() {
+    yc.start();
+    if (serviceHosts.size() > 0) {
+      return;
+    }
+    LOG.info("Evaluating host usage criteria for service nodes");
+    try {
+      List<NodeReport> nodes = yc.getNodeReports(NodeState.RUNNING);
+      for (NodeReport nd : nodes) {
+        Resource used = nd.getUsed();
+        LOG.info("Examining node: " + nd);
+        if (nd.getNodeState() == NodeState.RUNNING
+            && used.getMemory() >= memoryPerInstance) {
+          // TODO: fix this with YARN registry
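+          // Heuristic: a RUNNING node already using >= memoryPerInstance MB is
+          // assumed to host an LLAP daemon and joins the service host set.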
+          serviceHosts.add(nd.getNodeId().getHost());
+          serviceHostSet.add(nd.getNodeId().getHost());
+        }
+      }
+      LOG.info("Re-inited with configuration: " +
+          "memoryPerInstance=" + memoryPerInstance +
+          ", vcoresPerInstance=" + coresPerInstance +
+          ", executorsPerInstance=" + executorsPerInstance +
+          ", resourcePerInstanceInferred=" + resourcePerExecutor +
+          ", hosts=" + serviceHosts.toString());
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (YarnException e) {
+      e.printStackTrace();
+    }
   }
 
   @Override
diff --git a/llap-server/src/main/resources/llap-daemon-log4j.properties b/llap-server/src/main/resources/llap-daemon-log4j.properties
new file mode 100644
index 0000000..51593e9
--- /dev/null
+++ b/llap-server/src/main/resources/llap-daemon-log4j.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+llap.daemon.root.logger=INFO,console
+llap.daemon.log.dir=.
+llap.daemon.log.file=llapdaemon.log
+
+# Define the root logger to the system property "llap.daemon.root.logger".
+log4j.rootLogger=${llap.daemon.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+# Null Appender
+log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
+
+#
+# Rolling File Appender - cap space usage at 5gb.
+#
+llap.daemon.log.maxfilesize=256MB
+llap.daemon.log.maxbackupindex=20
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file}
+
+log4j.appender.RFA.MaxFileSize=${llap.daemon.log.maxfilesize}
+log4j.appender.RFA.MaxBackupIndex=${llap.daemon.log.maxbackupindex}
+
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t(%x)] %p %c{2} : %m%n
diff --git a/llap-server/src/main/resources/llap.py b/llap-server/src/main/resources/llap.py
new file mode 100644
index 0000000..6d5bdcd
--- /dev/null
+++ b/llap-server/src/main/resources/llap.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+import os
+import subprocess
+from resource_management import *
+from os.path import dirname
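+
+# Slider agent hook: Slider invokes the install/configure/start/stop/status
+# lifecycle methods below; configuration values are resolved via params.py.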
+
+class Llap(Script):
+    def install(self, env):
+        self.install_packages(env)
+        pass
+
+    def configure(self, env):
+        import params
+        env.set_params(params)
+
+    def start(self, env):
+        import params
+        env.set_params(params)
+        os.environ['JAVA_HOME'] = format('{java64_home}')
+        os.environ['LD_LIBRARY_PATH'] = format('{library_path}')
+        # this is the same as TEZ_PREFIX
+        os.environ['LLAP_DAEMON_HOME'] = format('{app_root}')
+        # this is the location where we have the llap server components (shell scripts)
+        os.environ['LLAP_DAEMON_BIN_HOME'] = format('{app_root}/bin')
+        # location containing llap-daemon-site.xml, tez and yarn configuration xmls as well.
+        os.environ['LLAP_DAEMON_CONF_DIR'] = format("{app_root}/conf/")
+        os.environ['LLAP_DAEMON_LOG_DIR'] = format("{app_log_dir}/")
+        os.environ['LLAP_DAEMON_HEAPSIZE'] = format("{memory_val}")
+        os.environ['LLAP_DAEMON_PID_DIR'] = dirname(format("{pid_file}"))
+        print "Debug from LLAP python script"
+        print os.environ['LLAP_DAEMON_CONF_DIR']
+        self.configure(env)
+        location = "bash -x {app_root}/bin/llapDaemon.sh start &> {app_log_dir}/shell.out"
+        process_cmd = format(location)
+
+        subprocess.call(process_cmd, shell=True)
+
+    def stop(self, env):
+        import params
+        env.set_params(params)
+
+    def status(self, env):
+        import params
+        env.set_params(params)
+        check_process_status(params.pid_file)
+
+if __name__ == "__main__":
+    Llap().execute()
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
new file mode 100644
index 0000000..7424440
--- /dev/null
+++ b/llap-server/src/main/resources/package.py
@@ -0,0 +1,132 @@
+#!/usr/bin/python
+
+import sys,os,stat
+from getopt import getopt
+from json import loads as json_parse
+from os.path import exists, join, relpath
+from time import gmtime, strftime
+import shutil
+import tarfile
+import zipfile
+
+from templates import metainfo, appConfig, resources, runner
+
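+# Sizing notes: the heap gets ~20% headroom over the configured daemon memory
+# (the ORC cache counts against the heap only when direct allocation is off),
+# and the container adds another ~20% on top, plus the cache when it is off-heap.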
+class LlapResource(object):
+    def __init__(self, config):
+        self.memory = config["llap.daemon.memory.per.instance.mb"]
+        self.cores = config["llap.daemon.vcpus.per.instance"]
+        # convert to Mb
+        self.cache = config["hive.llap.io.cache.orc.size"] / (1024*1024.0)
+        self.direct = config["hive.llap.io.cache.direct"]
+        self.min_mb = -1
+        self.min_cores = -1
+        # compute heap
+        h = max(1.2*self.memory, self.memory + 256)
+        if (not self.direct):
+            h += self.cache
+        c = max(h*1.2, h + 128)
+        if (self.direct):
+            c += self.cache
+        if self.min_mb > 0:
+            c = c + c%self.min_mb
+            h = c/1.2
+            if self.direct:
+                h = h - self.cache
+        self.container_size = int(c)
+        self.container_cores = self.cores
+        self.heap_size = int(h)
+
+    def __repr__(self):
+        return "<LlapResource heap=%d container=%d>" % (self.heap_size, self.container_size)
+
+def zipdir(path, zip, prefix="."):
+    for root, dirs, files in os.walk(path):
+        for file in files:
+            src = join(root, file)
+            dst = src.replace(path, prefix)
+            zip.write(src, dst)
+
+def main(args):
+    opts, args = getopt(args, "n:o:i:", ["instances=", "output=", "input="])
+    version = os.getenv("HIVE_VERSION")
+    if not version:
+        version = strftime("%d%b%Y", gmtime())
+    home = os.getenv("HIVE_HOME")
+    output = "llap-slider-%(version)s" % ({"version": version})
+    instances = 1
+    input = None
+    for k,v in opts:
+        if k in ("--input", "-i"):
+            input = v
+        elif k in ("--output", "-o"):
+            output = v
+        elif k in ("--instances", "-n"):
+            instances = int(v)
+    if not input:
+        print "Cannot find input files"
+        sys.exit(1)
+        return
+    config = json_parse(open(join(input, "config.json")).read())
+    resource = LlapResource(config)
+    vars = {
+        "home" : home,
+        "version" : version,
+        "instances" : instances,
+        "heap" : resource.heap_size,
+        "container.mb" : resource.container_size,
+        "container.cores" : resource.container_cores,
+        "hadoop_home" : os.getenv("HADOOP_HOME"),
+        "java_home" : os.getenv("JAVA_HOME"),
+        "name" : "llap0"
+    }
+
+    if not exists(output):
+        os.makedirs(output)
+
+    src = join(home, "scripts", "llap", "bin")
+    dst = join(input, "bin")
+    if exists(dst):
+        shutil.rmtree(dst)
+    shutil.copytree(src, dst)
+
+    # Make the zip package
+    tmp = join(output, "tmp")
+    pkg = join(tmp, "package")
+
+    src = join(home, "scripts", "llap", "slider")
+    dst = join(pkg, "scripts")
+    if exists(dst):
+        shutil.rmtree(dst)
+    shutil.copytree(src, dst)
+
+    with open(join(tmp, "metainfo.xml"), "w") as f:
+        f.write(metainfo % vars)
+
+    os.mkdir(join(pkg, "files"))
+    tarball = tarfile.open(join(pkg, "files", "llap-%s.tar.gz" % version), "w:gz")
+    # recursive add + -C chdir inside
+    tarball.add(input, "")
+    tarball.close()
+
+    zipped = zipfile.ZipFile(join(output, "llap-%s.zip" % version), "w")
+    zipdir(tmp, zipped)
+    zipped.close()
+
+    # cleanup after making zip pkg
+    shutil.rmtree(tmp)
+
+    with open(join(output, "appConfig.json"), "w") as f:
+        f.write(appConfig % vars)
+
+    with open(join(output, "resources.json"), "w") as f:
+        f.write(resources % vars)
+
+    with open(join(output, "run.sh"), "w") as f:
+        f.write(runner % vars)
+    os.chmod(join(output, "run.sh"), 0755)
+
+    print "Prepared %s/run.sh for running LLAP on Slider" % (output)
+
+if __name__ == "__main__":
+    main(sys.argv[1:])
diff --git a/llap-server/src/main/resources/params.py b/llap-server/src/main/resources/params.py
new file mode 100644
index 0000000..8706d04
--- /dev/null
+++ b/llap-server/src/main/resources/params.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+
+# server configurations
+config = Script.get_config()
+
+app_root = config['configurations']['global']['app_root']
+java64_home = config['hostLevelParams']['java_home']
+pid_file = config['configurations']['global']['pid_file']
+
+additional_cp = config['configurations']['global']['additional_cp']
+app_log_dir = config['configurations']['global']['app_log_dir']
+
+port = config['configurations']['global']['listen_port']
+memory_val = config['configurations']['global']['memory_val']
+library_path = config['configurations']['global']['library_path']
diff --git a/llap-server/src/main/resources/templates.py b/llap-server/src/main/resources/templates.py
new file mode 100644
index 0000000..382ef17
--- /dev/null
+++ b/llap-server/src/main/resources/templates.py
@@ -0,0 +1,116 @@
+metainfo = """
+<metainfo>
+  <schemaVersion>2.0</schemaVersion>
+  <application>
+    <name>LLAP</name>
+    <comment>LLAP is a daemon service that works with a cache and works on SQL constructs.</comment>
+    <version>%(version)s</version>
+    <exportedConfigs>None</exportedConfigs>
+
+    <exportGroups>
+      <exportGroup>
+        <name>Servers</name>
+        <exports>
+          <export>
+            <name>instances</name>
+            <value>${LLAP_HOST}:${site.global.listen_port}</value>
+          </export>
+        </exports>
+      </exportGroup>
+    </exportGroups>
+
+    <components>
+      <component>
+        <name>LLAP</name>
+        <category>MASTER</category>
+        <compExports>Servers-instances</compExports>
+        <commandScript>
+          <script>scripts/llap.py</script>
+          <scriptType>PYTHON</scriptType>
+        </commandScript>
+      </component>
+    </components>
+
+    <osSpecifics>
+      <osSpecific>
+        <osType>any</osType>
+        <packages>
+          <package>
+            <type>tarball</type>
+            <name>files/llap-%(version)s.tar.gz</name>
+          </package>
+        </packages>
+      </osSpecific>
+    </osSpecifics>
+  </application>
+</metainfo>
+"""
+
+appConfig = """
+{
+  "schema": "http://example.org/specification/v2.0.0",
+  "metadata": {
+  },
+  "global": {
+    "application.def": ".slider/package/LLAP/llap-%(version)s.zip",
+    "java_home": "%(java_home)s",
+    "site.global.app_user": "yarn",
+    "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/",
+    "site.global.additional_cp": "%(hadoop_home)s",
+    "site.global.library_path": "%(hadoop_home)s/lib/native",
+    "site.global.memory_val": "%(heap)d",
+    "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/llap-daemon.pid"
+  },
+  "components": {
+    "slider-appmaster": {
+      "jvm.heapsize": "1024M"
+    }
+  }
+}
+"""
+
+resources = """
+{
+  "schema" : "http://example.org/specification/v2.0.0",
+  "metadata" : {
+  },
+  "global" : {
+  },
+  "components": {
+    "slider-appmaster": {
+    },
+    "LLAP": {
+      "yarn.role.priority": "1",
+      "yarn.component.instances": "%(instances)d",
+      "yarn.memory": "%(container.mb)d"
+    }
+  }
+}
+"""
+
+runner = """
+#!/bin/bash -e
+
+BASEDIR=$(dirname $0)
+slider stop %(name)s
+slider destroy %(name)s
+slider install-package --name LLAP --package $BASEDIR/llap-%(version)s.zip --replacepkg
+slider create %(name)s --resources $BASEDIR/resources.json --template $BASEDIR/appConfig.json
+"""
diff --git a/packaging/src/main/assembly/bin.xml b/packaging/src/main/assembly/bin.xml
index 260d8a3..a9afb05 100644
--- a/packaging/src/main/assembly/bin.xml
+++ b/packaging/src/main/assembly/bin.xml
@@ -124,6 +124,7 @@
       <directory>${project.parent.basedir}/bin</directory>
       <includes>
         <include>hive</include>
+        <include>llap/**/*</include>
         <include>metatool</include>
         <include>schematool</include>
         <include>beeline</include>
@@ -135,6 +136,24 @@
     </fileSet>
 
     <fileSet>
+      <fileMode>755</fileMode>
+      <directory>${project.parent.basedir}/llap-server/bin</directory>
+      <includes>
+        <include>**/*</include>
+      </includes>
+      <outputDirectory>scripts/llap/bin</outputDirectory>
+    </fileSet>
+
+    <fileSet>
+      <directory>${project.parent.basedir}/llap-server/src/main/resources/</directory>
+      <includes>
+        <include>**/*.py</include>
+        <include>**/*.xml</include>
+      </includes>
+      <outputDirectory>scripts/llap/slider</outputDirectory>
+    </fileSet>
+
+    <fileSet>
       <directory>${project.parent.basedir}/metastore/scripts/upgrade</directory>
       <includes>
        <include>**/*</include>
@@ -335,6 +354,11 @@
      <destName>beeline-log4j.properties.template</destName>
    </file>
    <file>
+      <source>${project.parent.basedir}/llap-server/src/main/resources/llap-daemon-log4j.properties</source>
+      <outputDirectory>conf</outputDirectory>
+      <destName>llap-daemon-log4j.properties.template</destName>
+    </file>
+    <file>
      <source>${project.parent.basedir}/hcatalog/README.txt</source>
      <outputDirectory>hcatalog/share/doc/hcatalog</outputDirectory>
    </file>
diff --git a/pom.xml b/pom.xml
index ca79be1..8f1a22b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,12 +42,12 @@
     <module>hwi</module>
     <module>jdbc</module>
     <module>metastore</module>
-    <module>llap-client</module>
-    <module>llap-server</module>
     <module>odbc</module>
     <module>ql</module>
     <module>serde</module>
     <module>service</module>
+    <module>llap-client</module>
+    <module>llap-server</module>
     <module>shims</module>
     <module>spark-client</module>
     <module>testutils</module>
@@ -108,7 +108,7 @@
    <datanucleus-rdbms.version>3.2.9</datanucleus-rdbms.version>
    <commons-cli.version>1.2</commons-cli.version>
    <commons-codec.version>1.4</commons-codec.version>
-    <commons-compress.version>1.4.1</commons-compress.version>
+    <commons-compress.version>1.9</commons-compress.version>
    <commons-exec.version>1.1</commons-exec.version>
    <commons-httpclient.version>3.0.1</commons-httpclient.version>
    <commons-io.version>2.4</commons-io.version>
diff --git a/ql/pom.xml b/ql/pom.xml
index 152f8e0..f85662e 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -673,7 +673,6 @@
                  <include>org.apache.hive:hive-exec</include>
                  <include>org.apache.hive:hive-serde</include>
                  <include>org.apache.hive:hive-llap-client</include>
-                  <include>org.apache.hive:hive-llap-server</include>
                  <include>com.esotericsoftware.kryo:kryo</include>
                  <include>com.twitter:parquet-hadoop-bundle</include>
                  <include>org.apache.thrift:libthrift</include>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index f2ba3c5..d8fc621 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -40,6 +40,7 @@
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.net.URLDecoder;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.sql.Connection;
@@ -55,6 +56,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -188,6 +190,7 @@
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
+import com.google.common.base.Preconditions;
 
 /**
  * Utilities.
@@ -239,6 +242,7 @@ private Utilities() {
 
   private static ThreadLocal<Map<Path, BaseWork>> gWorkMap =
       new ThreadLocal<Map<Path, BaseWork>>() {
+    @Override
     protected Map<Path, BaseWork> initialValue() {
      return new HashMap<Path, BaseWork>();
    }
@@ -3792,4 +3796,39 @@ public static String getQualifiedPath(HiveConf conf, Path path) throws HiveExcep
   public static boolean isDefaultNameNode(HiveConf conf) {
     return !conf.getChangedProperties().containsKey(HiveConf.ConfVars.HADOOPFS.varname);
   }
+
+  /**
+   * Returns the full path to the jar containing the class, or null if the
+   * class was not loaded from a jar.
+   *
+   * @param klass class to look up.
+   * @return path to the jar containing the class.
+   */
+  @SuppressWarnings("rawtypes")
+  public static String jarFinderGetJar(Class klass) {
+    Preconditions.checkNotNull(klass, "klass");
+    ClassLoader loader = klass.getClassLoader();
+    if (loader != null) {
+      String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
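+      // Scan every classpath resource serving this .class file; for a jar: URL,
+      // strip the "!/..." suffix to recover the path of the enclosing jar.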
+      try {
+        for (Enumeration itr = loader.getResources(class_file); itr.hasMoreElements();) {
+          URL url = (URL) itr.nextElement();
+          String path = url.getPath();
+          if (path.startsWith("file:")) {
+            path = path.substring("file:".length());
+          }
+          path = URLDecoder.decode(path, "UTF-8");
+          if ("jar".equals(url.getProtocol())) {
+            path = URLDecoder.decode(path, "UTF-8");
+            return path.replaceAll("!.*$", "");
+          }
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+
 }