diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0d8d7ae..0499449 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3433,7 +3433,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "The number of queries allowed in parallel via llap. Negative number implies 'infinite'."),
     HIVE_TEZ_ENABLE_MEMORY_MANAGER("hive.tez.enable.memory.manager", true,
         "Enable memory manager for tez"),
-    HIVE_HASH_TABLE_INFLATION_FACTOR("hive.hash.table.inflation.factor", (float) 2.0,
+    HIVE_HASH_TABLE_INFLATION_FACTOR("hive.hash.table.inflation.factor", (float) 1.5,
         "Expected inflation factor between disk/in memory representation of hash tables"),
     HIVE_LOG_TRACE_ID("hive.log.trace.id", "",
         "Log tracing id that can be used by upstream clients for tracking respective logs. " +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 5bb9d7e..7982478 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -54,6 +54,8 @@
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
+import com.sun.management.ThreadMXBean;
+
 /**
  * HashTableLoader for Tez constructs the hashtable from records read from
  * a broadcast edge.
@@ -66,6 +68,9 @@
   private MapJoinDesc desc;
   private TezContext tezContext;
   private String cacheKey;
+  private long startMemAllocBytes = 0;
+  private ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
+  private boolean threadAllocSupported = false;
 
   @Override
   public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
@@ -74,13 +79,21 @@ public void init(ExecMapperContext context, MapredContext mrContext, Configurati
     this.hconf = hconf;
     this.desc = joinOp.getConf();
     this.cacheKey = joinOp.getCacheKey();
+    if (threadMXBean.isThreadAllocatedMemoryEnabled() && threadMXBean.isThreadAllocatedMemorySupported()) {
+      this.threadAllocSupported = true;
+      LOG.info("Thread allocated memory supported and enabled");
+    } else {
+      LOG.info("Thread allocated memory is not supported or not enabled");
+    }
   }
 
   @Override
   public void load(MapJoinTableContainer[] mapJoinTables,
       MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
-
+    if (threadAllocSupported) {
+      startMemAllocBytes = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId());
+    }
     Map<Integer, String> parentToInput = desc.getParentToInput();
     Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
@@ -238,35 +251,50 @@ public void load(MapJoinTableContainer[] mapJoinTables,
             cacheKey, tableContainer.getClass().getSimpleName(), pos);
         tableContainer.setSerde(keyCtx, valCtx);
+        long estMemUsage;
         while (kvReader.next()) {
           tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue());
           numEntries++;
           if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-            final long estMemUsage = tableContainer.getEstimatedMemorySize();
+            if (threadAllocSupported) {
+              estMemUsage = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId()) - startMemAllocBytes;
+            } else {
+              estMemUsage = tableContainer.getEstimatedMemorySize();
+            }
             if (estMemUsage > effectiveThreshold) {
               String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
-                " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
+                " numEntries[" + pos + "]: " + numEntries +
+                " threadAllocSupported: " + threadAllocSupported +
+                " estimatedMemoryUsage: " + estMemUsage +
+                " effectiveThreshold: " + effectiveThreshold +
+                " memoryMonitorInfo: " + memoryMonitorInfo;
               LOG.error(msg);
               throw new MapJoinMemoryExhaustionError(msg);
             } else {
               if (LOG.isInfoEnabled()) {
-                LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                  "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
-                  effectiveThreshold);
+                LOG.info("Checking hash table loader memory usage for input: {} numEntries[{}]: {} " +
+                  "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, pos, numEntries,
+                  estMemUsage, effectiveThreshold);
               }
             }
           }
         }
-        tableContainer.seal();
-        mapJoinTables[pos] = tableContainer;
+
         if (doMemCheck) {
-          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} estimatedMemoryUsage: {}",
-            inputName, cacheKey, numEntries, tableContainer.getEstimatedMemorySize());
+          // recompute usage here: the in-loop memory check may not have run even once
+          if (threadAllocSupported) {
+            estMemUsage = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId()) - startMemAllocBytes;
+          } else {
+            estMemUsage = tableContainer.getEstimatedMemorySize();
+          }
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries[{}]: {} " +
+            "estimatedMemoryUsage: {}", inputName, cacheKey, pos, numEntries, estMemUsage);
         } else {
-          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {}", inputName, cacheKey,
-            numEntries);
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries[{}]: {}",
+            inputName, cacheKey, pos, numEntries);
         }
+        tableContainer.seal();
+        mapJoinTables[pos] = tableContainer;
       } catch (Exception e) {
        throw new HiveException(e);
      }
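Both loaders use the same measurement pattern: snapshot the loading thread's cumulative allocated bytes once at the start of load(), then treat the delta reported by com.sun.management.ThreadMXBean as the hash table's memory footprint, falling back to the container's own getEstimatedMemorySize() when the JVM does not expose the counter. A minimal standalone sketch of that pattern follows; it is illustrative only (ThreadAllocProbe and its methods are not Hive classes).

```java
import java.lang.management.ManagementFactory;

import com.sun.management.ThreadMXBean;

// Illustrative sketch: measure bytes allocated by the current thread while
// building an in-memory structure, falling back to a caller-supplied estimate
// when the JVM does not expose per-thread allocation counters.
public class ThreadAllocProbe {

  // HotSpot-specific subinterface of java.lang.management.ThreadMXBean
  private final ThreadMXBean threadMXBean =
      (ThreadMXBean) ManagementFactory.getThreadMXBean();
  private final boolean threadAllocSupported =
      threadMXBean.isThreadAllocatedMemorySupported() && threadMXBean.isThreadAllocatedMemoryEnabled();

  private long startBytes;

  /** Snapshot the current thread's cumulative allocated bytes before loading starts. */
  public void start() {
    if (threadAllocSupported) {
      startBytes = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId());
    }
  }

  /** Bytes allocated on this thread since start(), or the fallback estimate if unsupported. */
  public long usedBytes(long fallbackEstimate) {
    if (threadAllocSupported) {
      return threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId()) - startBytes;
    }
    return fallbackEstimate;
  }

  public static void main(String[] args) {
    ThreadAllocProbe probe = new ThreadAllocProbe();
    probe.start();
    long[] payload = new long[4 << 20];   // roughly 32 MB allocated on this thread
    System.out.println("allocated ~" + probe.usedBytes(payload.length * 8L) + " bytes");
  }
}
```

Note that getThreadAllocatedBytes() is cumulative and also counts short-lived scratch allocations made on the same thread, so the delta tends to overestimate the live size of the table. For an out-of-memory guard that bias is the safe direction, and it is presumably why the same patch lowers the default of hive.hash.table.inflation.factor from 2.0 to 1.5.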
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index 6c1ae2c..f4f103e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Map;
 
@@ -42,6 +43,8 @@
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
+import com.sun.management.ThreadMXBean;
+
 /**
  * HashTableLoader for Tez constructs the hashtable from records read from
  * a broadcast edge.
@@ -54,6 +57,9 @@
   protected MapJoinDesc desc;
   private TezContext tezContext;
   private String cacheKey;
+  private long startMemAllocBytes = 0;
+  private ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
+  private boolean threadAllocSupported = false;
 
   @Override
   public void init(ExecMapperContext context, MapredContext mrContext,
@@ -62,13 +68,21 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     this.hconf = hconf;
     this.desc = joinOp.getConf();
     this.cacheKey = joinOp.getCacheKey();
+    if (threadMXBean.isThreadAllocatedMemoryEnabled() && threadMXBean.isThreadAllocatedMemorySupported()) {
+      this.threadAllocSupported = true;
+      LOG.info("Thread allocated memory supported and enabled");
+    } else {
+      LOG.info("Thread allocated memory is not supported or not enabled");
+    }
   }
 
   @Override
   public void load(MapJoinTableContainer[] mapJoinTables,
       MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
-
+    if (threadAllocSupported) {
+      startMemAllocBytes = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId());
+    }
     Map<Integer, String> parentToInput = desc.getParentToInput();
     Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
@@ -126,38 +140,51 @@ public void load(MapJoinTableContainer[] mapJoinTables,
             cacheKey, vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos);
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
+        long estMemUsage;
         while (kvReader.next()) {
           vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
             (BytesWritable)kvReader.getCurrentValue());
           numEntries++;
           if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
-            final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+            if (threadAllocSupported) {
+              estMemUsage = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId()) - startMemAllocBytes;
+            } else {
+              estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+            }
             if (estMemUsage > effectiveThreshold) {
               String msg = "Hash table loading exceeded memory limits for input: " + inputName +
-                " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
-                " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
+                " numEntries[" + pos + "]: " + numEntries +
+                " threadAllocSupported: " + threadAllocSupported +
+                " estimatedMemoryUsage: " + estMemUsage +
+                " effectiveThreshold: " + effectiveThreshold +
+                " memoryMonitorInfo: " + memoryMonitorInfo;
               LOG.error(msg);
               throw new MapJoinMemoryExhaustionError(msg);
             } else {
               if (LOG.isInfoEnabled()) {
-                LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
-                  "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
-                  effectiveThreshold);
+                LOG.info("Checking hash table loader memory usage for input: {} numEntries[{}]: {} " +
+                  "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, pos, numEntries,
+                  estMemUsage, effectiveThreshold);
               }
             }
           }
         }
-        vectorMapJoinFastTableContainer.seal();
-        mapJoinTables[pos] = vectorMapJoinFastTableContainer;
         if (doMemCheck) {
-          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} " +
-            "estimatedMemoryUsage: {}", inputName, cacheKey, numEntries,
-            vectorMapJoinFastTableContainer.getEstimatedMemorySize());
+          // recompute usage here: the in-loop memory check may not have run even once
+          if (threadAllocSupported) {
+            estMemUsage = threadMXBean.getThreadAllocatedBytes(Thread.currentThread().getId()) - startMemAllocBytes;
+          } else {
+            estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+          }
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries[{}]: {} " +
+            "estimatedMemoryUsage: {}", inputName, cacheKey, pos, numEntries, estMemUsage);
         } else {
-          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {}", inputName, cacheKey,
-            numEntries);
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries[{}]: {}",
+            inputName, cacheKey, pos, numEntries);
         }
+        vectorMapJoinFastTableContainer.seal();
+        mapJoinTables[pos] = vectorMapJoinFastTableContainer;
       } catch (IOException e) {
         throw new HiveException(e);
       } catch (SerDeException e) {
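The loaders only read the enabled flag and silently fall back to the size estimate when per-thread accounting is off. On HotSpot the counter is supported and enabled by default, but it can also be toggled programmatically; the sketch below (again illustrative, not part of the patch) shows how a host process could make sure the ThreadMXBean path is actually taken.

```java
import java.lang.management.ManagementFactory;

import com.sun.management.ThreadMXBean;

// Illustrative startup check (not part of the patch): turn per-thread allocation
// accounting on when the JVM supports it but has it disabled, so a loader like
// the ones above uses the ThreadMXBean path instead of the size-estimate fallback.
public final class ThreadAllocAccounting {

  private ThreadAllocAccounting() {
  }

  public static boolean ensureEnabled() {
    ThreadMXBean bean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
    if (!bean.isThreadAllocatedMemorySupported()) {
      return false;                                // JVM cannot measure per-thread allocation
    }
    if (!bean.isThreadAllocatedMemoryEnabled()) {
      bean.setThreadAllocatedMemoryEnabled(true);  // typically cheap on HotSpot
    }
    return bean.isThreadAllocatedMemoryEnabled();
  }

  public static void main(String[] args) {
    System.out.println("thread allocation accounting enabled: " + ensureEnabled());
  }
}
```

If ensureEnabled() returns false, the loaders above still work; they simply keep monitoring through getEstimatedMemorySize().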