# Patch summary (leading commentary; text before the first "diff --git" header is not part of any hunk).
#
# Files touched:
#   1. common/.../hive/conf/HiveConf.java
#        The default of hive.llap.memory.oversubscription.max.executors.per.query changes from 3 to -1.
#   2. ql/.../hive/ql/optimizer/ConvertJoinMapJoin.java
#        - Adds the constant DEFAULT_MAX_EXECUTORS_PER_QUERY_CONTAINER_MODE = 3.
#        - getMemoryMonitorInfo(...) now obtains maxSlotsPerQuery from the new private helper
#          getMaxSlotsPerQuery(conf, llapInfo) instead of reading the config value directly.
#        - getMaxSlotsPerQuery: reads the configured value; if it is exactly -1 then
#            * llapInfo == null  -> DEFAULT_MAX_EXECUTORS_PER_QUERY_CONTAINER_MODE (3)
#            * otherwise         -> min(max(1, llapInfo.getNumExecutorsPerNode() / 3), 8)  (integer division)
#          Any configured value other than -1 is returned unchanged.
#   3. ql/src/test/.../hive/ql/exec/TestOperators.java
#        - testNoConditionalTaskSizeForLlap now sets the max-executors option to "3" explicitly.
#        - New test testLlapMemoryOversubscriptionMaxExecutorsPerQueryCalculation uses a Mockito mock of
#          LlapClusterStateForCompile and expects: 1 executor/node -> 1, null llapInfo -> 3,
#          6 executors/node -> 2, 30 executors/node -> 8, and an explicit setting of "5" -> 5
#          both with and without llapInfo.
#
# NOTE(review): the option description kept as a context line in the HiveConf hunk still reads
#   "Default 3 (from 3 other executors ..." although this patch changes the default to -1; the
#   description is not updated anywhere in the visible hunks.
# NOTE(review): only the exact value -1 triggers the computed limit; other negative values or 0 are
#   passed through as-is by getMaxSlotsPerQuery -- confirm this is intended.
# NOTE(review): in this copy the hunks appear collapsed onto a few physical lines, so original
#   indentation and blank context lines are not visible here; presumably the patch has to be
#   regenerated from the commit before it can be applied -- verify against the original.
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index aa58d74..f2ed7e4 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4075,7 +4075,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR("hive.llap.mapjoin.memory.oversubscribe.factor", 0.2f, "Fraction of memory from hive.auto.convert.join.noconditionaltask.size that can be over subscribed\n" + "by queries running in LLAP mode. This factor has to be from 0.0 to 1.0. Default is 20% over subscription.\n"), - LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY("hive.llap.memory.oversubscription.max.executors.per.query", 3, + LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY("hive.llap.memory.oversubscription.max.executors.per.query", + -1, "Used along with hive.llap.mapjoin.memory.oversubscribe.factor to limit the number of executors from\n" + "which memory for mapjoin can be borrowed. 
Default 3 (from 3 other executors\n" + "hive.llap.mapjoin.memory.oversubscribe.factor amount of memory can be borrowed based on which mapjoin\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index cd952a2..0cfecaa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -86,6 +86,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { private static final Logger LOG = LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName()); + private static final int DEFAULT_MAX_EXECUTORS_PER_QUERY_CONTAINER_MODE = 3; public float hashTableLoadFactor; private long maxJoinMemory; @@ -298,7 +299,7 @@ public MemoryMonitorInfo getMemoryMonitorInfo( LlapClusterStateForCompile llapInfo) { long maxSize = conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR); - final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY); + final int maxSlotsPerQuery = getMaxSlotsPerQuery(conf, llapInfo); final long memoryCheckInterval = conf.getLongVar(ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); final float inflationFactor = conf.getFloatVar(ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); final MemoryMonitorInfo memoryMonitorInfo; @@ -336,6 +337,18 @@ public MemoryMonitorInfo getMemoryMonitorInfo( return memoryMonitorInfo; } + private int getMaxSlotsPerQuery(HiveConf conf, LlapClusterStateForCompile llapInfo) { + int maxExecutorsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY); + if (maxExecutorsPerQuery == -1) { + if (llapInfo == null) { + maxExecutorsPerQuery = DEFAULT_MAX_EXECUTORS_PER_QUERY_CONTAINER_MODE; + } else { + maxExecutorsPerQuery = 
Math.min(Math.max(1, llapInfo.getNumExecutorsPerNode() / 3), 8); + } + } + return maxExecutorsPerQuery; + } + @SuppressWarnings("unchecked") private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index fe64bf5..abf7198 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -63,6 +63,8 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; import junit.framework.TestCase; @@ -461,6 +463,7 @@ public void testNoConditionalTaskSizeForLlap() { // default executors is 4, max slots is 3. so 3 * 20% of noconditional task size will be oversubscribed hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR.varname, "0.2"); + hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "3"); double fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR); int maxSlots = 3; long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots)); @@ -488,4 +491,32 @@ public void testNoConditionalTaskSizeForLlap() { assertFalse( convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).doMemoryMonitoring()); } + + @Test + public void testLlapMemoryOversubscriptionMaxExecutorsPerQueryCalculation() { + ConvertJoinMapJoin convertJoinMapJoin = new ConvertJoinMapJoin(); + HiveConf hiveConf = new HiveConf(); + + LlapClusterStateForCompile llapInfo = Mockito.mock(LlapClusterStateForCompile.class); + + when(llapInfo.getNumExecutorsPerNode()).thenReturn(1); + assertEquals(1, + 
convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getMaxExecutorsOverSubscribeMemory()); + assertEquals(3, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, null).getMaxExecutorsOverSubscribeMemory()); + + when(llapInfo.getNumExecutorsPerNode()).thenReturn(6); + assertEquals(2, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getMaxExecutorsOverSubscribeMemory()); + + when(llapInfo.getNumExecutorsPerNode()).thenReturn(30); + assertEquals(8, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getMaxExecutorsOverSubscribeMemory()); + + hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5"); + assertEquals(5, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getMaxExecutorsOverSubscribeMemory()); + assertEquals(5, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, null).getMaxExecutorsOverSubscribeMemory()); + } }