diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 07aa2ea..4971707 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -132,6 +132,10 @@ protected HashTableLoader getHashTableLoader(Configuration hconf) { return HashTableLoaderFactory.getLoader(hconf); } + public String getCacheKey() { + return cacheKey; + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { this.hconf = hconf; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java new file mode 100644 index 0000000..c188a79 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; + +/** + * Contains information required for memory usage monitoring. 
+ **/ + +public class MemoryMonitorInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + // Variables for LLAP hash table loading memory monitor + private boolean isLlap; + private int executorsPerNode; + private int maxExecutorsOverSubscribeMemory; + private double memoryOverSubscriptionFactor; + private long noConditionalTaskSize; + private long adjustedNoConditionalTaskSize; + private long memoryCheckInterval; + private double hashTableInflationFactor; + private long threshold; + + public MemoryMonitorInfo() { + } + + public MemoryMonitorInfo(boolean isLlap, int executorsPerNode, int maxExecutorsOverSubscribeMemory, + double memoryOverSubscriptionFactor, long noConditionalTaskSize, long adjustedNoConditionalTaskSize, + long memoryCheckInterval, double hashTableInflationFactor) { + this.isLlap = isLlap; + this.executorsPerNode = executorsPerNode; + this.maxExecutorsOverSubscribeMemory = maxExecutorsOverSubscribeMemory; + this.memoryOverSubscriptionFactor = memoryOverSubscriptionFactor; + this.noConditionalTaskSize = noConditionalTaskSize; + this.adjustedNoConditionalTaskSize = adjustedNoConditionalTaskSize; + this.memoryCheckInterval = memoryCheckInterval; + this.hashTableInflationFactor = hashTableInflationFactor; + this.threshold = (long) (hashTableInflationFactor * adjustedNoConditionalTaskSize); + } + + public MemoryMonitorInfo(MemoryMonitorInfo memoryMonitorInfo) { + this.isLlap = memoryMonitorInfo.isLlap; + this.executorsPerNode = memoryMonitorInfo.executorsPerNode; + this.maxExecutorsOverSubscribeMemory = memoryMonitorInfo.maxExecutorsOverSubscribeMemory; + this.memoryOverSubscriptionFactor = memoryMonitorInfo.memoryOverSubscriptionFactor; + this.noConditionalTaskSize = memoryMonitorInfo.noConditionalTaskSize; + this.adjustedNoConditionalTaskSize = memoryMonitorInfo.adjustedNoConditionalTaskSize; + this.memoryCheckInterval = memoryMonitorInfo.memoryCheckInterval; + this.hashTableInflationFactor = memoryMonitorInfo.hashTableInflationFactor; + this.threshold = memoryMonitorInfo.threshold; + } + + public int getExecutorsPerNode() { + return executorsPerNode; + } + + public void setExecutorsPerNode(final int executorsPerNode) { + this.executorsPerNode = executorsPerNode; + } + + public int getMaxExecutorsOverSubscribeMemory() { + return maxExecutorsOverSubscribeMemory; + } + + public void setMaxExecutorsOverSubscribeMemory(final int maxExecutorsOverSubscribeMemory) { + this.maxExecutorsOverSubscribeMemory = maxExecutorsOverSubscribeMemory; + } + + public double getMemoryOverSubscriptionFactor() { + return memoryOverSubscriptionFactor; + } + + public void setMemoryOverSubscriptionFactor(final double memoryOverSubscriptionFactor) { + this.memoryOverSubscriptionFactor = memoryOverSubscriptionFactor; + } + + public long getNoConditionalTaskSize() { + return noConditionalTaskSize; + } + + public void setNoConditionalTaskSize(final long noConditionalTaskSize) { + this.noConditionalTaskSize = noConditionalTaskSize; + } + + public long getAdjustedNoConditionalTaskSize() { + return adjustedNoConditionalTaskSize; + } + + public void setAdjustedNoConditionalTaskSize(final long adjustedNoConditionalTaskSize) { + this.adjustedNoConditionalTaskSize = adjustedNoConditionalTaskSize; + } + + public long getMemoryCheckInterval() { + return memoryCheckInterval; + } + + public void setMemoryCheckInterval(final long memoryCheckInterval) { + this.memoryCheckInterval = memoryCheckInterval; + } + + public double getHashTableInflationFactor() { + return 
hashTableInflationFactor; + } + + public void setHashTableInflationFactor(final double hashTableInflationFactor) { + this.hashTableInflationFactor = hashTableInflationFactor; + } + + public long getThreshold() { + return threshold; + } + + public void setLlap(final boolean llap) { + isLlap = llap; + } + + public boolean isLlap() { + return isLlap; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + sb.append(" isLlap: ").append(isLlap); + sb.append(" executorsPerNode: ").append(executorsPerNode); + sb.append(" maxExecutorsOverSubscribeMemory: ").append(maxExecutorsOverSubscribeMemory); + sb.append(" memoryOverSubscriptionFactor: ").append(memoryOverSubscriptionFactor); + sb.append(" memoryCheckInterval: ").append(memoryCheckInterval); + sb.append(" noConditionalTaskSize: ").append(noConditionalTaskSize); + sb.append(" adjustedNoConditionalTaskSize: ").append(adjustedNoConditionalTaskSize); + sb.append(" hashTableInflationFactor: ").append(hashTableInflationFactor); + sb.append(" threshold: ").append(threshold); + sb.append(" }"); + return sb.toString(); + } + + public boolean doMemoryMonitoring() { + return isLlap && hashTableInflationFactor > 0.0d && noConditionalTaskSize > 0 && + memoryCheckInterval > 0; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 7011d23..b2608dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; +import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +65,7 @@ private Configuration hconf; private MapJoinDesc desc; private TezContext tezContext; + private String cacheKey; @Override public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf, @@ -70,6 +73,7 @@ public void init(ExecMapperContext context, MapredContext mrContext, Configurati this.tezContext = (TezContext) mrContext; this.hconf = hconf; this.desc = joinOp.getConf(); + this.cacheKey = joinOp.getCacheKey(); } @Override @@ -147,25 +151,39 @@ public void load(MapJoinTableContainer[] mapJoinTables, } nwayConf.setNumberOfPartitions(numPartitions); } - final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); - final long memoryCheckInterval = HiveConf.getLongVar(hconf, - HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); - final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); - long numEntries = 0; - long noCondTaskSize = desc.getNoConditionalTaskSize(); - boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0; + MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo(); + boolean doMemCheck = false; + long effectiveThreshold = 0; + if (memoryMonitorInfo != null) { + final long threshold = memoryMonitorInfo.getThreshold(); + // guard against poor configuration of noconditional task size. 
We let the hash table grow until 2/3rd of the memory + // available to the container/executor + effectiveThreshold = (long) Math.max(threshold, (2.0 / 3.0) * desc.getMaxMemoryAvailable()); + + // hash table loading happens on the server side; LlapDecider could kick some fragments out to run outside of LLAP. + // Flip the flag at runtime in case we are running outside of LLAP + if (!LlapDaemonInfo.INSTANCE.isLlap()) { + memoryMonitorInfo.setLlap(false); + } + if (memoryMonitorInfo.doMemoryMonitoring()) { + doMemCheck = true; + if (LOG.isInfoEnabled()) { + LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo); + } + } + } + if (!doMemCheck) { - LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " + - "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval); - } else { - LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ", - noCondTaskSize, inflationFactor); + if (LOG.isInfoEnabled()) { + LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo); + } } for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; } + long numEntries = 0; String inputName = parentToInput.get(pos); LogicalInput input = tezContext.getInput(inputName); @@ -219,36 +237,38 @@ public void load(MapJoinTableContainer[] mapJoinTables, tableContainer = new HashMapWrapper(hconf, keyCount); } - LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName()); + LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName, + cacheKey, tableContainer.getClass().getSimpleName(), pos); tableContainer.setSerde(keyCtx, valCtx); while (kvReader.next()) { tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue()); numEntries++; - if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) { + if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) { final long estMemUsage = tableContainer.getEstimatedMemorySize(); - final long threshold = (long) (inflationFactor * noCondTaskSize); - // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory - // available for container/executor - final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable()); if (estMemUsage > effectiveThreshold) { - String msg = "Hash table loading exceeded memory limits." + - " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize + - " inflationFactor: " + inflationFactor + " threshold: " + threshold + - " effectiveThreshold: " + effectiveThreshold; + String msg = "Hash table loading exceeded memory limits for input: " + inputName + + " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage + + " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo; LOG.error(msg); throw new MapJoinMemoryExhaustionError(msg); } else { if (LOG.isInfoEnabled()) { - LOG.info("Checking hash table loader memory usage.. 
numEntries: {} estimatedMemoryUsage: {} " + - "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold); + LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " + + "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage, + effectiveThreshold); } } } } tableContainer.seal(); - LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos); mapJoinTables[pos] = tableContainer; + if (doMemCheck) { + LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} estimatedMemoryUsage: {}", + inputName, cacheKey, numEntries, tableContainer.getEstimatedMemorySize()); + } else { + LOG.info("Finished loading hash table for input: {} cacheKey: {}", inputName, cacheKey); + } } catch (Exception e) { throw new HiveException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java index b015e43..787f569 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java @@ -21,6 +21,8 @@ import java.util.Collections; import java.util.Map; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; +import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +53,7 @@ private Configuration hconf; protected MapJoinDesc desc; private TezContext tezContext; + private String cacheKey; @Override public void init(ExecMapperContext context, MapredContext mrContext, @@ -58,6 +61,7 @@ public void init(ExecMapperContext context, MapredContext mrContext, this.tezContext = (TezContext) mrContext; this.hconf = hconf; this.desc = joinOp.getConf(); + this.cacheKey = joinOp.getCacheKey(); } @Override @@ -68,26 +72,39 @@ public void load(MapJoinTableContainer[] mapJoinTables, Map parentToInput = desc.getParentToInput(); Map parentKeyCounts = desc.getParentKeyCounts(); - final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); - final long memoryCheckInterval = HiveConf.getLongVar(hconf, - HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); - final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); - long numEntries = 0; - long noCondTaskSize = desc.getNoConditionalTaskSize(); - boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0; - if (!doMemCheck) { - LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " + - "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval); - } else { - LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ", - noCondTaskSize, inflationFactor); + MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo(); + boolean doMemCheck = false; + long effectiveThreshold = 0; + if (memoryMonitorInfo != null) { + final long threshold = memoryMonitorInfo.getThreshold(); + // guard against poor configuration of noconditional task size. 
We let the hash table grow until 2/3rd of the memory + // available to the container/executor + effectiveThreshold = (long) Math.max(threshold, (2.0 / 3.0) * desc.getMaxMemoryAvailable()); + + // hash table loading happens on the server side; LlapDecider could kick some fragments out to run outside of LLAP. + // Flip the flag at runtime in case we are running outside of LLAP + if (!LlapDaemonInfo.INSTANCE.isLlap()) { + memoryMonitorInfo.setLlap(false); + } + if (memoryMonitorInfo.doMemoryMonitoring()) { + doMemCheck = true; + if (LOG.isInfoEnabled()) { + LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo); + } + } } + if (!doMemCheck) { + if (LOG.isInfoEnabled()) { + LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo); + } + } for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; } + long numEntries = 0; String inputName = parentToInput.get(pos); LogicalInput input = tezContext.getInput(inputName); @@ -108,41 +125,41 @@ public void load(MapJoinTableContainer[] mapJoinTables, VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer = new VectorMapJoinFastTableContainer(desc, hconf, keyCount); - LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName()); + LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName, + cacheKey, vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos); vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here. while (kvReader.next()) { vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(), (BytesWritable)kvReader.getCurrentValue()); numEntries++; - if (doMemCheck && numEntries >= memoryCheckInterval) { - if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) { + if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) { final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize(); - final long threshold = (long) (inflationFactor * noCondTaskSize); - // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory - // available for container/executor - final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable()); if (estMemUsage > effectiveThreshold) { - String msg = "VectorMapJoin Hash table loading exceeded memory limits." + - " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize + - " inflationFactor: " + inflationFactor + " threshold: " + threshold + - " effectiveThreshold: " + effectiveThreshold; + String msg = "Hash table loading exceeded memory limits for input: " + inputName + + " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage + + " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo; LOG.error(msg); throw new MapJoinMemoryExhaustionError(msg); } else { if (LOG.isInfoEnabled()) { - LOG.info("Checking vector mapjoin hash table loader memory usage.. 
numEntries: {} " + - "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold); + LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " + + "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage, + effectiveThreshold); } } - } } } vectorMapJoinFastTableContainer.seal(); mapJoinTables[pos] = vectorMapJoinFastTableContainer; - LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() + - ". Small table position: " + pos); + if (doMemCheck) { + LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} " + + "estimatedMemoryUsage: {}", inputName, cacheKey, numEntries, + vectorMapJoinFastTableContainer.getEstimatedMemorySize()); + } else { + LOG.info("Finished loading vector hash table for input: {} cacheKey: {}", inputName, cacheKey); + } } catch (IOException e) { throw new HiveException(e); } catch (SerDeException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index d0fdb52..5205bea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo; import org.apache.hadoop.hive.ql.exec.MuxOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -97,10 +98,9 @@ JoinOperator joinOp = (JoinOperator) nd; long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD); - // adjust noconditional task size threshold for LLAP - maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf); - joinOp.getConf().setNoConditionalTaskSize(maxSize); + MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfoForLlap(maxSize, context.conf); + joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo); TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { @@ -172,7 +172,7 @@ } @VisibleForTesting - public long getNoConditionalTaskSizeForLlap(final long maxSize, final HiveConf conf) { + public MemoryMonitorInfo getMemoryMonitorInfoForLlap(final long maxSize, final HiveConf conf) { if ("llap".equalsIgnoreCase(conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) { LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf); llapInfo.initClusterInfo(); @@ -190,24 +190,23 @@ public long getNoConditionalTaskSizeForLlap(final long maxSize, final HiveConf c executorsPerNode = numExecutorsPerNodeFromCluster; } } - final int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); - if (numSessions > 0) { - final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions); - final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR); - final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY); - final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery); - final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery)); - LOG.info("No 
conditional task size adjusted for LLAP. executorsPerNode: {}, numSessions: {}, " + - "availableSlotsPerQuery: {}, overSubscriptionFactor: {}, maxSlotsPerQuery: {}, slotsPerQuery: {}, " + - "noconditionalTaskSize: {}, adjustedNoconditionalTaskSize: {}", executorsPerNode, numSessions, - availableSlotsPerQuery, overSubscriptionFactor, maxSlotsPerQuery, slotsPerQuery, maxSize, llapMaxSize); - return Math.max(maxSize, llapMaxSize); - } else { - LOG.warn(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname + " returned value {}. Returning {}" + - " as no conditional task size for LLAP.", numSessions, maxSize); + final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR); + final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY); + // bounded by max executors + final int slotsPerQuery = Math.min(maxSlotsPerQuery, executorsPerNode); + final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery)); + // prevents under subscription + final long adjustedMaxSize = Math.max(maxSize, llapMaxSize); + final float inflationFactor = conf.getFloatVar(ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); + final long memoryCheckInterval = conf.getLongVar(ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); + MemoryMonitorInfo memoryMonitorInfo = new MemoryMonitorInfo(true, executorsPerNode, maxSlotsPerQuery, + overSubscriptionFactor, maxSize, adjustedMaxSize, memoryCheckInterval, inflationFactor); + if (LOG.isInfoEnabled()) { + LOG.info("Adjusted no-conditional task size for LLAP. {}", memoryMonitorInfo); } + return memoryMonitorInfo; } - return maxSize; + return null; } @SuppressWarnings("unchecked") @@ -281,7 +280,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont joinOp.getConf().getBaseSrc(), joinOp).getSecond(), null, joinDesc.getExprs(), null, null, joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(), - joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, joinDesc.getNoConditionalTaskSize()); + joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, joinDesc.getMemoryMonitorInfo()); mapJoinDesc.setNullSafes(joinDesc.getNullSafes()); mapJoinDesc.setFilterMap(joinDesc.getFilterMap()); mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index 85d46f3..b099386 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -432,7 +432,7 @@ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf, smbJoinDesc.getOutputColumnNames(), bigTablePos, smbJoinDesc.getConds(), smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix(), - smbJoinDesc.getNoConditionalTaskSize()); + smbJoinDesc.getMemoryMonitorInfo()); mapJoinDesc.setStatistics(smbJoinDesc.getStatistics()); @@ -1185,7 +1185,7 @@ public static MapJoinDesc getMapJoinDesc(HiveConf hconf, MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op - .getConf().getNoOuterJoin(), dumpFilePrefix, op.getConf().getNoConditionalTaskSize()); + .getConf().getNoOuterJoin(), dumpFilePrefix, op.getConf().getMemoryMonitorInfo()); 
mapJoinDescriptor.setStatistics(op.getConf().getStatistics()); mapJoinDescriptor.setTagOrder(tagOrder); mapJoinDescriptor.setNullSafes(desc.getNullSafes()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index b9b600d..471675b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -1003,7 +1003,7 @@ private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressio // 4. We create the join operator with its descriptor JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns, - filters, joinExpressions, 0); + filters, joinExpressions, null); desc.setReversedExprs(reversedExprs); desc.setFilterMap(filterMap); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index f78bd7c..94607e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -282,7 +282,7 @@ public static void processSkewJoin(JoinOperator joinOp, newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor .getOutputColumnNames(), i, joinDescriptor.getConds(), joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix, - joinDescriptor.getNoConditionalTaskSize()); + joinDescriptor.getMemoryMonitorInfo()); mapJoinDescriptor.setTagOrder(tags); mapJoinDescriptor.setHandleSkewJoin(false); mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index c970611..3847cd0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -241,7 +241,7 @@ public static void processSkewJoin(JoinOperator joinOp, Task streamAliases; // non-transient field, used at runtime to kill a task if it exceeded memory limits when running in LLAP - protected long noConditionalTaskSize; + protected MemoryMonitorInfo memoryMonitorInfo; public JoinDesc() { } @@ -116,14 +117,14 @@ public JoinDesc() { public JoinDesc(final Map> exprs, List outputColumnNames, final boolean noOuterJoin, final JoinCondDesc[] conds, final Map> filters, - ExprNodeDesc[][] joinKeys, final long noConditionalTaskSize) { + ExprNodeDesc[][] joinKeys, final MemoryMonitorInfo memoryMonitorInfo) { this.exprs = exprs; this.outputColumnNames = outputColumnNames; this.noOuterJoin = noOuterJoin; this.conds = conds; this.filters = filters; this.joinKeys = joinKeys; - this.noConditionalTaskSize = noConditionalTaskSize; + this.memoryMonitorInfo = memoryMonitorInfo; resetOrder(); } @@ -150,7 +151,7 @@ public Object clone() { ret.setHandleSkewJoin(handleSkewJoin); ret.setSkewKeyDefinition(getSkewKeyDefinition()); ret.setTagOrder(getTagOrder().clone()); - ret.setNoConditionalTaskSize(getNoConditionalTaskSize()); + ret.setMemoryMonitorInfo(new MemoryMonitorInfo(getMemoryMonitorInfo())); if (getKeyTableDesc() != null) { ret.setKeyTableDesc((TableDesc) 
getKeyTableDesc().clone()); } @@ -201,7 +202,7 @@ public JoinDesc(JoinDesc clone) { this.filterMap = clone.filterMap; this.residualFilterExprs = clone.residualFilterExprs; this.statistics = clone.statistics; - this.noConditionalTaskSize = clone.noConditionalTaskSize; + this.memoryMonitorInfo = clone.memoryMonitorInfo; } public Map> getExprs() { @@ -687,13 +688,11 @@ public void cloneQBJoinTreeProps(JoinDesc joinDesc) { streamAliases = joinDesc.streamAliases == null ? null : new ArrayList(joinDesc.streamAliases); } - private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(JoinDesc.class); - - public long getNoConditionalTaskSize() { - return noConditionalTaskSize; + public MemoryMonitorInfo getMemoryMonitorInfo() { + return memoryMonitorInfo; } - public void setNoConditionalTaskSize(final long noConditionalTaskSize) { - this.noConditionalTaskSize = noConditionalTaskSize; + public void setMemoryMonitorInfo(final MemoryMonitorInfo memoryMonitorInfo) { + this.memoryMonitorInfo = memoryMonitorInfo; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index 8da85d2..50bf48e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; @@ -113,8 +114,8 @@ public MapJoinDesc(final Map> keys, final List valueTblDescs, final List valueFilteredTblDescs, List outputColumnNames, final int posBigTable, final JoinCondDesc[] conds, final Map> filters, boolean noOuterJoin, String dumpFilePrefix, - final long noConditionalTaskSize) { - super(values, outputColumnNames, noOuterJoin, conds, filters, null, noConditionalTaskSize); + final MemoryMonitorInfo memoryMonitorInfo) { + super(values, outputColumnNames, noOuterJoin, conds, filters, null, memoryMonitorInfo); vectorDesc = null; this.keys = keys; this.keyTblDesc = keyTblDesc; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java index 0287ff2..f4ba846 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java @@ -446,9 +446,8 @@ public void testNoConditionalTaskSizeForLlap() { long defaultNoConditionalTaskSize = 1024L * 1024L * 1024L; HiveConf hiveConf = new HiveConf(); - // execution mode not set, default is returned - long gotSize = convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf); - assertEquals(defaultNoConditionalTaskSize, gotSize); + // execution mode not set, null is returned + assertNull(convertJoinMapJoin.getMemoryMonitorInfoForLlap(defaultNoConditionalTaskSize, hiveConf)); hiveConf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap"); // default executors is 4, max slots is 3. 
so 3 * 20% of noconditional task size will be oversubscribed @@ -457,7 +456,8 @@ public void testNoConditionalTaskSizeForLlap() { int maxSlots = 3; long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots)); assertEquals(expectedSize, - convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf)); + convertJoinMapJoin.getMemoryMonitorInfoForLlap(defaultNoConditionalTaskSize, hiveConf) + .getAdjustedNoConditionalTaskSize()); // num executors is less than max executors per query (which is not expected case), default executors will be // chosen. 4 * 20% of noconditional task size will be oversubscribed @@ -465,40 +465,18 @@ public void testNoConditionalTaskSizeForLlap() { hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5"); expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * chosenSlots)); assertEquals(expectedSize, - convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf)); - - // 2 concurrent sessions, 4 executors. 2 * 20% of noconditional task size will be oversubscribed - hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname); - hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2"); - expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2)); - assertEquals(expectedSize, - convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf)); - - // 4 concurrent sessions, 4 executors. 1 * 20% of noconditional task size will be oversubscribed - hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname); - hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "4"); - expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 1)); - assertEquals(expectedSize, - convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf)); - - // 8 concurrent sessions, 4 executors. 
default noconditioanl task will be used (no oversubscription) - hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname); - hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "8"); - assertEquals(defaultNoConditionalTaskSize, - convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf)); - - // 2 * 120% of noconditional task size will be oversubscribed - hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname); - hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2"); - hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR.varname, "1.2"); - fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR); - expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2)); - assertEquals(expectedSize, - convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf)); - - // 0 value for number of sessions - hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "0"); - assertEquals(defaultNoConditionalTaskSize, - convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf)); + convertJoinMapJoin.getMemoryMonitorInfoForLlap(defaultNoConditionalTaskSize, hiveConf) + .getAdjustedNoConditionalTaskSize()); + + // disable memory checking + hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "0"); + assertFalse( + convertJoinMapJoin.getMemoryMonitorInfoForLlap(defaultNoConditionalTaskSize, hiveConf).doMemoryMonitoring()); + + // invalid inflation factor + hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "10000"); + hiveConf.set(HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR.varname, "0.0f"); + assertFalse( + convertJoinMapJoin.getMemoryMonitorInfoForLlap(defaultNoConditionalTaskSize, hiveConf).doMemoryMonitoring()); } }
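
Reviewer note: a minimal standalone sketch of the sizing arithmetic this patch centralizes (compile time in ConvertJoinMapJoin#getMemoryMonitorInfoForLlap, runtime in the two hash table loaders). The class name and the sample numbers are illustrative assumptions, not part of the patch:

public class ThresholdMathSketch {
  public static void main(String[] args) {
    long noCondTaskSize = 1024L * 1024L * 1024L;       // 1 GB, compile-time mapjoin budget
    double overSubscriptionFactor = 0.2;               // LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR
    int executorsPerNode = 4;
    int maxExecutorsOverSubscribeMemory = 3;           // LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY
    double inflationFactor = 2.0;                      // HIVE_HASH_TABLE_INFLATION_FACTOR
    long maxMemoryAvailable = 4L * 1024 * 1024 * 1024; // per-executor memory seen at runtime

    // bounded by the executors actually present on a node
    int slotsPerQuery = Math.min(maxExecutorsOverSubscribeMemory, executorsPerNode);
    long adjusted = (long) (noCondTaskSize + noCondTaskSize * overSubscriptionFactor * slotsPerQuery);
    adjusted = Math.max(noCondTaskSize, adjusted); // prevents under-subscription

    // compile-time threshold carried in MemoryMonitorInfo
    long threshold = (long) (inflationFactor * adjusted);

    // runtime guard: let the hash table grow to at least 2/3 of executor memory,
    // protecting against a poorly configured noconditional task size
    long effectiveThreshold = (long) Math.max(threshold, (2.0 / 3.0) * maxMemoryAvailable);

    System.out.printf("adjusted=%d threshold=%d effectiveThreshold=%d%n",
        adjusted, threshold, effectiveThreshold);
  }
}

With these sample inputs the oversubscribed budget is 1.6 GB and the kill threshold 3.2 GB; the 2/3 floor only takes over when the executor has far more memory than the configured task size.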
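A second hedged sketch, this time of the loader-side monitoring loop that HashTableLoader and VectorMapJoinFastHashTableLoader now share: sample the container's estimated size every getMemoryCheckInterval() rows and fail fast once it exceeds the effective threshold. All names below are illustrative stand-ins for the Hive types, not the real API:

import java.util.Iterator;

public class MemoryCheckLoopSketch {
  /** Stand-in for Hive's MapJoinMemoryExhaustionError. */
  static class MemoryExhaustionError extends Error {
    MemoryExhaustionError(String msg) { super(msg); }
  }

  /** Stand-in for a hash table container that can estimate its own footprint. */
  interface EstimatingTable {
    void putRow(byte[] key, byte[] value);
    long estimatedMemorySize();
  }

  static long load(Iterator<byte[][]> rows, EstimatingTable table,
                   boolean doMemCheck, long checkInterval, long effectiveThreshold) {
    long numEntries = 0;
    while (rows.hasNext()) {
      byte[][] kv = rows.next();
      table.putRow(kv[0], kv[1]);
      numEntries++;
      // Only pay for the (possibly expensive) size estimate every checkInterval rows,
      // mirroring the numEntries % getMemoryCheckInterval() == 0 test in the patch.
      if (doMemCheck && numEntries % checkInterval == 0) {
        long est = table.estimatedMemorySize();
        if (est > effectiveThreshold) {
          throw new MemoryExhaustionError("estimated " + est + " bytes after "
              + numEntries + " entries exceeds threshold " + effectiveThreshold);
        }
      }
    }
    return numEntries;
  }
}

The periodic check is the design point: estimating a container's memory footprint is too costly per row, so the patch trades detection latency (up to checkInterval rows) for near-zero steady-state overhead, and disables the check entirely when doMemoryMonitoring() is false.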