diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 771f588..cd2f2ee 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3143,6 +3143,17 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
         "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
         "executed in parallel.", "llap.daemon.num.executors"),
+    LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR("hive.llap.mapjoin.memory.oversubscribe.factor", 0.2f,
+        new RangeValidator(0.0f, 1.0f),
+        "Fraction of memory from hive.auto.convert.join.noconditionaltask.size that can be oversubscribed\n" +
+        "by queries running in LLAP mode. This factor must be between 0.0 and 1.0. Default is 20% oversubscription.\n"),
+    LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY("hive.llap.memory.oversubscription.max.executors.per.query", 3,
+        new RangeValidator(0, Integer.MAX_VALUE),
+        "Used along with hive.llap.mapjoin.memory.oversubscribe.factor to limit the number of executors from\n" +
+        "which memory for mapjoin can be borrowed. Default 3 (hive.llap.mapjoin.memory.oversubscribe.factor of\n" +
+        "memory can be borrowed from each of 3 other executors, and the mapjoin conversion decision is made\n" +
+        "based on the resulting size). This is only an upper bound; the lower bound is determined by the number\n" +
+        "of executors and the configured max concurrency."),
     LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4,
         "Maximum number of threads to be used for AM reporter. If this is lower than number of\n" +
         "executors in llap daemon, it would be set to number of executors at runtime.",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 637bc54..7b2491f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
 import org.apache.hadoop.hive.ql.parse.GenTezUtils;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
@@ -95,15 +96,48 @@
     JoinOperator joinOp = (JoinOperator) nd;
     long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+    // adjust noconditional task size threshold for LLAP
+    if ("llap".equalsIgnoreCase(context.conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_MODE))) {
+      LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(context.conf);
+      llapInfo.initClusterInfo();
+      final int executorsPerNode;
+      if (!llapInfo.hasClusterInfo()) {
+        LOG.warn("LLAP cluster information not available. Falling back to getting #executors from hiveconf..");
+        executorsPerNode = context.conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+      } else {
+        final int numExecutorsPerNodeFromCluster = llapInfo.getNumExecutorsPerNode();
+        if (numExecutorsPerNodeFromCluster == -1) {
+          LOG.warn("Cannot determine executor count from LLAP cluster information. Falling back to getting #executors" +
+              " from hiveconf..");
+          executorsPerNode = context.conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+        } else {
+          executorsPerNode = numExecutorsPerNodeFromCluster;
+        }
+      }
+      final int numSessions = context.conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+      final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
+      final double overSubscriptionFactor = context.conf.getFloatVar(HiveConf.ConfVars
+          .LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+      final int maxSlotsPerQuery = context.conf.getIntVar(ConfVars
+          .LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
+      final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
+      final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
+      LOG.info("No conditional task size adjusted for LLAP. executorsPerNode: {}, numSessions: {}, " +
+          "availableSlotsPerQuery: {}, overSubscriptionFactor: {}, maxSlotsPerQuery: {}, slotsPerQuery: {}, " +
+          "noconditionalTaskSize: {}, adjustedNoconditionalTaskSize: {}", executorsPerNode, numSessions,
+          availableSlotsPerQuery, overSubscriptionFactor, maxSlotsPerQuery, slotsPerQuery, maxSize, llapMaxSize);
+      maxSize = Math.max(maxSize, llapMaxSize);
+    }
+
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
       if (retval == null) {
         return retval;
       } else {
-        fallbackToReduceSideJoin(joinOp, context);
+        fallbackToReduceSideJoin(joinOp, context, maxSize);
         return null;
       }
     }

@@ -120,13 +154,13 @@
     LOG.info("Estimated number of buckets " + numBuckets);
     int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true);
     if (mapJoinConversionPos < 0) {
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
       if (retval == null) {
         return retval;
       } else {
         // only case is full outer join with SMB enabled which is not possible. Convert to regular
         // join.
-        fallbackToReduceSideJoin(joinOp, context);
+        fallbackToReduceSideJoin(joinOp, context, maxSize);
         return null;
       }
     }
@@ -147,7 +181,7 @@
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }

@@ -166,13 +200,13 @@
   @SuppressWarnings("unchecked")
   private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+      TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException {
     // we cannot convert to bucket map join, we cannot convert to
     // map join either based on the size. Check if we can convert to SMB join.
     if ((HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) ||
         ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE)) &&
             joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
     Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -201,7 +235,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat
       // contains aliases from sub-query
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }

@@ -211,7 +245,7 @@ private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperat
     } else {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
     }
     return null;
   }
@@ -928,15 +962,14 @@ private static int estimateNumBuckets(JoinOperator joinOp, boolean useOpTraits)
     return numBuckets;
   }

-  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      final long maxSize)
       throws SemanticException {
     // Attempt dynamic partitioned hash join
     // Since we don't have big table index yet, must start with estimate of numReducers
     int numReducers = estimateNumBuckets(joinOp, false);
     LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
-    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
-        context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
-        false);
+    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxSize, false);
     if (bigTablePos >= 0) {
       // Now that we have the big table index, get real numReducers value based on big table RS
       ReduceSinkOperator bigTableParentRS =
@@ -971,11 +1004,11 @@ private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, Optim
     return false;
   }

-  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context, final long maxSize)
       throws SemanticException {
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
         context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
-      if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
+      if (convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize)) {
         return;
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index b2e8614..a5ed308 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -41,6 +41,7 @@
   private static final long CLUSTER_UPDATE_INTERVAL_NS = 120 * 1000000000L; // 2 minutes.
   private Long lastClusterUpdateNs;
   private Integer noConfigNodeCount, executorCount;
+  private int numExecutorsPerNode = -1;
   private LlapRegistryService svc;
   private final Configuration conf;

@@ -82,6 +83,10 @@ public int getNodeCountWithUnknownExecutors() {
     return noConfigNodeCount;
   }

+  public int getNumExecutorsPerNode() {
+    return numExecutorsPerNode;
+  }
+
   public synchronized void initClusterInfo() {
     if (lastClusterUpdateNs != null) {
       long elapsed = System.nanoTime() - lastClusterUpdateNs;
@@ -111,7 +116,11 @@ public synchronized void initClusterInfo() {
         continue;
       }
       try {
-        executorsLocal += Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+        int numExecutors = Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+        executorsLocal += numExecutors;
+        if (numExecutorsPerNode == -1) {
+          numExecutorsPerNode = numExecutors;
+        }
       } catch (NumberFormatException e) {
         ++noConfigNodesLocal;
       }
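
For reviewers: a quick sanity check of the adjusted-threshold arithmetic that ConvertJoinMapJoin now performs. This is a standalone sketch, not part of the patch; the class name and input values (12 executors per node, 4 Tez sessions, a 10 MB base threshold) are assumed for illustration only.

// OversubscriptionExample.java -- illustrative only; mirrors the patched logic
// in ConvertJoinMapJoin with hard-coded inputs instead of HiveConf lookups.
public class OversubscriptionExample {
  public static void main(String[] args) {
    final long maxSize = 10_000_000L;           // hive.auto.convert.join.noconditionaltask.size (assumed 10 MB)
    final int executorsPerNode = 12;            // from the LLAP registry, or hive.llap.daemon.num.executors (assumed)
    final int numSessions = 4;                  // hive.server2.tez.sessions.per.default.queue (assumed)
    final double overSubscriptionFactor = 0.2;  // hive.llap.mapjoin.memory.oversubscribe.factor (default)
    final int maxSlotsPerQuery = 3;             // hive.llap.memory.oversubscription.max.executors.per.query (default)

    final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions); // 12 / 4 = 3
    final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);       // min(3, 3) = 3
    final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
    System.out.println(llapMaxSize); // 10 MB + 10 MB * 0.2 * 3 = 16000000 (16 MB)
  }
}

With the shipped defaults (factor 0.2, at most 3 borrowed executors), the mapjoin conversion threshold can grow by at most 60%, and the Math.max in the patch guarantees it never shrinks below the configured value.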