diff --git llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java index b57ae80..dd24661 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java @@ -44,6 +44,9 @@ public LlapConfiguration() { public static final String LLAP_DAEMON_YARN_SHUFFLE_PORT = LLAP_DAEMON_PREFIX + "yarn.shuffle.port"; public static final int LLAP_DAEMON_YARN_SHUFFLE_PORT_DEFAULT = 15551; + public static final String LLAP_DAEMON_YARN_CONTAINER_MB = LLAP_DAEMON_PREFIX + "yarn.container.mb"; + public static final int LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT = -1; + public static final String LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED = LLAP_DAEMON_PREFIX + "shuffle.dir-watcher.enabled"; public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false; diff --git llap-server/bin/runLlapDaemon.sh llap-server/bin/runLlapDaemon.sh index dd59439..bd14fe8 100755 --- llap-server/bin/runLlapDaemon.sh +++ llap-server/bin/runLlapDaemon.sh @@ -48,7 +48,7 @@ shift JAVA=$JAVA_HOME/bin/java LOG_LEVEL_DEFAULT="INFO,console" -JAVA_OPTS_BASE="-server -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps" +JAVA_OPTS_BASE="-server -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps" # CLASSPATH initially contains $HADOOP_CONF_DIR & $YARN_CONF_DIR if [ ! 
-d "$HADOOP_CONF_DIR" ]; then diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java index 67a099a..211cd8b 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java @@ -29,6 +29,7 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; public class LlapOptionsProcessor { @@ -36,9 +37,13 @@ private int instances = 0; private String directory = null; private String name; + private int executors; + private long cache; + private long size; + private long xmx; - public LlapOptions(String name, int instances, String directory) - throws ParseException { + public LlapOptions(String name, int instances, String directory, int executors, long cache, + long size, long xmx) throws ParseException { if (instances <= 0) { throw new ParseException("Invalid configuration: " + instances + " (should be greater than 0)"); @@ -46,6 +51,10 @@ public LlapOptions(String name, int instances, String directory) this.instances = instances; this.directory = directory; this.name = name; + this.executors = executors; + this.cache = cache; + this.size = size; + this.xmx = xmx; } public String getName() { @@ -59,6 +68,22 @@ public int getInstances() { public String getDirectory() { return directory; } + + public int getExecutors() { + return executors; + } + + public long getCache() { + return cache; + } + + public long getSize() { + return size; + } + + public long getXmx() { + return xmx; + } } protected static final Log l4j = LogFactory.getLog(LlapOptionsProcessor.class.getName()); @@ -88,10 +113,26 @@ public LlapOptionsProcessor() { options.addOption(OptionBuilder.hasArg().withArgName("chaosmonkey").withLongOpt("chaosmonkey") 
.withDescription("chaosmonkey interval").create('m')); + options.addOption(OptionBuilder.hasArg().withArgName("executors").withLongOpt("executors") + .withDescription("executor per instance").create('e')); + + options.addOption(OptionBuilder.hasArg().withArgName("cache").withLongOpt("cache") + .withDescription("cache size per instance").create('c')); + + options.addOption(OptionBuilder.hasArg().withArgName("size").withLongOpt("size") + .withDescription("container size per instance").create('s')); + + options.addOption(OptionBuilder.hasArg().withArgName("xmx").withLongOpt("xmx") + .withDescription("working memory size").create('w')); + // [-H|--help] options.addOption(new Option("H", "help", false, "Print help information")); } + private static long parseSuffixed(String value) { + return StringUtils.TraditionalBinaryPrefix.string2long(value); + } + public LlapOptions processOptions(String argv[]) throws ParseException { commandLine = new GnuParser().parse(options, argv); if (commandLine.hasOption('H') || false == commandLine.hasOption("instances")) { @@ -104,9 +145,15 @@ public LlapOptions processOptions(String argv[]) throws ParseException { String directory = commandLine.getOptionValue("directory"); String name = commandLine.getOptionValue("name", null); + + int executors = Integer.parseInt(commandLine.getOptionValue("executors", "-1")); + long cache = parseSuffixed(commandLine.getOptionValue("cache", "-1")); + long size = parseSuffixed(commandLine.getOptionValue("size", "-1")); + long xmx = parseSuffixed(commandLine.getOptionValue("xmx", "-1")); + // loglevel, chaosmonkey & args are parsed by the python processor - return new LlapOptions(name, instances, directory); + return new LlapOptions(name, instances, directory, executors, cache, size, xmx); } private void printUsage() { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index 2ee3dd8..b3d155b 
100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -41,6 +41,8 @@ import org.json.JSONArray; import org.json.JSONObject; +import com.google.common.base.Preconditions; + public class LlapServiceDriver { protected static final Log LOG = LogFactory.getLog(LlapServiceDriver.class.getName()); @@ -119,11 +121,53 @@ private int run(String[] args) throws Exception { conf.reloadConfiguration(); if (options.getName() != null) { - // update service registry configs - caveat: this has nothing to do with the actual settings as read by the AM - // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between instances + // update service registry configs - caveat: this has nothing to do with the actual settings + // as read by the AM + // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between + // instances conf.set(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, "@" + options.getName()); } + if (options.getSize() != -1) { + if (options.getCache() != -1) { + Preconditions.checkArgument(options.getCache() < options.getSize(), + "Cache has to be smaller than the container sizing"); + } + if (options.getXmx() != -1) { + Preconditions.checkArgument(options.getXmx() < options.getSize(), + "Working memory has to be smaller than the container sizing"); + } + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT)) { + Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(), + "Working memory + cache has to be smaller than the containing sizing "); + } + } + + final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1); + if (options.getSize() != -1) { + final long containerSize = options.getSize() / (1024 * 1024); + Preconditions.checkArgument(containerSize >= minAlloc, + "Container size should be greater than minimum 
allocation(%s)", minAlloc + "m"); + conf.setLong(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, containerSize); + } + + if (options.getExecutors() != -1) { + conf.setLong(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, options.getExecutors()); + // TODO: vcpu settings - possibly when DRFA works right + } + + if (options.getCache() != -1) { + conf.setLong(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, options.getCache()); + } + + if (options.getXmx() != -1) { + // Needs more explanation here + // Xmx is not the max heap value in JDK8 + // You need to subtract 50% of the survivor fraction from this, to get actual usable memory before it goes into GC + conf.setLong(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, (long)(options.getXmx()) + / (1024 * 1024)); + } + URL logger = conf.getResource("llap-daemon-log4j.properties"); if (null == logger) { @@ -182,6 +226,10 @@ private int run(String[] args) throws Exception { // extract configs for processing by the python fragments in Slider JSONObject configs = new JSONObject(); + configs.put(LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, conf.getInt( + LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB, + LlapConfiguration.LLAP_DAEMON_YARN_CONTAINER_MB_DEFAULT)); + configs.put(HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE.varname, HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE)); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 7b53e63..75377d4 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -15,6 +15,10 @@ package org.apache.hadoop.hive.llap.daemon.impl; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryPoolMXBean; +import java.lang.management.MemoryType; 
import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Set; @@ -48,6 +52,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +82,7 @@ private final AtomicReference address = new AtomicReference(); public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, - boolean ioEnabled, long ioMemoryBytes, String[] localDirs, int rpcPort, + boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int rpcPort, int shufflePort) { super("LlapDaemon"); @@ -91,7 +96,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536), "Shuffle Port must be betwee 1024 and 65535, or 0 for automatic selection"); - this.maxJvmMemory = Runtime.getRuntime().maxMemory(); + this.maxJvmMemory = getTotalHeapSize(); this.llapIoEnabled = ioEnabled; this.executorMemoryPerInstance = executorMemoryBytes; this.ioMemoryPerInstance = ioMemoryBytes; @@ -111,12 +116,15 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor ", shufflePort=" + shufflePort + ", executorMemory=" + executorMemoryBytes + ", llapIoEnabled=" + ioEnabled + + ", llapIoCacheIsDirect=" + isDirectCache + ", llapIoCacheSize=" + ioMemoryBytes + ", jvmAvailableMemory=" + maxJvmMemory + ", waitQueueSize= " + waitQueueSize + ", enablePreemption= " + enablePreemption); - long memRequired = executorMemoryBytes + (ioEnabled ? ioMemoryBytes : 0); + long memRequired = + executorMemoryBytes + (ioEnabled && isDirectCache == false ? ioMemoryBytes : 0); + // TODO: this check is somewhat bogus as the maxJvmMemory != Xmx parameters (see annotation in LlapServiceDriver) Preconditions.checkState(maxJvmMemory >= memRequired, "Invalid configuration. Xmx value too small. 
maxAvailable=" + maxJvmMemory + ", configured(exec + io if enabled)=" + @@ -167,6 +175,28 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor addIfService(webServices); } + private long getTotalHeapSize() { + // runtime.getMax() gives a very different number from the actual Xmx sizing. + // you can iterate through the + // http://docs.oracle.com/javase/7/docs/api/java/lang/management/MemoryPoolMXBean.html + // from java.lang.management to figure this out, but the hard-coded params in the llap run.sh + // result in 89% usable heap (-XX:NewRatio=8) + a survivor region which is technically not + // in the usable space. + + long total = 0; + MemoryMXBean m = ManagementFactory.getMemoryMXBean(); + for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) { + long sz = mp.getUsage().getMax(); + if (mp.getName().contains("Survivor")) { + sz *= 2; // there are 2 survivor spaces + } + if (mp.getType().equals(MemoryType.HEAP)) { + total += sz; + } + } + return total; + } + private void printAsciiArt() { final String asciiArt = "" + "$$\\ $$\\ $$$$$$\\ $$$$$$$\\\n" + @@ -239,9 +269,11 @@ public static void main(String[] args) throws Exception { LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT) * 1024l * 1024l; long cacheMemoryBytes = HiveConf.getLongVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_MAX_SIZE); + boolean isDirectCache = + HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_ORC_CACHE_ALLOCATE_DIRECT); boolean llapIoEnabled = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED); llapDaemon = - new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, + new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, llapIoEnabled, isDirectCache, cacheMemoryBytes, localDirs, rpcPort, shufflePort); diff --git llap-server/src/main/resources/package.py llap-server/src/main/resources/package.py index 39242e2..ae501cf 100644 --- llap-server/src/main/resources/package.py +++ 
llap-server/src/main/resources/package.py @@ -15,26 +15,27 @@ class LlapResource(object): def __init__(self, config): self.memory = config["llap.daemon.memory.per.instance.mb"] self.cores = config["llap.daemon.vcpus.per.instance"] + size = config["llap.daemon.yarn.container.mb"] # convert to Mb self.cache = config["hive.llap.io.cache.orc.size"] / (1024*1024.0) self.direct = config["hive.llap.io.cache.direct"] self.min_mb = -1 self.min_cores = -1 - # compute heap - h = max(1.2*self.memory, self.memory + 256) + # compute heap + cache as final Xmx + h = self.memory if (not self.direct): h += self.cache - c = max(h*1.2, h + 128) - if (self.direct): - c += self.cache - if self.min_mb > 0: - c = c + c%self.min_mb - h = c/1.2 - if self.direct: - h = h - self.cache - self.container_size = int(c) + if size == -1: + c = min(h*1.2, h + 1024) # + 1024 or 20% + c += (self.direct and self.cache) or 0 + if self.min_mb > 0: + c = c + c%self.min_mb + else: + # do not mess with user input + c = size + self.container_size = c self.container_cores = self.cores - self.heap_size = int(h) + self.heap_size = h def __repr__(self): return "" % (self.heap_size, self.container_size) @@ -47,7 +48,7 @@ def zipdir(path, zip, prefix="."): zip.write(src, dst) def main(args): - opts, args = getopt(args,"",["instances=","output=", "input=","args=","name=","loglevel=","chaosmonkey="]) + opts, args = getopt(args,"",["instances=","output=", "input=","args=","name=","loglevel=","chaosmonkey=","size=","xmx=", "cache=", "executors="]) version = os.getenv("HIVE_VERSION") if not version: version = strftime("%d%b%Y", gmtime()) @@ -144,7 +145,7 @@ def main(args): with open(join(output, "run.sh"), "w") as f: f.write(runner % vars) - os.chmod(join(output, "run.sh"), 0755) + os.chmod(join(output, "run.sh"), 0700) print "Prepared %s/run.sh for running LLAP on Slider" % (output) diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java 
llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index ac8dcba..543e616 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -38,6 +38,7 @@ private final File testWorkDir; private final long execBytesPerService; private final boolean llapIoEnabled; + private final boolean ioIsDirect; private final long ioBytesPerService; private final int numExecutorsPerService; private final String[] localDirs; @@ -46,15 +47,15 @@ private LlapDaemon llapDaemon; public static MiniLlapCluster create(String clusterName, int numExecutorsPerService, - long execBytePerService, boolean llapIoEnabled, - long ioBytesPerService, int numLocalDirs) { + long execBytePerService, boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { return new MiniLlapCluster(clusterName, numExecutorsPerService, execBytePerService, - llapIoEnabled, ioBytesPerService, numLocalDirs); + llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs); } // TODO Add support for multiple instances private MiniLlapCluster(String clusterName, int numExecutorsPerService, long execMemoryPerService, - boolean llapIoEnabled, long ioBytesPerService, int numLocalDirs) { + boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) { super(clusterName + "_" + MiniLlapCluster.class.getSimpleName()); Preconditions.checkArgument(numExecutorsPerService > 0); Preconditions.checkArgument(execMemoryPerService > 0); @@ -105,6 +106,7 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe } this.numExecutorsPerService = numExecutorsPerService; this.execBytesPerService = execMemoryPerService; + this.ioIsDirect = ioIsDirect; this.llapIoEnabled = llapIoEnabled; this.ioBytesPerService = ioBytesPerService; @@ -120,8 +122,9 @@ private MiniLlapCluster(String clusterName, int numExecutorsPerService, long exe 
@Override public void serviceInit(Configuration conf) { - llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioBytesPerService, localDirs, 0, 0); + llapDaemon = + new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, + ioIsDirect, ioBytesPerService, localDirs, 0, 0); llapDaemon.init(conf); }