diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 07aa2ea..4971707 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -132,6 +132,10 @@ protected HashTableLoader getHashTableLoader(Configuration hconf) { return HashTableLoaderFactory.getLoader(hconf); } + public String getCacheKey() { + return cacheKey; + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { this.hconf = hconf; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 7011d23..9fe1d6f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +64,7 @@ private Configuration hconf; private MapJoinDesc desc; private TezContext tezContext; + private String cacheKey; @Override public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf, @@ -70,6 +72,7 @@ public void init(ExecMapperContext context, MapredContext mrContext, Configurati this.tezContext = (TezContext) mrContext; this.hconf = hconf; this.desc = joinOp.getConf(); + this.cacheKey = joinOp.getCacheKey(); } @Override @@ -150,8 +153,8 @@ public void load(MapJoinTableContainer[] mapJoinTables, final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); final long memoryCheckInterval = HiveConf.getLongVar(hconf, HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); - final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); - long numEntries = 0; + final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)) && + LlapDaemonInfo.INSTANCE.isLlap(); long noCondTaskSize = desc.getNoConditionalTaskSize(); boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0; if (!doMemCheck) { @@ -166,6 +169,7 @@ public void load(MapJoinTableContainer[] mapJoinTables, continue; } + long numEntries = 0; String inputName = parentToInput.get(pos); LogicalInput input = tezContext.getInput(inputName); @@ -219,7 +223,8 @@ public void load(MapJoinTableContainer[] mapJoinTables, tableContainer = new HashMapWrapper(hconf, keyCount); } - LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName()); + LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName, + cacheKey, tableContainer.getClass().getSimpleName(), pos); tableContainer.setSerde(keyCtx, valCtx); while (kvReader.next()) { @@ -232,7 +237,7 @@ public void load(MapJoinTableContainer[] mapJoinTables, // available for container/executor final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable()); if (estMemUsage > effectiveThreshold) { - String msg = "Hash table loading exceeded memory limits." + + String msg = "Hash table loading exceeded memory limits for input: " + inputName + " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize + " inflationFactor: " + inflationFactor + " threshold: " + threshold + " effectiveThreshold: " + effectiveThreshold; @@ -240,15 +245,21 @@ public void load(MapJoinTableContainer[] mapJoinTables, throw new MapJoinMemoryExhaustionError(msg); } else { if (LOG.isInfoEnabled()) { - LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " + - "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold); + LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " + + "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage, + effectiveThreshold); } } } } tableContainer.seal(); - LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos); mapJoinTables[pos] = tableContainer; + if (doMemCheck) { + LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} estimatedMemoryUsage: {}", + inputName, cacheKey, numEntries, tableContainer.getEstimatedMemorySize()); + } else { + LOG.info("Finished loading hash table for input: {} cacheKey: {}", inputName, cacheKey); + } } catch (Exception e) { throw new HiveException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java index b015e43..9180709 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Map; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,7 @@ private Configuration hconf; protected MapJoinDesc desc; private TezContext tezContext; + private String cacheKey; @Override public void init(ExecMapperContext context, MapredContext mrContext, @@ -58,6 +60,7 @@ public void init(ExecMapperContext context, MapredContext mrContext, this.tezContext = (TezContext) mrContext; this.hconf = hconf; this.desc = joinOp.getConf(); + this.cacheKey = joinOp.getCacheKey(); } @Override @@ -71,8 +74,8 @@ public void load(MapJoinTableContainer[] mapJoinTables, final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); final long memoryCheckInterval = HiveConf.getLongVar(hconf, HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); - final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); - long numEntries = 0; + final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)) && + LlapDaemonInfo.INSTANCE.isLlap(); long noCondTaskSize = desc.getNoConditionalTaskSize(); boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0; if (!doMemCheck) { @@ -88,6 +91,7 @@ public void load(MapJoinTableContainer[] mapJoinTables, continue; } + long numEntries = 0; String inputName = parentToInput.get(pos); LogicalInput input = tezContext.getInput(inputName); @@ -108,7 +112,8 @@ public void load(MapJoinTableContainer[] mapJoinTables, VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer = new VectorMapJoinFastTableContainer(desc, hconf, keyCount); - LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName()); + LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName, + cacheKey, vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos); vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here. while (kvReader.next()) { @@ -123,7 +128,7 @@ public void load(MapJoinTableContainer[] mapJoinTables, // available for container/executor final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable()); if (estMemUsage > effectiveThreshold) { - String msg = "VectorMapJoin Hash table loading exceeded memory limits." + + String msg = "Hash table loading exceeded memory limits for input: " + inputName + " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize + " inflationFactor: " + inflationFactor + " threshold: " + threshold + " effectiveThreshold: " + effectiveThreshold; @@ -131,8 +136,9 @@ public void load(MapJoinTableContainer[] mapJoinTables, throw new MapJoinMemoryExhaustionError(msg); } else { if (LOG.isInfoEnabled()) { - LOG.info("Checking vector mapjoin hash table loader memory usage.. numEntries: {} " + - "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold); + LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " + + "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage, + effectiveThreshold); } } } @@ -141,8 +147,13 @@ public void load(MapJoinTableContainer[] mapJoinTables, vectorMapJoinFastTableContainer.seal(); mapJoinTables[pos] = vectorMapJoinFastTableContainer; - LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() + - ". Small table position: " + pos); + if (doMemCheck) { + LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} " + + "estimatedMemoryUsage: {}", inputName, cacheKey, numEntries, + vectorMapJoinFastTableContainer.getEstimatedMemorySize()); + } else { + LOG.info("Finished loading vector hash table for input: {} cacheKey: {}", inputName, cacheKey); + } } catch (IOException e) { throw new HiveException(e); } catch (SerDeException e) {