diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index aa58d74..d920606 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3637,9 +3637,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "For example, arithmetic expressions which can overflow the output data type can be evaluated using\n" + " checked vector expressions so that they produce same result as non-vectorized evaluation."), HIVE_VECTORIZED_ADAPTOR_SUPPRESS_EVALUATE_EXCEPTIONS( - "hive.vectorized.adaptor.suppress.evaluate.exceptions", false, + "hive.vectorized.adaptor.suppress.evaluate.exceptions", false, "This flag should be set to true to suppress HiveException from the generic UDF function\n" + - "evaluate call and turn them into NULLs. Assume, by default, this is not needed"), + "evaluate call and turn them into NULLs. Assume, by default, this is not needed"), HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED( "hive.vectorized.input.format.supports.enabled", "decimal_64", @@ -4075,7 +4075,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR("hive.llap.mapjoin.memory.oversubscribe.factor", 0.2f, "Fraction of memory from hive.auto.convert.join.noconditionaltask.size that can be over subscribed\n" + "by queries running in LLAP mode. This factor has to be from 0.0 to 1.0. Default is 20% over subscription.\n"), - LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY("hive.llap.memory.oversubscription.max.executors.per.query", 3, + LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY("hive.llap.memory.oversubscription.max.executors.per.query", + -1, "Used along with hive.llap.mapjoin.memory.oversubscribe.factor to limit the number of executors from\n" + - "which memory for mapjoin can be borrowed. 
Default 3 (from 3 other executors\n" + + "which memory for mapjoin can be borrowed. Default -1 (auto-computed; from that many other executors\n" + "hive.llap.mapjoin.memory.oversubscribe.factor amount of memory can be borrowed based on which mapjoin\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index cd952a2..0cfecaa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -86,6 +86,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { private static final Logger LOG = LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName()); + private static final int DEFAULT_MAX_EXECUTORS_PER_QUERY_CONTAINER_MODE = 3; public float hashTableLoadFactor; private long maxJoinMemory; @@ -298,7 +299,7 @@ public MemoryMonitorInfo getMemoryMonitorInfo( LlapClusterStateForCompile llapInfo) { long maxSize = conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR); - final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY); + final int maxSlotsPerQuery = getMaxSlotsPerQuery(conf, llapInfo); final long memoryCheckInterval = conf.getLongVar(ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); final float inflationFactor = conf.getFloatVar(ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); final MemoryMonitorInfo memoryMonitorInfo; @@ -336,6 +337,18 @@ public MemoryMonitorInfo getMemoryMonitorInfo( return memoryMonitorInfo; } + private int getMaxSlotsPerQuery(HiveConf conf, LlapClusterStateForCompile llapInfo) { + int maxExecutorsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY); + if (maxExecutorsPerQuery == -1) { + if (llapInfo == null) { + maxExecutorsPerQuery = DEFAULT_MAX_EXECUTORS_PER_QUERY_CONTAINER_MODE; + } else { + maxExecutorsPerQuery = 
Math.min(Math.max(1, llapInfo.getNumExecutorsPerNode() / 3), 8); + } + } + return maxExecutorsPerQuery; + } + @SuppressWarnings("unchecked") private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index fe64bf5..abf7198 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -63,6 +63,8 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; import junit.framework.TestCase; @@ -461,6 +463,7 @@ public void testNoConditionalTaskSizeForLlap() { // default executors is 4, max slots is 3. so 3 * 20% of noconditional task size will be oversubscribed hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR.varname, "0.2"); + hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "3"); double fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR); int maxSlots = 3; long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots)); @@ -488,4 +491,32 @@ public void testNoConditionalTaskSizeForLlap() { assertFalse( convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).doMemoryMonitoring()); } + + @Test + public void testLlapMemoryOversubscriptionMaxExecutorsPerQueryCalculation() { + ConvertJoinMapJoin convertJoinMapJoin = new ConvertJoinMapJoin(); + HiveConf hiveConf = new HiveConf(); + + LlapClusterStateForCompile llapInfo = Mockito.mock(LlapClusterStateForCompile.class); + + when(llapInfo.getNumExecutorsPerNode()).thenReturn(1); + assertEquals(1, + 
convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getMaxExecutorsOverSubscribeMemory()); + assertEquals(3, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, null).getMaxExecutorsOverSubscribeMemory()); + + when(llapInfo.getNumExecutorsPerNode()).thenReturn(6); + assertEquals(2, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getMaxExecutorsOverSubscribeMemory()); + + when(llapInfo.getNumExecutorsPerNode()).thenReturn(30); + assertEquals(8, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getMaxExecutorsOverSubscribeMemory()); + + hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5"); + assertEquals(5, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, llapInfo).getMaxExecutorsOverSubscribeMemory()); + assertEquals(5, + convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, null).getMaxExecutorsOverSubscribeMemory()); + } } diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q index 4b2cad8..ec32e9c 100644 --- a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q +++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q @@ -7,6 +7,7 @@ set hive.explain.user=false; set hive.auto.convert.join=true; set hive.auto.convert.join.noconditionaltask=true; set hive.auto.convert.join.noconditionaltask.size=30000; +set hive.llap.memory.oversubscription.max.executors.per.query=3; CREATE TABLE srcbucket_mapjoin_n18(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; CREATE TABLE tab_part_n11 (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE; @@ -157,3 +158,5 @@ FROM my_fact JOIN my_dim ON my_fact.join_col = my_dim.join_col WHERE my_fact.fiscal_year = '2015' AND my_dim.filter_col IN ( 'VAL1', 'VAL2' ) and my_fact.accounting_period in (10); + +reset hive.llap.memory.oversubscription.max.executors.per.query; diff 
--git a/ql/src/test/queries/clientpositive/join32_lessSize.q b/ql/src/test/queries/clientpositive/join32_lessSize.q index fcadbe3..7e6d222 100644 --- a/ql/src/test/queries/clientpositive/join32_lessSize.q +++ b/ql/src/test/queries/clientpositive/join32_lessSize.q @@ -10,6 +10,7 @@ CREATE TABLE dest_j2_n1(key STRING, value STRING, val2 STRING) STORED AS TEXTFIL set hive.auto.convert.join=true; set hive.auto.convert.join.noconditionaltask=true; set hive.auto.convert.join.noconditionaltask.size=4000; +set hive.llap.memory.oversubscription.max.executors.per.query=3; -- Since the inputs are small, it should be automatically converted to mapjoin @@ -92,3 +93,5 @@ FROM (select x.key, x.value from src1 x JOIN src y ON (x.key = y.key)) res JOIN srcpart y ON (res.value = y.value and y.ds='2008-04-08' and y.hr=11); select * from dest_j2_n1; + +reset hive.llap.memory.oversubscription.max.executors.per.query; diff --git a/ql/src/test/queries/clientpositive/orc_llap.q b/ql/src/test/queries/clientpositive/orc_llap.q index e487480..4ebc741 100644 --- a/ql/src/test/queries/clientpositive/orc_llap.q +++ b/ql/src/test/queries/clientpositive/orc_llap.q @@ -9,6 +9,7 @@ SET hive.exec.orc.default.buffer.size=32768; SET hive.exec.orc.default.row.index.stride=1000; SET hive.optimize.index.filter=true; set hive.auto.convert.join=false; +set hive.llap.memory.oversubscription.max.executors.per.query=3; DROP TABLE cross_numbers; DROP TABLE orc_llap; @@ -115,3 +116,5 @@ select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner DROP TABLE cross_numbers; DROP TABLE orc_llap; DROP TABLE orc_llap_small; + +reset hive.llap.memory.oversubscription.max.executors.per.query;