diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9064e49..828dc06 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -380,6 +380,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_RPC_PORT.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS.varname); @@ -2988,9 +2989,14 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "executed in parallel.", "llap.daemon.num.executors"), LLAP_DAEMON_RPC_PORT("hive.llap.daemon.rpc.port", 0, "The LLAP daemon RPC port.", "llap.daemon.rpc.port. A value of 0 indicates a dynamic port"), - LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 3276, + LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096, "The total amount of memory to use for the executors inside LLAP (in megabytes).", "llap.daemon.memory.per.instance.mb"), + LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.headroom.memory.per.instance.mb", 512, + "The total amount of memory deducted from daemon memory required for other LLAP services. The remaining memory" + + " will be used by the executors. If the cache is off-heap, Executor memory + Headroom memory = Xmx. If the " + + "cache is on-heap, Executor memory + Cache memory + Headroom memory = Xmx. The headroom memory has to be " + + "minimum of 5% from the daemon memory."), LLAP_DAEMON_VCPUS_PER_INSTANCE("hive.llap.daemon.vcpus.per.instance", 4, "The total number of vcpus to use for the executors inside LLAP.", "llap.daemon.vcpus.per.instance"), diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java index 17913f0..aa69752 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java @@ -221,4 +221,14 @@ public static String getAmHostNameFromAddress(InetSocketAddress address, Configu // getCanonicalHostName would either return FQDN, or an IP. return (ia == null) ? address.getHostName() : ia.getCanonicalHostName(); } + + public static String humanReadableByteCount(long bytes) { + int unit = 1024; + if (bytes < unit) { + return bytes + "B"; + } + int exp = (int) (Math.log(bytes) / Math.log(unit)); + String suffix = "KMGTPE".charAt(exp-1) + ""; + return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix); + } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java index dfd2f7b..169be22 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java @@ -43,8 +43,10 @@ import java.util.concurrent.Future; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants; +import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon; import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; @@ -236,8 +238,8 @@ private void run(String[] args) throws Exception { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) { // direct heap allocations need to be safer Preconditions.checkArgument(options.getCache() < options.getSize(), "Cache size (" - + humanReadableByteCount(options.getCache()) + ") has to be smaller" - + " than the container sizing (" + humanReadableByteCount(options.getSize()) + ")"); + + LlapUtil.humanReadableByteCount(options.getCache()) + ") has to be smaller" + + " than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize()) + ")"); } else if (options.getCache() < options.getSize()) { LOG.warn("Note that this might need YARN physical memory monitoring to be turned off " + "(yarn.nodemanager.pmem-check-enabled=false)"); @@ -245,18 +247,17 @@ private void run(String[] args) throws Exception { } if (options.getXmx() != -1) { Preconditions.checkArgument(options.getXmx() < options.getSize(), "Working memory (Xmx=" - + humanReadableByteCount(options.getXmx()) + ") has to be" - + " smaller than the container sizing (" + humanReadableByteCount(options.getSize()) + + LlapUtil.humanReadableByteCount(options.getXmx()) + ") has to be" + + " smaller than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize()) + ")"); } if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT) && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) { // direct and not memory mapped - Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(), - "Working memory + cache (Xmx=" + humanReadableByteCount(options.getXmx()) - + " + cache=" + humanReadableByteCount(options.getCache()) + ")" - + " has to be smaller than the container sizing (" - + humanReadableByteCount(options.getSize()) + ")"); + Preconditions.checkArgument(options.getXmx() + options.getCache() <= options.getSize(), + "Working memory (Xmx=" + LlapUtil.humanReadableByteCount(options.getXmx()) + ") + cache size (" + + LlapUtil.humanReadableByteCount(options.getCache()) + ") has to be smaller than the container sizing (" + + LlapUtil.humanReadableByteCount(options.getSize()) + ")"); } } @@ -267,8 +268,8 @@ private void run(String[] args) throws Exception { if (options.getSize() != -1) { containerSize = options.getSize() / (1024 * 1024); Preconditions.checkArgument(containerSize >= minAlloc, "Container size (" - + humanReadableByteCount(options.getSize()) + ") should be greater" - + " than minimum allocation(" + humanReadableByteCount(minAlloc * 1024L * 1024L) + ")"); + + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be greater" + + " than minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")"); conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize); propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, String.valueOf(containerSize)); @@ -300,12 +301,24 @@ private void run(String[] args) throws Exception { // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction // from this, to get actual usable memory before it goes into GC xmx = options.getXmx(); - long xmxMb = (long) (xmx / (1024 * 1024)); + long xmxMb = (xmx / (1024L * 1024L)); conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb); propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, String.valueOf(xmxMb)); } + final long currentHeadRoom = options.getSize() - options.getXmx() - options.getCache(); + final long minHeadRoom = (long) (options.getXmx() * LlapDaemon.MIN_HEADROOM_PERCENT); + final long headRoom = currentHeadRoom < minHeadRoom ? minHeadRoom : currentHeadRoom; + final long headRoomMb = headRoom / (1024L * 1024L); + conf.setLong(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname, headRoomMb); + propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname, + String.valueOf(headRoomMb)); + + LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {} headroom memory: {}", + LlapUtil.humanReadableByteCount(options.getSize()), LlapUtil.humanReadableByteCount(options.getXmx()), + LlapUtil.humanReadableByteCount(options.getCache()), LlapUtil.humanReadableByteCount(headRoom)); + if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) { conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName()); propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, @@ -551,6 +564,9 @@ public Void call() throws Exception { configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB)); + configs.put(ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB.varname, + HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB)); + configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname, HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE)); @@ -699,14 +715,4 @@ private void copyConfig(FileSystem lfs, Path confPath, String f) throws IOExcept // they will be file:// URLs lfs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confPath); } - - private String humanReadableByteCount(long bytes) { - int unit = 1024; - if (bytes < unit) { - return bytes + "B"; - } - int exp = (int) (Math.log(bytes) / Math.log(unit)); - String suffix = "KMGTPE".charAt(exp-1) + ""; - return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix); - } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index b7e05d3..789641e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker; import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ExitUtil; @@ -84,6 +83,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, LlapDaemonMXBean { private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class); + public static final double MIN_HEADROOM_PERCENT = 0.05; private final Configuration shuffleHandlerConf; private final SecretManager secretManager; @@ -113,8 +113,8 @@ private final AtomicReference shufflePort = new AtomicReference<>(); public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemoryBytes, - boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort, - int mngPort, int shufflePort, int webPort, String appName) { + boolean ioEnabled, boolean isDirectCache, long ioMemoryBytes, String[] localDirs, int srvPort, + int mngPort, int shufflePort, int webPort, String appName, final long headRoomBytes) { super("LlapDaemon"); printAsciiArt(); @@ -158,28 +158,37 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor this.maxJvmMemory = getTotalHeapSize(); this.llapIoEnabled = ioEnabled; - this.executorMemoryPerInstance = executorMemoryBytes; + Preconditions.checkArgument(headRoomBytes < executorMemoryBytes, "LLAP daemon headroom size should be less " + + "than daemon max memory size. headRoomBytes: " + headRoomBytes + " executorMemoryBytes: " + executorMemoryBytes); + final long minHeadRoomBytes = (long) (executorMemoryBytes * MIN_HEADROOM_PERCENT); + final long headroom = headRoomBytes < minHeadRoomBytes ? minHeadRoomBytes : headRoomBytes; + this.executorMemoryPerInstance = executorMemoryBytes - headroom; this.ioMemoryPerInstance = ioMemoryBytes; this.numExecutors = numExecutors; this.localDirs = localDirs; + int waitQueueSize = HiveConf.getIntVar( daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE); boolean enablePreemption = HiveConf.getBoolVar( daemonConf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION); LOG.warn("Attempting to start LlapDaemonConf with the following configuration: " + - "numExecutors=" + numExecutors + + "maxJvmMemory=" + maxJvmMemory + " (" + LlapUtil.humanReadableByteCount(maxJvmMemory) + ")" + + ", requestedExecutorMemory=" + executorMemoryBytes + + " (" + LlapUtil.humanReadableByteCount(executorMemoryBytes) + ")" + + ", llapIoCacheSize=" + ioMemoryBytes + " (" + LlapUtil.humanReadableByteCount(ioMemoryBytes) + ")" + + ", headRoomMemory=" + headroom + " (" + LlapUtil.humanReadableByteCount(headroom) + ")" + + ", adjustedExecutorMemory=" + executorMemoryPerInstance + + " (" + LlapUtil.humanReadableByteCount(executorMemoryPerInstance) + ")" + + ", numExecutors=" + numExecutors + + ", llapIoEnabled=" + ioEnabled + + ", llapIoCacheIsDirect=" + isDirectCache + ", rpcListenerPort=" + srvPort + ", mngListenerPort=" + mngPort + ", webPort=" + webPort + ", outputFormatSvcPort=" + outputFormatServicePort + ", workDirs=" + Arrays.toString(localDirs) + ", shufflePort=" + shufflePort + - ", executorMemory=" + executorMemoryBytes + - ", llapIoEnabled=" + ioEnabled + - ", llapIoCacheIsDirect=" + isDirectCache + - ", llapIoCacheSize=" + ioMemoryBytes + - ", jvmAvailableMemory=" + maxJvmMemory + ", waitQueueSize= " + waitQueueSize + ", enablePreemption= " + enablePreemption); @@ -187,9 +196,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor executorMemoryBytes + (ioEnabled && isDirectCache == false ? ioMemoryBytes : 0); // TODO: this check is somewhat bogus as the maxJvmMemory != Xmx parameters (see annotation in LlapServiceDriver) Preconditions.checkState(maxJvmMemory >= memRequired, - "Invalid configuration. Xmx value too small. maxAvailable=" + maxJvmMemory + - ", configured(exec + io if enabled)=" + - memRequired); + "Invalid configuration. Xmx value too small. maxAvailable=" + LlapUtil.humanReadableByteCount(maxJvmMemory) + + ", configured(exec + io if enabled)=" + LlapUtil.humanReadableByteCount(memRequired)); this.shuffleHandlerConf = new Configuration(daemonConf); this.shuffleHandlerConf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, shufflePort); @@ -238,7 +246,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor } this.metrics = LlapDaemonExecutorMetrics.create(displayName, sessionId, numExecutors, Ints.toArray(intervalList)); - this.metrics.setMemoryPerInstance(executorMemoryBytes); + this.metrics.setMemoryPerInstance(executorMemoryPerInstance); this.metrics.setCacheMemoryPerInstance(ioMemoryBytes); this.metrics.setJvmMaxMemory(maxJvmMemory); this.metrics.setWaitQueueSize(waitQueueSize); @@ -264,7 +272,7 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor throw new RuntimeException(e); } this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize, - enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryBytes, metrics, + enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryPerInstance, metrics, amReporter, executorClassLoader, daemonId, fsUgiFactory); addIfService(containerRunner); @@ -452,14 +460,15 @@ public static void main(String[] args) throws Exception { int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT); long executorMemoryBytes = HiveConf.getIntVar( daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; - + long headroomBytes = HiveConf.getIntVar( + daemonConf, ConfVars.LLAP_DAEMON_HEADROOM_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT); boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true); LlapDaemon.initializeLogging(daemonConf); llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort, - appName); + appName, headroomBytes); LOG.info("Adding shutdown hook for LlapDaemon"); ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index e394191..41ce035 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -196,7 +196,7 @@ public void serviceInit(Configuration conf) throws IOException, InterruptedExcep LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); for (int i = 0 ;i < numInstances ; i++) { llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed); + ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort, clusterNameTrimmed, 0); llapDaemons[i].init(new Configuration(conf)); } LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed);