From 42435afb1769401ec5b88d5af2a5bcc0f28de217 Mon Sep 17 00:00:00 2001 From: Gopal V Date: Thu, 30 Mar 2017 10:20:27 +0530 Subject: [PATCH] Use daemonConf values for numExecutors, not client-side conf --- .../apache/hadoop/hive/llap/LlapDaemonInfo.java | 83 ++++++++++++++++++++++ .../hadoop/hive/llap/daemon/impl/LlapDaemon.java | 23 +++--- .../hadoop/hive/ql/exec/GroupByOperator.java | 3 +- .../org/apache/hadoop/hive/ql/exec/TopNHash.java | 3 +- 4 files changed, 100 insertions(+), 12 deletions(-) create mode 100644 llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java diff --git llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java new file mode 100644 index 0000000..15078c5 --- /dev/null +++ llap-common/src/java/org/apache/hadoop/hive/llap/LlapDaemonInfo.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.yarn.api.records.Resource; + +public enum LlapDaemonInfo { + INSTANCE; + + private static final class LlapDaemonInfoHolder { + public LlapDaemonInfoHolder(int numExecutors, long executorMemory, long cacheSize, + boolean isDirectCache, boolean isLlapIo) { + this.numExecutors = numExecutors; + this.executorMemory = executorMemory; + this.cacheSize = cacheSize; + this.isDirectCache = isDirectCache; + this.isLlapIo = isLlapIo; + } + + final int numExecutors; + final long executorMemory; + final long cacheSize; + final boolean isDirectCache; + final boolean isLlapIo; + } + + // add more variables as required + private AtomicReference dataRef = + new AtomicReference(); + + public static void initialize(String appName, Configuration daemonConf) { + int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); + long executorMemoryBytes = HiveConf.getIntVar( + daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; + long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); + boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT); + boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true); + INSTANCE.dataRef.set(new LlapDaemonInfoHolder(numExecutors, executorMemoryBytes, ioMemoryBytes, + isDirectCache, isLlapIo)); + } + + public int getNumExecutors() { + return dataRef.get().numExecutors; + } + + public long getExecutorMemory() { + return dataRef.get().executorMemory; + } + + public long getMemoryPerExecutor() { + final LlapDaemonInfoHolder data = dataRef.get(); + return (getExecutorMemory() - -(data.isDirectCache ? 0 : data.cacheSize)) / getNumExecutors(); + } + + public long getCacheSize() { + return dataRef.get().cacheSize; + } + + public boolean isDirectCache() { + return dataRef.get().isDirectCache; + } + + public boolean isLlapIo() { + return dataRef.get().isLlapIo; + } + +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 30f9ad4..f679337 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -15,6 +15,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; import org.apache.hadoop.hive.llap.LlapOutputFormatService; + import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryPoolMXBean; @@ -38,6 +39,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DaemonId; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; @@ -499,8 +501,6 @@ public static void main(String[] args) throws Exception { nmHost, nmPort); } - int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS); - String workDirsString = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()); String localDirList = LlapUtil.getDaemonLocalDirString(daemonConf, workDirsString); @@ -511,16 +511,19 @@ public static void main(String[] args) throws Exception { int shufflePort = daemonConf .getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT); int webPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_WEB_PORT); - long executorMemoryBytes = HiveConf.getIntVar( - daemonConf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB) * 1024l * 1024l; - long ioMemoryBytes = HiveConf.getSizeVar(daemonConf, ConfVars.LLAP_IO_MEMORY_MAX_SIZE); - boolean isDirectCache = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_ALLOCATOR_DIRECT); - boolean isLlapIo = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_ENABLED, true); + + LlapDaemonInfo.initialize(appName, daemonConf); + + int numExecutors = LlapDaemonInfo.INSTANCE.getNumExecutors(); + long executorMemoryBytes = LlapDaemonInfo.INSTANCE.getExecutorMemory(); + long ioMemoryBytes = LlapDaemonInfo.INSTANCE.getCacheSize(); + boolean isDirectCache = LlapDaemonInfo.INSTANCE.isDirectCache(); + boolean isLlapIo = LlapDaemonInfo.INSTANCE.isLlapIo(); LlapDaemon.initializeLogging(daemonConf); - llapDaemon = new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, - isDirectCache, ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort, - appName); + llapDaemon = + new LlapDaemon(daemonConf, numExecutors, executorMemoryBytes, isLlapIo, isDirectCache, + ioMemoryBytes, localDirs, rpcPort, mngPort, shufflePort, webPort, appName); LOG.info("Adding shutdown hook for LlapDaemon"); ShutdownHookManager.addShutdownHook(new CompositeServiceShutdownHook(llapDaemon), 1); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 6d6c608..0d6034a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.OpParseContext; @@ -403,7 +404,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { newKeys = keyWrapperFactory.getKeyWrapper(); isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"); isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap"); - numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1; + numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1; firstRow = true; // estimate the number of hash table entries based on the size of each // entry. Since the size of a entry diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java index f3c7c77..e47c6fb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -106,7 +107,7 @@ public void initialize( final boolean isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"); final boolean isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap"); - final int numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1; + final int numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1; // Used Memory = totalMemory() - freeMemory(); // Total Free Memory = maxMemory() - Used Memory; -- 2.4.0