diff --git a/bin/ext/llap.sh b/bin/ext/llap.sh
new file mode 100644
index 0000000..93cd753
--- /dev/null
+++ b/bin/ext/llap.sh
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+THISSERVICE=llap
+export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
+
+llap () {
+ TMPDIR=$(mktemp -d /tmp/staging-slider-XXXXXX)
+ CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver;
+ if [ ! -f ${HIVE_LIB}/hive-cli-*.jar ]; then
+ echo "Missing Hive CLI Jar"
+ exit 3;
+ fi
+
+ if $cygwin; then
+ HIVE_LIB=`cygpath -w "$HIVE_LIB"`
+ fi
+
+ set -e;
+
+  # Hadoop 0.20 or newer - skip the aux_jars option; it is picked up from hiveconf
+ $HADOOP $CLASS $HIVE_OPTS -directory $TMPDIR "$@"
+
+  # verify the generated config file exists (set -e above aborts otherwise)
+ test -f $TMPDIR/config.json
+
+ python $HIVE_HOME/scripts/llap/slider/package.py --input $TMPDIR "$@"
+}
+
+llap_help () {
+  CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver
+ execHiveCmd $CLASS "--help"
+}
+
diff --git a/cli/pom.xml b/cli/pom.xml
index fc9a400..b55e2e6 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -64,6 +64,11 @@
      <artifactId>hive-exec</artifactId>
      <version>${project.version}</version>
    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-server</artifactId>
+      <version>${project.version}</version>
+    </dependency>
    <dependency>
      <groupId>commons-cli</groupId>
diff --git a/common/src/java/org/apache/hadoop/hive/common/CompressionUtils.java b/common/src/java/org/apache/hadoop/hive/common/CompressionUtils.java
index 0e0d538..521a35a 100644
--- a/common/src/java/org/apache/hadoop/hive/common/CompressionUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/CompressionUtils.java
@@ -21,13 +21,26 @@
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tools.zip.ZipEntry;
+import org.apache.tools.zip.ZipOutputStream;
/**
* This class contains methods used for the purposes of compression, this class
@@ -35,6 +48,8 @@
*/
public class CompressionUtils {
+ static final Log LOG = LogFactory.getLog(CompressionUtils.class);
+
/**
* Archive all the files in the inputFiles into outputFile
*
@@ -70,4 +85,120 @@ public static void tar(String parentDir, String[] inputFiles, String outputFile)
org.apache.hadoop.io.IOUtils.closeStream(out);
}
}
+
+ public static void zip(String parentDir, String[] inputFiles, String outputFile)
+ throws IOException {
+ ZipOutputStream output = null;
+ try {
+ output = new ZipOutputStream(new FileOutputStream(new File(parentDir, outputFile)));
+ for (int i = 0; i < inputFiles.length; i++) {
+ File f = new File(parentDir, inputFiles[i]);
+ FileInputStream input = new FileInputStream(f);
+ output.putNextEntry(new ZipEntry(inputFiles[i]));
+ try {
+ IOUtils.copy(input, output);
+ } finally {
+ input.close();
+ }
+ }
+ } finally {
+ org.apache.hadoop.io.IOUtils.closeStream(output);
+ }
+ }
+
+  /**
+   * Untar an input file into an output directory.
+   *
+   * Each archive entry is created under the output directory, using the
+   * entry's path inside the tarball.
+   *
+   * @param inputFileName the input .tar or .tar.gz file
+   * @param outputDirName the output directory
+   * @return The {@link List} of {@link File}s with the untarred content.
+   * @throws FileNotFoundException
+   * @throws IOException
+   * @throws ArchiveException
+   */
+  public static List<File> unTar(final String inputFileName, final String outputDirName)
+ throws FileNotFoundException, IOException, ArchiveException {
+ return unTar(inputFileName, outputDirName, false);
+ }
+
+  /**
+   * Untar an input file into an output directory.
+   *
+   * Each archive entry is created under the output directory; with
+   * {@code flatten} set, directory structure is dropped and all files are
+   * extracted directly into the output directory.
+   *
+   * @param inputFileName the input .tar or .tar.gz file
+   * @param outputDirName the output directory
+   * @param flatten whether to ignore sub-directories inside the tarball
+   * @return The {@link List} of {@link File}s with the untarred content.
+   * @throws FileNotFoundException
+   * @throws IOException
+   * @throws ArchiveException
+   */
+  public static List<File> unTar(final String inputFileName, final String outputDirName,
+ boolean flatten) throws FileNotFoundException, IOException, ArchiveException {
+
+ File inputFile = new File(inputFileName);
+ File outputDir = new File(outputDirName);
+
+    final List<File> untarredFiles = new LinkedList<File>();
+ final InputStream is;
+
+ if (inputFileName.endsWith(".gz")) {
+ is = new GzipCompressorInputStream(new FileInputStream(inputFile));
+ } else {
+ is = new FileInputStream(inputFile);
+ }
+
+ final TarArchiveInputStream debInputStream =
+ (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar", is);
+ TarArchiveEntry entry = null;
+ while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
+ final File outputFile = new File(outputDir, entry.getName());
+ if (entry.isDirectory()) {
+ if (flatten) {
+ // no sub-directories
+ continue;
+ }
+ LOG.debug(String.format("Attempting to write output directory %s.",
+ outputFile.getAbsolutePath()));
+ if (!outputFile.exists()) {
+ LOG.debug(String.format("Attempting to create output directory %s.",
+ outputFile.getAbsolutePath()));
+ if (!outputFile.mkdirs()) {
+ throw new IllegalStateException(String.format("Couldn't create directory %s.",
+ outputFile.getAbsolutePath()));
+ }
+ }
+ } else {
+ final OutputStream outputFileStream;
+ if (flatten) {
+ File flatOutputFile = new File(outputDir, outputFile.getName());
+ LOG.debug(String.format("Creating flat output file %s.", flatOutputFile.getAbsolutePath()));
+ outputFileStream = new FileOutputStream(flatOutputFile);
+ } else if (!outputFile.getParentFile().exists()) {
+ LOG.debug(String.format("Attempting to create output directory %s.",
+ outputFile.getParentFile().getAbsoluteFile()));
+ if (!outputFile.getParentFile().getAbsoluteFile().mkdirs()) {
+ throw new IllegalStateException(String.format("Couldn't create directory %s.",
+ outputFile.getParentFile().getAbsolutePath()));
+ }
+ LOG.debug(String.format("Creating output file %s.", outputFile.getAbsolutePath()));
+ outputFileStream = new FileOutputStream(outputFile);
+ } else {
+ outputFileStream = new FileOutputStream(outputFile);
+ }
+ IOUtils.copy(debInputStream, outputFileStream);
+ outputFileStream.close();
+ }
+      untarredFiles.add(outputFile);
+ }
+ debInputStream.close();
+
+    return untarredFiles;
+ }
}
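For reference, a minimal sketch of how the two new helpers compose; the /tmp paths and file names below are illustrative, not part of the patch:

```java
import java.io.File;
import java.util.List;

import org.apache.hadoop.hive.common.CompressionUtils;

public class CompressionUtilsDemo {
  public static void main(String[] args) throws Exception {
    // Zip two files that live under a staging directory into pkg.zip.
    CompressionUtils.zip("/tmp/staging", new String[] { "config.json", "run.sh" }, "pkg.zip");

    // Unpack a gzipped tarball into a lib directory, dropping sub-directories,
    // the same way LlapServiceDriver flattens tez.tar.gz later in this patch.
    List<File> extracted =
        CompressionUtils.unTar("/tmp/staging/tez.tar.gz", "/tmp/staging/lib", true);
    for (File f : extracted) {
      System.out.println("extracted: " + f.getAbsolutePath());
    }
  }
}
```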
diff --git a/llap-server/bin/llapDaemon.sh b/llap-server/bin/llapDaemon.sh
index a02d35d..010e1e6 100755
--- a/llap-server/bin/llapDaemon.sh
+++ b/llap-server/bin/llapDaemon.sh
@@ -80,7 +80,7 @@ if [ ! -w "$LLAP_DAEMON_LOG_DIR" ] ; then
fi
if [ "$LLAP_DAEMON_PID_DIR" = "" ]; then
- LLAP_DAEMON_PID_DIR=/tmp
+ LLAP_DAEMON_PID_DIR=/tmp/$USER
fi
# some variables
@@ -92,7 +92,7 @@ if [ ! -n "${LLAP_DAEMON_LOGGER}" ]; then
fi
logLog=$LLAP_DAEMON_LOG_DIR/$LLAP_DAEMON_LOG_BASE.log
logOut=$LLAP_DAEMON_LOG_DIR/$LLAP_DAEMON_LOG_BASE.out
-pid=$LLAP_DAEMON_PID_DIR/llap-daemon-$USER.pid
+pid=$LLAP_DAEMON_PID_DIR/llap-daemon.pid
LLAP_DAEMON_STOP_TIMEOUT=${LLAP_DAEMON_STOP_TIMEOUT:-2}
# Set default scheduling priority
diff --git a/llap-server/bin/runLlapDaemon.sh b/llap-server/bin/runLlapDaemon.sh
index 4bde4eb..7bc7444 100755
--- a/llap-server/bin/runLlapDaemon.sh
+++ b/llap-server/bin/runLlapDaemon.sh
@@ -1,4 +1,6 @@
-#!/usr/bin/env bash
+#!/usr/bin/env bash
+
+set -x
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -72,7 +74,7 @@ if [ ! -n "${LLAP_DAEMON_LOGGER}" ]; then
LLAP_DAEMON_LOGGER=${LOG_LEVEL_DEFAULT}
fi
-CLASSPATH=${LLAP_DAEMON_CONF_DIR}:${LLAP_DAEMON_HOME}/hive-llap-server-1.2.0-SNAPSHOT.jar:${LLAP_DAEMON_HOME}/hive-exec-1.2.0-SNAPSHOT.jar:`${HADOOP_PREFIX}/bin/hadoop classpath`:.
+CLASSPATH=${LLAP_DAEMON_CONF_DIR}:${LLAP_DAEMON_HOME}/lib/*:`${HADOOP_PREFIX}/bin/hadoop classpath`:.
if [ -n "LLAP_DAEMON_USER_CLASSPATH" ]; then
CLASSPATH=${CLASSPATH}:${LLAP_DAEMON_USER_CLASSPATH}
diff --git a/llap-server/pom.xml b/llap-server/pom.xml
index 300ba8c..7b17f17 100644
--- a/llap-server/pom.xml
+++ b/llap-server/pom.xml
@@ -85,6 +85,11 @@
      <artifactId>libthrift</artifactId>
      <version>${libthrift.version}</version>
    </dependency>
+    <dependency>
+      <groupId>org.json</groupId>
+      <artifactId>json</artifactId>
+      <version>${json.version}</version>
+    </dependency>
    <dependency>
      <groupId>junit</groupId>
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
new file mode 100644
index 0000000..825771d
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class LlapOptionsProcessor {
+
+ public class LlapOptions {
+ private int instances = 0;
+ private String directory = null;
+
+ public LlapOptions(int instances, String directory)
+ throws ParseException {
+      if (instances <= 0) {
+        throw new ParseException("Invalid number of instances: " + instances
+            + " (should be greater than 0)");
+      }
+ }
+ this.instances = instances;
+ this.directory = directory;
+ }
+
+ public int getNumInstances() {
+ return instances;
+ }
+
+ public String getDirectory() {
+ return directory;
+ }
+ }
+
+ protected static final Log l4j = LogFactory.getLog(LlapOptionsProcessor.class.getName());
+ private final Options options = new Options();
+  Map<String, String> hiveVariables = new HashMap<String, String>();
+ private org.apache.commons.cli.CommandLine commandLine;
+
+ @SuppressWarnings("static-access")
+ public LlapOptionsProcessor() {
+
+ // set the number of instances on which llap should run
+ options.addOption(OptionBuilder.hasArg().withArgName("instances").withLongOpt("instances")
+ .withDescription("Specify the number of instances to run this on").create('i'));
+
+ // [-H|--help]
+ options.addOption(new Option("H", "help", false, "Print help information"));
+
+ options.addOption(OptionBuilder.hasArg().withArgName("directory").withLongOpt("directory")
+ .withDescription("Temp directory for jars etc.").create('d'));
+ }
+
+ public LlapOptions processOptions(String argv[]) throws ParseException {
+ commandLine = new GnuParser().parse(options, argv);
+ if (commandLine.hasOption('H')) {
+ printUsage();
+ return null;
+ }
+
+    String instancesOption = commandLine.getOptionValue("instances");
+    if (instancesOption == null) {
+      throw new ParseException("Option --instances is required");
+    }
+    int instances = Integer.parseInt(instancesOption);
+ String directory = commandLine.getOptionValue("directory");
+
+ return new LlapOptions(instances, directory);
+ }
+
+ private void printUsage() {
+ new HelpFormatter().printHelp("hive", options);
+ }
+}
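A short usage sketch for the new options processor; the argument values are made up, and a null return signals that --help was printed:

```java
import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor;
import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;

public class LlapOptionsDemo {
  public static void main(String[] args) throws Exception {
    // Equivalent of: hive --service llap --instances 2 --directory /tmp/staging-slider-abc123
    LlapOptions options = new LlapOptionsProcessor().processOptions(
        new String[] { "--instances", "2", "--directory", "/tmp/staging-slider-abc123" });
    if (options != null) { // null means -H/--help was handled
      System.out.println(options.getNumInstances() + " instance(s), staging in "
          + options.getDirectory());
    }
  }
}
```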
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
new file mode 100644
index 0000000..2f7259c
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli;
+
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.CompressionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+public class LlapServiceDriver {
+
+ protected static final Log LOG = LogFactory.getLog(LlapServiceDriver.class.getName());
+ private final Configuration conf;
+
+ public LlapServiceDriver() {
+ SessionState ss = SessionState.get();
+ conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class);
+ }
+
+ public static void main(String[] args) throws Exception {
+    int ret = new LlapServiceDriver().run(args);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Completed processing - exiting with " + ret);
+ }
+ System.exit(ret);
+ }
+
+ private int run(String[] args) throws Exception {
+ LlapOptionsProcessor optionsProcessor = new LlapOptionsProcessor();
+ LlapOptions options = optionsProcessor.processOptions(args);
+ Path tmpDir = new Path(options.getDirectory());
+
+ if (conf == null) {
+ LOG.warn("Cannot load any configuration to run command");
+ return 1;
+ }
+
+ FileSystem fs = FileSystem.get(conf);
+ FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
+
+ String[] neededConfig =
+ { "tez-site.xml", "hive-site.xml", "llap-daemon-site.xml", "core-site.xml" };
+
+ // needed so that the file is actually loaded into configuration.
+ for (String f : neededConfig) {
+ conf.addResource(f);
+ if (conf.getResource(f) == null) {
+ LOG.warn("Unable to find required config file: " + f);
+ return 2;
+ }
+ }
+
+ conf.reloadConfiguration();
+
+ URL logger = conf.getResource("llap-daemon-log4j.properties");
+
+ if (null == logger) {
+ LOG.warn("Unable to find required config file: llap-daemon-log4j.properties");
+ return 3;
+ }
+
+ Path home = new Path(System.getenv("HIVE_HOME"));
+ Path scripts = new Path(new Path(new Path(home, "scripts"), "llap"), "bin");
+
+ if (!lfs.exists(home)) {
+ LOG.warn("Unable to find HIVE_HOME:" + home);
+ return 3;
+ } else if (!lfs.exists(scripts)) {
+ LOG.warn("Unable to find llap scripts:" + scripts);
+ }
+
+ Path libDir = new Path(tmpDir, "lib");
+
+ String tezLibs = conf.get("tez.lib.uris");
+    if (tezLibs == null) {
+      LOG.warn("Missing tez.lib.uris in tez-site.xml");
+      return 4;
+    }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Copying tez libs from " + tezLibs);
+ }
+ lfs.mkdirs(libDir);
+ fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
+ CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(),
+ libDir.toString(), true);
+ lfs.delete(new Path(libDir, "tez.tar.gz"), false);
+
+ // TODO: aux jars (like compression libs)
+
+ lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(LlapInputFormat.class)), libDir);
+ lfs.copyFromLocalFile(new Path(Utilities.jarFinderGetJar(HiveInputFormat.class)), libDir);
+
+ Path confPath = new Path(tmpDir, "conf");
+ lfs.mkdirs(confPath);
+
+ for (String f : neededConfig) {
+ // they will be file:// URLs
+ lfs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confPath);
+ }
+
+ lfs.copyFromLocalFile(new Path(logger.toString()), confPath);
+
+ // extract configs for processing by the python fragments in Slider
+ JSONObject configs = new JSONObject();
+
+ configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname,
+ HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE));
+
+ configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT.varname,
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT));
+
+ configs.put(LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, conf.getInt(
+ LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB,
+ LlapDaemonConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT));
+
+ configs.put(LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE, conf.getInt(
+ LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE,
+ LlapDaemonConfiguration.LLAP_DAEMON_VCPUS_PER_INSTANCE_DEFAULT));
+
+ configs.put(LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS, conf.getInt(
+ LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS,
+ LlapDaemonConfiguration.LLAP_DAEMON_NUM_EXECUTORS_DEFAULT));
+
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
+
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
+
+ FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
+ OutputStreamWriter w = new OutputStreamWriter(os);
+ configs.write(w);
+ w.close();
+ os.close();
+
+ lfs.close();
+ fs.close();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exiting successfully");
+ }
+
+ return 0;
+ }
+}
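A hedged sketch of reading the generated config.json back with the org.json API this module now depends on; the staging path is illustrative, and the keys are exactly the ones written above (package.py consumes the same ones):

```java
import java.nio.file.Files;
import java.nio.file.Paths;

import org.json.JSONObject;

public class ConfigJsonDemo {
  public static void main(String[] args) throws Exception {
    // The directory is whatever was passed via --directory; this path is illustrative.
    String json = new String(
        Files.readAllBytes(Paths.get("/tmp/staging-slider-abc123/config.json")), "UTF-8");
    JSONObject configs = new JSONObject(json);
    long cacheBytes = configs.getLong("hive.llap.io.cache.orc.size");
    int memoryPerInstanceMb = configs.getInt("llap.daemon.memory.per.instance.mb");
    int vcpusPerInstance = configs.getInt("llap.daemon.vcpus.per.instance");
    System.out.println(cacheBytes + " cache bytes, " + memoryPerInstanceMb + " MB, "
        + vcpusPerInstance + " vcpus");
  }
}
```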
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
index 47c9472..34c04e7 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/DaemonTaskSchedulerService.java
@@ -14,6 +14,7 @@
package org.apache.tez.dag.app.rm;
+import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
@@ -29,6 +30,7 @@
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -37,8 +39,12 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;
import org.apache.tez.dag.app.AppContext;
@@ -70,6 +76,8 @@
// Per Executor Thread
private final Resource resourcePerExecutor;
+ // TODO: replace with service registry
+ private final YarnClient yc = YarnClient.createYarnClient();
public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext,
String clientHostname, int clientPort, String trackingUrl,
@@ -102,27 +110,69 @@ public DaemonTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext
String[] hosts = conf.getTrimmedStrings(LlapDaemonConfiguration.LLAP_DAEMON_AM_SERVICE_HOSTS);
if (hosts == null || hosts.length == 0) {
hosts = new String[]{"localhost"};
- }
- for (String host : hosts) {
- serviceHosts.add(host);
- serviceHostSet.add(host);
+ serviceHosts.add("localhost");
+ serviceHostSet.add("localhost");
+ } else if (!hosts[0].equals("*")) {
+ for (String host : hosts) {
+ serviceHosts.add(host);
+ serviceHostSet.add(host);
+ }
}
- LOG.info("Running with configuration: " +
- "memoryPerInstance=" + memoryPerInstance +
- ", vcoresPerInstance=" + coresPerInstance +
- ", executorsPerInstance=" + executorsPerInstance +
- ", resourcePerInstanceInferred=" + resourcePerExecutor +
- ", hosts=" + serviceHosts.toString());
+ if (serviceHosts.size() > 0) {
+ LOG.info("Running with configuration: " +
+ "memoryPerInstance=" + memoryPerInstance +
+ ", vcoresPerInstance=" + coresPerInstance +
+ ", executorsPerInstance=" + executorsPerInstance +
+ ", resourcePerInstanceInferred=" + resourcePerExecutor +
+ ", hosts=" + serviceHosts.toString());
+ } else {
+ LOG.info("Running with configuration: " +
+ "memoryPerInstance=" + memoryPerInstance +
+ ", vcoresPerInstance=" + coresPerInstance +
+ ", executorsPerInstance=" + executorsPerInstance +
+ ", resourcePerInstanceInferred=" + resourcePerExecutor +
+ ", hosts=");
+ }
}
@Override
public void serviceInit(Configuration conf) {
+ yc.init(conf);
}
+
@Override
public void serviceStart() {
+ yc.start();
+ if (serviceHosts.size() > 0) {
+ return;
+ }
+ LOG.info("Evaluating host usage criteria for service nodes");
+ try {
+      List<NodeReport> nodes = yc.getNodeReports(NodeState.RUNNING);
+ for (NodeReport nd : nodes) {
+ Resource used = nd.getUsed();
+ LOG.info("Examining node: " + nd);
+ if (nd.getNodeState() == NodeState.RUNNING
+ && used.getMemory() >= memoryPerInstance) {
+ // TODO: fix this with YARN registry
+ serviceHosts.add(nd.getNodeId().getHost());
+ serviceHostSet.add(nd.getNodeId().getHost());
+ }
+ }
+ LOG.info("Re-inited with configuration: " +
+ "memoryPerInstance=" + memoryPerInstance +
+ ", vcoresPerInstance=" + coresPerInstance +
+ ", executorsPerInstance=" + executorsPerInstance +
+ ", resourcePerInstanceInferred=" + resourcePerExecutor +
+ ", hosts="+ serviceHosts.toString());
+    } catch (IOException e) {
+      LOG.error("Unable to fetch node reports from YARN", e);
+    } catch (YarnException e) {
+      LOG.error("Unable to fetch node reports from YARN", e);
+    }
}
@Override
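The net effect of the scheduler change: a wildcard host list defers node selection to ResourceManager node reports at service start. A minimal sketch of opting in, assuming only the configuration constant already used in this diff:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.LlapDaemonConfiguration;

public class SchedulerHostsDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A "*" first entry leaves serviceHosts empty, so serviceStart() falls
    // through to YarnClient.getNodeReports(NodeState.RUNNING) discovery.
    conf.setStrings(LlapDaemonConfiguration.LLAP_DAEMON_AM_SERVICE_HOSTS, "*");
  }
}
```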
diff --git a/llap-server/src/main/resources/llap-daemon-log4j.properties b/llap-server/src/main/resources/llap-daemon-log4j.properties
new file mode 100644
index 0000000..51593e9
--- /dev/null
+++ b/llap-server/src/main/resources/llap-daemon-log4j.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+llap.daemon.root.logger=INFO,console
+llap.daemon.log.dir=.
+llap.daemon.log.file=llapdaemon.log
+
+# Define the root logger to the system property "llap.daemon.root.logger".
+log4j.rootLogger=${llap.daemon.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+# Null Appender
+log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
+
+#
+# Rolling File Appender - cap space usage at 5gb.
+#
+llap.daemon.log.maxfilesize=256MB
+llap.daemon.log.maxbackupindex=20
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file}
+
+log4j.appender.RFA.MaxFileSize=${llap.daemon.log.maxfilesize}
+log4j.appender.RFA.MaxBackupIndex=${llap.daemon.log.maxbackupindex}
+
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t(%x)] %p %c{2} : %m%n
diff --git a/llap-server/src/main/resources/llap.py b/llap-server/src/main/resources/llap.py
new file mode 100644
index 0000000..6d5bdcd
--- /dev/null
+++ b/llap-server/src/main/resources/llap.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+import os
+import subprocess
+from resource_management import *
+from os.path import dirname
+
+class Llap(Script):
+ def install(self, env):
+ self.install_packages(env)
+ pass
+
+ def configure(self, env):
+ import params
+ env.set_params(params)
+
+ def start(self, env):
+ import params
+ env.set_params(params)
+ os.environ['JAVA_HOME'] = format('{java64_home}')
+ os.environ['LD_LIBRARY_PATH'] = format('{library_path}')
+ # this is the same as TEZ_PREFIX
+ os.environ['LLAP_DAEMON_HOME'] = format('{app_root}')
+ # this is the location where we have the llap server components (shell scripts)
+ os.environ['LLAP_DAEMON_BIN_HOME'] = format('{app_root}/bin')
+ # location containing llap-daemon-site.xml, tez and yarn configuration xmls as well.
+ os.environ['LLAP_DAEMON_CONF_DIR'] = format("{app_root}/conf/")
+ os.environ['LLAP_DAEMON_LOG_DIR'] = format("{app_log_dir}/")
+ os.environ['LLAP_DAEMON_HEAPSIZE'] = format("{memory_val}")
+ os.environ['LLAP_DAEMON_PID_DIR'] = dirname(format("{pid_file}"))
+ print "Debug from LLAP python script"
+ print os.environ['LLAP_DAEMON_CONF_DIR']
+ self.configure(env)
+ location = "bash -x {app_root}/bin/llapDaemon.sh start &> {app_log_dir}/shell.out"
+ process_cmd = format(location)
+
+    subprocess.call(process_cmd, shell=True)
+
+ def stop(self, env):
+ import params
+ env.set_params(params)
+
+ def status(self, env):
+ import params
+ env.set_params(params)
+ check_process_status(params.pid_file)
+
+if __name__ == "__main__":
+ Llap().execute()
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
new file mode 100644
index 0000000..7424440
--- /dev/null
+++ b/llap-server/src/main/resources/package.py
@@ -0,0 +1,132 @@
+#!/usr/bin/python
+
+import sys,os,stat
+from getopt import getopt
+from json import loads as json_parse
+from os.path import exists, join, relpath
+from time import gmtime, strftime
+import shutil
+import tarfile
+import zipfile
+
+from templates import metainfo, appConfig, resources, runner
+
+class LlapResource(object):
+ def __init__(self, config):
+ self.memory = config["llap.daemon.memory.per.instance.mb"]
+ self.cores = config["llap.daemon.vcpus.per.instance"]
+ # convert to Mb
+ self.cache = config["hive.llap.io.cache.orc.size"] / (1024*1024.0)
+ self.direct = config["hive.llap.io.cache.direct"]
+ self.min_mb = -1
+ self.min_cores = -1
+ # compute heap
+ h = max(1.2*self.memory, self.memory + 256)
+ if (not self.direct):
+ h += self.cache
+ c = max(h*1.2, h + 128)
+ if (self.direct):
+ c += self.cache
+ if self.min_mb > 0:
+ c = c + c%self.min_mb
+ h = c/1.2
+ if self.direct:
+ h = h - self.cache
+ self.container_size = int(c)
+ self.container_cores = self.cores
+ self.heap_size = int(h)
+
+ def __repr__(self):
+ return "" % (self.heap_size, self.container_size)
+
+def zipdir(path, zipf, prefix="."):
+    for root, dirs, files in os.walk(path):
+        for file in files:
+            src = join(root, file)
+            dst = src.replace(path, prefix)
+            zipf.write(src, dst)
+
+def main(args):
+ opts, args = getopt(args,"n:o:i:",["instances=","output=", "input="])
+ version = os.getenv("HIVE_VERSION")
+ if not version:
+ version = strftime("%d%b%Y", gmtime())
+ home = os.getenv("HIVE_HOME")
+ output = "llap-slider-%(version)s" % ({"version": version})
+ instances=1
+ input = None
+ for k,v in opts:
+ if k in ("--input", "-i"):
+ input = v
+ elif k in ("--output", "-o"):
+ output = v
+ elif k in ("--instances", "-n"):
+ instances = int(v)
+ if not input:
+ print "Cannot find input files"
+ sys.exit(1)
+ config = json_parse(open(join(input, "config.json")).read())
+ resource = LlapResource(config)
+ vars = {
+ "home" : home,
+ "version" : version,
+ "instances" : instances,
+ "heap" : resource.heap_size,
+ "container.mb" : resource.container_size,
+ "container.cores" : resource.container_cores,
+ "hadoop_home" : os.getenv("HADOOP_HOME"),
+ "java_home" : os.getenv("JAVA_HOME"),
+ "name" : "llap0"
+ }
+
+ if not exists(output):
+ os.makedirs(output)
+
+ src = join(home, "scripts", "llap", "bin")
+ dst = join(input, "bin")
+ if exists(dst):
+ shutil.rmtree(dst)
+ shutil.copytree(src, dst)
+
+
+ # Make the zip package
+ tmp = join(output, "tmp")
+ pkg = join(tmp, "package")
+
+ src = join(home, "scripts", "llap", "slider")
+ dst = join(pkg, "scripts")
+ if exists(dst):
+ shutil.rmtree(dst)
+ shutil.copytree(src, dst)
+
+ with open(join(tmp, "metainfo.xml"),"w") as f:
+ f.write(metainfo % vars)
+
+ os.mkdir(join(pkg, "files"))
+ tarball = tarfile.open(join(pkg, "files", "llap-%s.tar.gz" % version), "w:gz")
+ # recursive add + -C chdir inside
+ tarball.add(input, "")
+ tarball.close()
+
+ zipped = zipfile.ZipFile(join(output, "llap-%s.zip" % version), "w")
+ zipdir(tmp, zipped)
+ zipped.close()
+
+ # cleanup after making zip pkg
+ shutil.rmtree(tmp)
+
+ with open(join(output, "appConfig.json"), "w") as f:
+ f.write(appConfig % vars)
+
+ with open(join(output, "resources.json"), "w") as f:
+ f.write(resources % vars)
+
+ with open(join(output, "run.sh"), "w") as f:
+ f.write(runner % vars)
+ os.chmod(join(output, "run.sh"), 0755)
+
+ print "Prepared %s/run.sh for running LLAP on Slider" % (output)
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
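The LlapResource sizing arithmetic is easiest to verify with concrete numbers; this sketch ports the Python math to Java and traces one illustrative configuration (4096 MB daemon memory, 1 GB direct cache, min_mb disabled):

```java
public class LlapSizingDemo {
  public static void main(String[] args) {
    double memory = 4096;   // llap.daemon.memory.per.instance.mb (illustrative)
    double cache = 1024;    // hive.llap.io.cache.orc.size, already converted to MB
    boolean direct = true;  // hive.llap.io.cache.direct

    // Heap: 20% headroom over daemon memory, at least +256 MB; an on-heap
    // cache must also fit inside the heap.
    double heap = Math.max(1.2 * memory, memory + 256);       // 4915.2
    if (!direct) {
      heap += cache;
    }

    // Container: 20% headroom over the heap, at least +128 MB; a direct
    // (off-heap) cache sits outside the heap but inside the container.
    double container = Math.max(1.2 * heap, heap + 128);      // 5898.24
    if (direct) {
      container += cache;                                     // 6922.24
    }

    // Matches LlapResource: heap_size=4915, container_size=6922.
    System.out.printf("heap=%d MB, container=%d MB%n", (int) heap, (int) container);
  }
}
```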
diff --git a/llap-server/src/main/resources/params.py b/llap-server/src/main/resources/params.py
new file mode 100644
index 0000000..8706d04
--- /dev/null
+++ b/llap-server/src/main/resources/params.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+
+# server configurations
+config = Script.get_config()
+
+app_root = config['configurations']['global']['app_root']
+java64_home = config['hostLevelParams']['java_home']
+pid_file = config['configurations']['global']['pid_file']
+
+additional_cp = config['configurations']['global']['additional_cp']
+app_log_dir = config['configurations']['global']['app_log_dir']
+
+port = config['configurations']['global']['listen_port']
+memory_val = config['configurations']['global']['memory_val']
+library_path = config['configurations']['global']['library_path']
diff --git a/llap-server/src/main/resources/templates.py b/llap-server/src/main/resources/templates.py
new file mode 100644
index 0000000..382ef17
--- /dev/null
+++ b/llap-server/src/main/resources/templates.py
@@ -0,0 +1,116 @@
+metainfo = """
+<?xml version="1.0"?>
+<metainfo>
+  <schemaVersion>2.0</schemaVersion>
+  <application>
+    <name>LLAP</name>
+    <comment>LLAP is a daemon service that works with a cache and works on SQL constructs.</comment>
+    <version>%(version)s</version>
+    <exportedConfigs>None</exportedConfigs>
+    <exportGroups>
+      <exportGroup>
+        <name>Servers</name>
+        <exports>
+          <export>
+            <name>instances</name>
+            <value>${LLAP_HOST}:${site.global.listen_port}</value>
+          </export>
+        </exports>
+      </exportGroup>
+    </exportGroups>
+    <components>
+      <component>
+        <name>LLAP</name>
+        <category>MASTER</category>
+        <compExports>Servers-instances</compExports>
+        <commandScript>
+          <script>scripts/llap.py</script>
+          <scriptType>PYTHON</scriptType>
+        </commandScript>
+      </component>
+    </components>
+    <osSpecifics>
+      <osSpecific>
+        <osType>any</osType>
+        <packages>
+          <package>
+            <type>tarball</type>
+            <name>files/llap-%(version)s.tar.gz</name>
+          </package>
+        </packages>
+      </osSpecific>
+    </osSpecifics>
+  </application>
+</metainfo>
+"""
+
+appConfig = """
+{
+ "schema": "http://example.org/specification/v2.0.0",
+ "metadata": {
+ },
+ "global": {
+ "application.def": ".slider/package/LLAP/llap-%(version)s.zip",
+ "java_home": "%(java_home)s",
+ "site.global.app_user": "yarn",
+ "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/",
+ "site.global.additional_cp": "%(hadoop_home)s",
+ "site.global.library_path": "%(hadoop_home)s/lib/native",
+ "site.global.memory_val": "%(heap)d",
+ "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/llap-daemon.pid"
+ },
+ "components": {
+ "slider-appmaster": {
+ "jvm.heapsize": "1024M"
+ }
+ }
+}
+"""
+
+resources = """
+{
+ "schema" : "http://example.org/specification/v2.0.0",
+ "metadata" : {
+ },
+ "global" : {
+ },
+ "components": {
+ "slider-appmaster": {
+ },
+ "LLAP": {
+ "yarn.role.priority": "1",
+ "yarn.component.instances": "%(instances)d",
+ "yarn.memory": "%(container.mb)d"
+ }
+ }
+}
+"""
+
+runner = """
+#!/bin/bash -e
+
+BASEDIR=$(dirname $0)
+slider stop %(name)s
+slider destroy %(name)s
+slider install-package --name LLAP --package $BASEDIR/llap-%(version)s.zip --replacepkg
+slider create %(name)s --resources $BASEDIR/resources.json --template $BASEDIR/appConfig.json
+"""
diff --git a/packaging/src/main/assembly/bin.xml b/packaging/src/main/assembly/bin.xml
index 260d8a3..a9afb05 100644
--- a/packaging/src/main/assembly/bin.xml
+++ b/packaging/src/main/assembly/bin.xml
@@ -124,6 +124,7 @@
      <directory>${project.parent.basedir}/bin</directory>
      <includes>
        <include>hive</include>
+        <include>llap/**/*</include>
        <include>metatool</include>
        <include>schematool</include>
        <include>beeline</include>
@@ -135,6 +136,24 @@
+    <fileSet>
+      <fileMode>755</fileMode>
+      <directory>${project.parent.basedir}/llap-server/bin</directory>
+      <includes>
+        <include>**/*</include>
+      </includes>
+      <outputDirectory>scripts/llap/bin</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${project.parent.basedir}/llap-server/src/main/resources/</directory>
+      <includes>
+        <include>**/*.py</include>
+        <include>**/*.xml</include>
+      </includes>
+      <outputDirectory>scripts/llap/slider</outputDirectory>
+    </fileSet>
    <fileSet>
      <directory>${project.parent.basedir}/metastore/scripts/upgrade</directory>
      <includes>
        <include>**/*</include>
@@ -335,6 +354,11 @@
      <destName>beeline-log4j.properties.template</destName>
    </file>
+    <file>
+      <source>${project.parent.basedir}/llap-server/src/main/resources/llap-daemon-log4j.properties</source>
+      <outputDirectory>conf</outputDirectory>
+      <destName>llap-daemon-log4j.properties.template</destName>
+    </file>
    <file>
      <source>${project.parent.basedir}/hcatalog/README.txt</source>
      <outputDirectory>hcatalog/share/doc/hcatalog</outputDirectory>
diff --git a/pom.xml b/pom.xml
index ca79be1..8f1a22b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,12 +42,12 @@
    <module>hwi</module>
    <module>jdbc</module>
    <module>metastore</module>
-    <module>llap-client</module>
-    <module>llap-server</module>
    <module>odbc</module>
    <module>ql</module>
    <module>serde</module>
    <module>service</module>
+    <module>llap-client</module>
+    <module>llap-server</module>
    <module>shims</module>
    <module>spark-client</module>
    <module>testutils</module>
@@ -108,7 +108,7 @@
    <datanucleus-rdbms.version>3.2.9</datanucleus-rdbms.version>
    <commons-cli.version>1.2</commons-cli.version>
    <commons-codec.version>1.4</commons-codec.version>
-    <commons-compress.version>1.4.1</commons-compress.version>
+    <commons-compress.version>1.9</commons-compress.version>
    <commons-exec.version>1.1</commons-exec.version>
    <commons-httpclient.version>3.0.1</commons-httpclient.version>
    <commons-io.version>2.4</commons-io.version>
diff --git a/ql/pom.xml b/ql/pom.xml
index 152f8e0..f85662e 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -673,7 +673,6 @@
                  <include>org.apache.hive:hive-exec</include>
                  <include>org.apache.hive:hive-serde</include>
                  <include>org.apache.hive:hive-llap-client</include>
-                  <include>org.apache.hive:hive-llap-server</include>
                  <include>com.esotericsoftware.kryo:kryo</include>
                  <include>com.twitter:parquet-hadoop-bundle</include>
                  <include>org.apache.thrift:libthrift</include>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index f2ba3c5..d8fc621 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -40,6 +40,7 @@
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
+import java.net.URLDecoder;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
@@ -55,6 +56,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -188,6 +190,7 @@
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
+import com.google.common.base.Preconditions;
/**
* Utilities.
@@ -239,6 +242,7 @@ private Utilities() {
private static ThreadLocal