diff --git llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java
new file mode 100644
index 0000000..fa29b59
--- /dev/null
+++ llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+// Process-wide holder for the LLAP daemon's sizing configuration (executor count,
+// executor memory, cache size). isLlap() doubles as the "am I inside a daemon"
+// check: it is true only after one of the initialize() overloads has run.
+public enum LlapDaemonInfo {
+  INSTANCE;
+
+  // Immutable snapshot of the daemon sizing; swapped atomically as a unit.
+  private static final class LlapDaemonInfoHolder {
+    public LlapDaemonInfoHolder(int numExecutors, long executorMemory, long cacheSize,
+        boolean isDirectCache, boolean isLlapIo) {
+      this.numExecutors = numExecutors;
+      this.executorMemory = executorMemory;
+      this.cacheSize = cacheSize;
+      this.isDirectCache = isDirectCache;
+      this.isLlapIo = isLlapIo;
+    }
+
+    final int numExecutors;
+    final long executorMemory;
+    final long cacheSize;
+    final boolean isDirectCache;
+    final boolean isLlapIo;
+  }
+
+  // add more variables as required; null until initialize() is called
+  private final AtomicReference<LlapDaemonInfoHolder> dataRef =
+      new AtomicReference<LlapDaemonInfoHolder>();
+
+  // Reads the sizing values out of the daemon configuration.
+  public static void initialize(String appName, Configuration daemonConf) {
+    int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+    long executorMemoryBytes =
+        HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
+    long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
+    boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
+    boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
+    initialize(appName, numExecutors, executorMemoryBytes, ioMemoryBytes, isDirectCache, isLlapIo);
+  }
+
+  public static void initialize(String appName, int numExecutors, long executorMemoryBytes,
+      long ioMemoryBytes, boolean isDirectCache, boolean isLlapIo) {
+    INSTANCE.dataRef.set(new LlapDaemonInfoHolder(numExecutors, executorMemoryBytes, ioMemoryBytes,
+        isDirectCache, isLlapIo));
+  }
+
+  public boolean isLlap() {
+    return dataRef.get() != null;
+  }
+
+  public int getNumExecutors() {
+    return dataRef.get().numExecutors;
+  }
+
+  public long getExecutorMemory() {
+    return dataRef.get().executorMemory;
+  }
+
+  // On-heap caches share the executor heap, so subtract the cache before dividing;
+  // a direct (off-heap) cache does not reduce executor memory.
+  public long getMemoryPerExecutor() {
+    final LlapDaemonInfoHolder data = dataRef.get();
+    return (getExecutorMemory() - (data.isDirectCache ? 0 : data.cacheSize)) / getNumExecutors();
+  }
+
+  public long getCacheSize() {
+    return dataRef.get().cacheSize;
+  }
+
+  public boolean isDirectCache() {
+    return dataRef.get().isDirectCache;
+  }
+
+  public boolean isLlapIo() {
+    return dataRef.get().isLlapIo;
+  }
+
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 30f9ad4..f679337 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -15,6 +15,7 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import org.apache.hadoop.hive.llap.LlapOutputFormatService;
+
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryPoolMXBean;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
@@ -499,8 +501,6 @@ public static void main(String[] args) throws Exception {
           nmHost, nmPort);
     }
 
-    int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
-
     String workDirsString = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name());
 
     String localDirList = LlapUtil.getDaemonLocalDirString(daemonConf, workDirsString);
@@ -511,16 +511,19 @@ public static void main(String[] args) throws Exception {
     int shufflePort = daemonConf
         .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
     int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT);
-    long executorMemoryBytes = HiveConf.getIntVar(
-        daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l;
-    long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE);
-    boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT);
-    boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true);
+
+    LlapDaemonInfo.initialize(appName, daemonConf);
+
+    int numExecutors = LlapDaemonInfo.INSTANCE.getNumExecutors();
+    long executorMemoryBytes = LlapDaemonInfo.INSTANCE.getExecutorMemory();
+    long ioMemoryBytes = LlapDaemonInfo.INSTANCE.getCacheSize();
+    boolean isDirectCache = LlapDaemonInfo.INSTANCE.isDirectCache();
+    boolean isLlapIo = LlapDaemonInfo.INSTANCE.isLlapIo();
 
     LlapDaemon.initializeLogging(daemonConf);
-    llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo,
-        isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort,
-        appName);
+    llapDaemon =
+        new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, isDirectCache,
+            ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort, appName);
 
     LOG.info("Adding shutdown hook for LlapDaemon");
     ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1);
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index 06f6dac..6f1305e 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -15,6 +15,7 @@
 package org.apache.hadoop.hive.llap.daemon;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
@@ -26,6 +27,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.service.AbstractService;
@@ -118,6 +120,9 @@ private MiniLlapCluster(String clusterName, @Nullable MiniZooKeeperCluster miniZ
     this.llapIoEnabled = llapIoEnabled;
     this.ioBytesPerService = ioBytesPerService;
 
+    LlapDaemonInfo.initialize("mini-llap-cluster", numExecutorsPerService, execMemoryPerService,
+        ioBytesPerService, ioIsDirect, llapIoEnabled);
+
     // Setup Local Dirs
     localDirs = new String[numLocalDirs];
     for (int i = 0 ; i < numLocalDirs ; i++) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 6d6c608..f8b55da 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -402,8 +403,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     newKeys = keyWrapperFactory.getKeyWrapper();
     isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
-    isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
-    numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
+    isLlap = LlapDaemonInfo.INSTANCE.isLlap();
+    numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
     firstRow = true;
     // estimate the number of hash table entries based on the size of each
     // entry. Since the size of a entry
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index f3c7c77..48ae02f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -105,8 +106,8 @@ public void initialize(
     }
     final boolean isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
-    final boolean isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
-    final int numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
+    final boolean isLlap = LlapDaemonInfo.INSTANCE.isLlap();
+    final int numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
 
     // Used Memory = totalMemory() - freeMemory();
     // Total Free Memory = maxMemory() - Used Memory;