diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 07aa2ea..4971707 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -132,6 +132,10 @@ protected HashTableLoader getHashTableLoader(Configuration hconf) {
     return HashTableLoaderFactory.getLoader(hconf);
   }
 
+  public String getCacheKey() {
+    return cacheKey;
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     this.hconf = hconf;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 7011d23..b5aaf33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -63,6 +63,7 @@
   private Configuration hconf;
   private MapJoinDesc desc;
   private TezContext tezContext;
+  private String cacheKey;
 
   @Override
   public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
@@ -70,6 +71,7 @@ public void init(ExecMapperContext context, MapredContext mrContext, Configurati
     this.tezContext = (TezContext) mrContext;
     this.hconf = hconf;
     this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
   }
 
   @Override
@@ -151,7 +153,6 @@ public void load(MapJoinTableContainer[] mapJoinTables,
     final long memoryCheckInterval = HiveConf.getLongVar(hconf,
         HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
     final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
-    long numEntries = 0;
     long noCondTaskSize = desc.getNoConditionalTaskSize();
     boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
     if (!doMemCheck) {
@@ -166,6 +167,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
         continue;
       }
 
+      long numEntries = 0;
      String inputName = parentToInput.get(pos);
      LogicalInput input = tezContext.getInput(inputName);
 
@@ -219,7 +221,8 @@ public void load(MapJoinTableContainer[] mapJoinTables,
        tableContainer = new HashMapWrapper(hconf, keyCount);
      }
 
-      LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName());
+      LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName,
+          cacheKey, tableContainer.getClass().getSimpleName(), pos);
 
      tableContainer.setSerde(keyCtx, valCtx);
      while (kvReader.next()) {
@@ -232,7 +235,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
          // available for container/executor
          final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
          if (estMemUsage > effectiveThreshold) {
-            String msg = "Hash table loading exceeded memory limits." +
+            String msg = "Hash table loading exceeded memory limits for input: " + inputName +
              " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
              " inflationFactor: " + inflationFactor + " threshold: " + threshold +
              " effectiveThreshold: " + effectiveThreshold;
@@ -240,15 +243,21 @@ public void load(MapJoinTableContainer[] mapJoinTables,
            throw new MapJoinMemoryExhaustionError(msg);
          } else {
            if (LOG.isInfoEnabled()) {
-              LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " +
-                "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+              LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                  "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+                  effectiveThreshold);
            }
          }
        }
      }
      tableContainer.seal();
-      LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos);
      mapJoinTables[pos] = tableContainer;
+      if (doMemCheck) {
+        LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} estimatedMemoryUsage: {}",
+            inputName, cacheKey, numEntries, tableContainer.getEstimatedMemorySize());
+      } else {
+        LOG.info("Finished loading hash table for input: {} cacheKey: {}", inputName, cacheKey);
+      }
    } catch (Exception e) {
      throw new HiveException(e);
    }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index b015e43..07e58bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -51,6 +51,7 @@
   private Configuration hconf;
   protected MapJoinDesc desc;
   private TezContext tezContext;
+  private String cacheKey;
 
   @Override
   public void init(ExecMapperContext context, MapredContext mrContext,
@@ -58,6 +59,7 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     this.tezContext = (TezContext) mrContext;
     this.hconf = hconf;
     this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
   }
 
   @Override
@@ -72,7 +74,6 @@ public void load(MapJoinTableContainer[] mapJoinTables,
     final long memoryCheckInterval = HiveConf.getLongVar(hconf,
         HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
     final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
-    long numEntries = 0;
     long noCondTaskSize = desc.getNoConditionalTaskSize();
     boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
     if (!doMemCheck) {
@@ -88,6 +89,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
        continue;
      }
 
+      long numEntries = 0;
      String inputName = parentToInput.get(pos);
      LogicalInput input = tezContext.getInput(inputName);
 
@@ -108,7 +110,8 @@ public void load(MapJoinTableContainer[] mapJoinTables,
      VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
          new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
 
-      LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName());
+      LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName,
+          cacheKey, vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos);
 
      vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
      while (kvReader.next()) {
@@ -123,7 +126,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
          // available for container/executor
          final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
          if (estMemUsage > effectiveThreshold) {
-            String msg = "VectorMapJoin Hash table loading exceeded memory limits." +
+            String msg = "Hash table loading exceeded memory limits for input: " + inputName +
              " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
              " inflationFactor: " + inflationFactor + " threshold: " + threshold +
              " effectiveThreshold: " + effectiveThreshold;
@@ -131,8 +134,9 @@ public void load(MapJoinTableContainer[] mapJoinTables,
            throw new MapJoinMemoryExhaustionError(msg);
          } else {
            if (LOG.isInfoEnabled()) {
-              LOG.info("Checking vector mapjoin hash table loader memory usage.. numEntries: {} " +
-                "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+              LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                  "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+                  effectiveThreshold);
            }
          }
        }
@@ -141,8 +145,13 @@ public void load(MapJoinTableContainer[] mapJoinTables,
 
      vectorMapJoinFastTableContainer.seal();
      mapJoinTables[pos] = vectorMapJoinFastTableContainer;
-      LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() +
-          ". Small table position: " + pos);
+      if (doMemCheck) {
+        LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} " +
+            "estimatedMemoryUsage: {}", inputName, cacheKey, numEntries,
+            vectorMapJoinFastTableContainer.getEstimatedMemorySize());
+      } else {
+        LOG.info("Finished loading vector hash table for input: {} cacheKey: {}", inputName, cacheKey);
+      }
    } catch (IOException e) {
      throw new HiveException(e);
    } catch (SerDeException e) {
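Reviewer context, not part of the patch: both loaders gate the periodic memory check the same way, and the numEntries counter this patch moves inside the per-input loop feeds both that check and the new completion log. Below is a minimal standalone sketch of the gating arithmetic in Java. The class and method names (MemoryCheckSketch, effectiveThreshold, shouldCheckMemory) are hypothetical, and the derivation of threshold from inflationFactor * noConditionalTaskSize, plus the modulo check against memoryCheckInterval, are assumptions suggested by the log messages and loop structure rather than code shown in this diff.

// Standalone sketch (hypothetical names, not Hive code) of the memory-check
// gating used by both hash table loaders touched in this patch.
public class MemoryCheckSketch {

  // Mirrors the patched code: the effective limit is never below
  // two thirds of the memory available to the container/executor.
  static long effectiveThreshold(long threshold, long maxMemoryAvailable) {
    return (long) Math.max(threshold, (2.0 / 3.0) * maxMemoryAvailable);
  }

  // Assumption: the check only applies on LLAP with sane config values
  // (doMemCheck), and only every memoryCheckInterval entries, as the
  // placement of the check inside the kvReader loop implies.
  static boolean shouldCheckMemory(boolean isLlap, float inflationFactor, long noCondTaskSize,
      long memoryCheckInterval, long numEntries) {
    boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0
        && memoryCheckInterval > 0;
    return doMemCheck && numEntries % memoryCheckInterval == 0;
  }

  public static void main(String[] args) {
    long noCondTaskSize = 512L * 1024 * 1024;          // 512 MB
    float inflationFactor = 2.0f;
    // Assumption: threshold = inflationFactor * noConditionalTaskSize,
    // as the field list in the exception message suggests.
    long threshold = (long) (inflationFactor * noCondTaskSize); // 1 GB
    long maxMemoryAvailable = 4L * 1024 * 1024 * 1024;          // 4 GB

    // Two thirds of 4 GB (~2.67 GB) wins over the 1 GB inflation-based threshold.
    System.out.println(effectiveThreshold(threshold, maxMemoryAvailable));
    // Entry 200000 with an interval of 100000 triggers a check.
    System.out.println(shouldCheckMemory(true, inflationFactor, noCondTaskSize, 100000, 200000));
  }
}

The patch's switch from string concatenation to parameterized SLF4J logging ("... input: {} cacheKey: {} ...") also means the message arguments are only formatted when INFO is enabled, which is why the remaining explicit LOG.isInfoEnabled() guard around the per-interval message is mostly about skipping the check-time work, not the formatting.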