diff --git a/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java new file mode 100644 index 0000000..36ae56f --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +/** + * Interface that can be used to provide size estimates based on data structures held in memory for an object instance. 
+ */ +public interface MemoryEstimate { + /** + * Returns estimated memory size based on {@link org.apache.hadoop.hive.ql.util.JavaDataModel} + * + * @return estimated memory size in bytes + */ + long getEstimatedMemorySize(); +} diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d3ea824..c8afc0c 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3152,6 +3152,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "hive.llap.mapjoin.memory.oversubscribe.factor amount of memory can be borrowed based on which mapjoin\n" + "conversion decision will be made). This is only an upper bound. Lower bound is determined by number of\n" + "executors and configured max concurrency."), + LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL("hive.llap.mapjoin.memory.monitor.check.interval", 10000, + "Check memory usage of mapjoin hash tables after this interval. If map join hash tables memory usage exceed\n" + + "hive.auto.convert.join.noconditionaltask.size when running in LLAP, tasks will get killed."), LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4, "Maximum number of threads to be used for AM reporter. 
If this is lower than number of\n" + "executors in llap daemon, it would be set to number of executors at runtime.", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 04e24bd..360b639 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -24,6 +24,8 @@ import java.util.TreeMap; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.MemoryEstimate; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -46,7 +48,7 @@ * Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost completely reworked * and there's very little in common left save for quadratic probing (and that with some changes). 
*/ -public final class BytesBytesMultiHashMap { +public final class BytesBytesMultiHashMap implements MemoryEstimate { public static final Logger LOG = LoggerFactory.getLogger(BytesBytesMultiHashMap.class); /* @@ -521,7 +523,18 @@ public int getNumValues() { * @return number of bytes */ public long memorySize() { - return writeBuffers.size() + refs.length * 8 + 100; + return getEstimatedMemorySize(); + } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += writeBuffers.getEstimatedMemorySize(); + size += jdm.lengthForLongArrayOfSize(refs.length); + // 11 primitive1 fields, 2 refs above with alignment + size += JavaDataModel.alignUp(15 * jdm.primitive1(), jdm.memoryAlign()); + return size; } public void seal() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index a3bccc6..81cd152 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -140,6 +140,14 @@ public ReusableGetAdaptor createGetter(MapJoinKey keyTypeFromLoader) { return new GetAdaptor(keyTypeFromLoader); } + @Override + public long getEstimatedMemorySize() { + // TODO: Eagerly deserialized in-memory java objects are difficult to estimate memory size and not really efficient. + // FlatRowContainer essentially contains Object[] which could be anything. 
+ // assuming single entry is about 1KB + return size() * 1024; + } + private class GetAdaptor implements ReusableGetAdaptor { private Object[] currentKey; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index 04e89e8..6523f00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -118,6 +118,11 @@ private final String spillLocalDirs; + @Override + public long getEstimatedMemorySize() { + return memoryUsed; + } + /** * This class encapsulates the triplet together since they are closely related to each other * The triplet: hashmap (either in memory or on disk), small table container, big table container diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index c86e5f5..bb85e1e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import 
org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -72,6 +74,11 @@ implements MapJoinTableContainer, MapJoinTableContainerDirectAccess { private static final Logger LOG = LoggerFactory.getLogger(MapJoinTableContainer.class); + // TODO: For object inspector fields, assigning 16KB for now. To better estimate the memory size every + // object inspectors have to implement MemoryEstimate interface which is a lot of change with little benefit compared + // to writing an instrumentation agent for object size estimation + public static final int DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE = 16 * 1024; + private final BytesBytesMultiHashMap hashMap; /** The OI used to deserialize values. We never deserialize keys. */ private LazyBinaryStructObjectInspector internalValueOi; @@ -147,7 +154,7 @@ public void setNotNullMarkers(byte[] notNullMarkers) { this.notNullMarkers = notNullMarkers; } - public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { + public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource, MemoryEstimate { void setKeyValue(Writable key, Writable val) throws SerDeException; /** Get hash value from the key. */ int getHashFromKey() throws SerDeException; @@ -216,6 +223,22 @@ public byte updateStateByte(Byte previousValue) { public int getHashFromKey() throws SerDeException { throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer"); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += keySerDe == null ? 0 : jdm.object(); + size += valSerDe == null ? 0 : jdm.object(); + size += keySoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += valSoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += keyOis == null ? 0 : jdm.arrayList() + keyOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += valOis == null ? 
0 : jdm.arrayList() + valOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += keyObjs == null ? 0 : jdm.array() + keyObjs.length * jdm.object(); + size += valObjs == null ? 0 : jdm.array() + valObjs.length * jdm.object(); + size += jdm.primitive1(); + return size; + } } static class LazyBinaryKvWriter implements KeyValueHelper { @@ -319,6 +342,15 @@ public byte updateStateByte(Byte previousValue) { aliasFilter &= filterGetter.getShort(); return aliasFilter; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += (4 * jdm.object()); + size += jdm.primitive1(); + return size; + } } /* @@ -361,6 +393,15 @@ public int getHashFromKey() throws SerDeException { int keyLength = key.getLength(); return HashCodeUtil.murmurHash(keyBytes, 0, keyLength); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += jdm.object() + (key == null ? 0 : key.getCapacity()); + size += jdm.object() + (val == null ? 0 : val.getCapacity()); + return size; + } } @Override @@ -768,4 +809,19 @@ public boolean hasSpill() { public int size() { return hashMap.size(); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += hashMap.getEstimatedMemorySize(); + size += directWriteHelper == null ? 0 : directWriteHelper.getEstimatedMemorySize(); + size += writeHelper == null ? 0 : writeHelper.getEstimatedMemorySize(); + size += sortableSortOrders == null ? 0 : jdm.lengthForBooleanArrayOfSize(sortableSortOrders.length); + size += nullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(nullMarkers.length); + size += notNullMarkers == null ? 
0 : jdm.lengthForByteArrayOfSize(notNullMarkers.length); + size += jdm.arrayList(); // empty list + size += DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + return size; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index 6d71fef..5ca5ff6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -31,7 +32,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; -public interface MapJoinTableContainer { +public interface MapJoinTableContainer extends MemoryEstimate { /** * Retrieve rows from hashtable key by key, one key at a time, w/o copying the structures * for each key. 
"Old" HashMapWrapper will still create/retrieve new objects for java HashMap; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 7b13e90..5344455 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -146,7 +147,17 @@ public void load(MapJoinTableContainer[] mapJoinTables, } nwayConf.setNumberOfPartitions(numPartitions); } - + final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); + final long memoryCheckInterval = HiveConf.getLongVar(hconf, + HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); + final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + long numEntries = 0; + long noCondTaskSize = desc.getNoConditionalTaskSize(); + boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0; + if (!doMemCheck) { + LOG.info("Not doing hash table memory monitoring. 
isLlap: {} inflationFactor: {} noConditionalTaskSize: {}", + isLlap, inflationFactor, noCondTaskSize); + } for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; @@ -209,8 +220,26 @@ public void load(MapJoinTableContainer[] mapJoinTables, tableContainer.setSerde(keyCtx, valCtx); while (kvReader.next()) { - tableContainer.putRow( - (Writable)kvReader.getCurrentKey(), (Writable)kvReader.getCurrentValue()); + tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue()); + numEntries++; + if (doMemCheck && numEntries >= memoryCheckInterval) { + final long estMemUsage = tableContainer.getEstimatedMemorySize(); + final long inflatedMemUsage = (long) (estMemUsage * inflationFactor); + final long threshold = (long) (inflationFactor * noCondTaskSize); + if (inflatedMemUsage > threshold) { + throw new MapJoinMemoryExhaustionException("Hash table loading exceeded memory limits." + + " noconditionalTaskSize: " + noCondTaskSize + " inflationFactor: " + + inflationFactor + " estimate memory usage: " + estMemUsage + " inflated memory usage: " + + inflatedMemUsage); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Checking hash table loader memory usage.. noconditionTaskSize: {} inflationFactor: {}" + + " estimatedMemoryUsage: {} inflatedMemoryUsage: {}", noCondTaskSize, inflationFactor, estMemUsage, + inflatedMemUsage); + } + } + numEntries = 0; + } } tableContainer.seal(); LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java index 72dcdd3..74e7d20 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java @@ -44,8 +44,6 @@ // before anything else. 
private volatile static ObjectRegistry staticRegistry; - private static ExecutorService staticPool; - private final ObjectRegistry registry; public ObjectCache() { @@ -56,7 +54,6 @@ public ObjectCache() { public static void setupObjectRegistry(ObjectRegistry objectRegistry) { staticRegistry = objectRegistry; - staticPool = Executors.newCachedThreadPool(); } @@ -102,7 +99,8 @@ public void release(String key) { @Override public Future retrieveAsync(final String key, final Callable fn) throws HiveException { - return staticPool.submit(new Callable() { + // FIXME(review): creates a new single-thread executor on every call and never shuts it down (thread leak); reuse a shared executor instead + return Executors.newSingleThreadExecutor().submit(new Callable() { @Override public T call() throws Exception { return retrieve(key, fn); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java index 6242daf..b5eab8b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java @@ -107,4 +107,9 @@ public VectorMapJoinFastBytesHashMap( // Share the same write buffers with our value store. 
keyStore = new VectorMapJoinFastKeyStore(valueStore.writeBuffers()); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java index 1a41961..e779762 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java @@ -97,4 +97,9 @@ public VectorMapJoinFastBytesHashMultiSet( keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java index 331867c..d493319 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java @@ -84,4 +84,9 @@ public VectorMapJoinFastBytesHashSet( keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java index b93e977..10bc902 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable; @@ -218,4 +219,9 @@ public VectorMapJoinFastBytesHashTable( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); allocateBucketArray(); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + JavaDataModel.get().lengthForLongArrayOfSize(slotTriples.length); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java index b6db3bc..f5e6e06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; @@ -88,4 +89,10 @@ public VectorMapJoinFastHashTable( public int size() { return keysAssigned; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + return JavaDataModel.alignUp(10 * jdm.primitive1() + jdm.primitive2(), jdm.memoryAlign()); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java index 49ecdd1..d2dcea7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -68,6 +69,18 @@ public void load(MapJoinTableContainer[] mapJoinTables, Map parentToInput = desc.getParentToInput(); Map parentKeyCounts = desc.getParentKeyCounts(); + final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); + final long memoryCheckInterval = HiveConf.getLongVar(hconf, + HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); + final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + long numEntries = 0; + long noCondTaskSize = desc.getNoConditionalTaskSize(); + boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0; + if (!doMemCheck) { + LOG.info("Not doing hash table memory monitoring. 
isLlap: {} inflationFactor: {} noConditionalTaskSize: {}", + isLlap, inflationFactor, noCondTaskSize); + } + for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; @@ -97,11 +110,31 @@ public void load(MapJoinTableContainer[] mapJoinTables, while (kvReader.next()) { vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(), (BytesWritable)kvReader.getCurrentValue()); + numEntries++; + if (doMemCheck && numEntries >= memoryCheckInterval) { + final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize(); + final long inflatedMemUsage = (long) (estMemUsage * inflationFactor); + final long threshold = (long) (inflationFactor * noCondTaskSize); + if (inflatedMemUsage > threshold) { + throw new MapJoinMemoryExhaustionException("Hash table loading exceeded memory limits." + + " noconditionalTaskSize: " + noCondTaskSize + " inflationFactor: " + + inflationFactor + " estimate memory usage: " + estMemUsage + " inflated memory usage: " + + inflatedMemUsage); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Checking hash table loader memory usage.. noconditionTaskSize: {} inflationFactor: {}" + + " estimatedMemoryUsage: {} inflatedMemoryUsage: {}", noCondTaskSize, inflationFactor, estMemUsage, + inflatedMemUsage); + } + } + numEntries = 0; + } } vectorMapJoinFastTableContainer.seal(); - mapJoinTables[pos] = (MapJoinTableContainer) vectorMapJoinFastTableContainer; - + mapJoinTables[pos] = vectorMapJoinFastTableContainer; + LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() + + ". 
Small table position: " + pos); } catch (IOException e) { throw new HiveException(e); } catch (SerDeException e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java index be51693..3e9ff84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java @@ -18,13 +18,14 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.serde2.WriteBuffers; // Optimized for sequential key lookup. -public class VectorMapJoinFastKeyStore { +public class VectorMapJoinFastKeyStore implements MemoryEstimate { private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastKeyStore.class.getName()); @@ -165,4 +166,12 @@ public VectorMapJoinFastKeyStore(WriteBuffers writeBuffers) { this.writeBuffers = writeBuffers; unsafeReadPos = new WriteBuffers.Position(); } + + @Override + public long getEstimatedMemorySize() { + long size = 0; + size += writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize(); + size += unsafeReadPos == null ? 
0 : unsafeReadPos.getEstimatedMemorySize(); + return size; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java index 6fe98f9..d4847b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java @@ -20,6 +20,8 @@ import java.io.IOException; +import org.apache.hadoop.hive.common.MemoryEstimate; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -37,7 +39,7 @@ */ public class VectorMapJoinFastLongHashMap extends VectorMapJoinFastLongHashTable - implements VectorMapJoinLongHashMap { + implements VectorMapJoinLongHashMap, MemoryEstimate { public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMap.class); @@ -112,4 +114,9 @@ public VectorMapJoinFastLongHashMap( initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); valueStore = new VectorMapJoinFastValueStore(writeBuffersSize); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java index 9140aee..566cfa2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java @@ -100,4 +100,9 @@ public VectorMapJoinFastLongHashMultiSet( super(minMaxEnabled, isOuterJoin, hashTableKeyType, initialCapacity, loadFactor, 
writeBuffersSize, estimatedKeyCount); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java index d3efb11..fb7ae62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java @@ -96,4 +96,9 @@ public VectorMapJoinFastLongHashSet( super(minMaxEnabled, isOuterJoin, hashTableKeyType, initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java index 8bfa07c..d897308 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -280,4 +281,18 @@ public VectorMapJoinFastLongHashTable( min = Long.MAX_VALUE; max = Long.MIN_VALUE; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = super.getEstimatedMemorySize(); + size += slotPairs == null ? 
0 : jdm.lengthForLongArrayOfSize(slotPairs.length); + size += (2 * jdm.primitive2()); + size += (2 * jdm.primitive1()); + size += jdm.object(); + // adding 16KB constant memory for keyBinarySortableDeserializeRead as the rabbit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + size += (16 * 1024); + return size; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java index add4788..eb08aa9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java @@ -53,4 +53,9 @@ public VectorMapJoinFastMultiKeyHashMap( int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) { super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java index faefdbb..56964bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java @@ -52,4 +52,8 @@ public VectorMapJoinFastMultiKeyHashMultiSet( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java index 5328910..46bafe0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java @@ -52,5 +52,8 @@ public VectorMapJoinFastMultiKeyHashSet( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); } - + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java index f13034f..df75424 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java @@ -43,4 +43,13 @@ public VectorMapJoinFastStringHashMap( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + long size = 0; + // adding 16KB constant memory for stringCommon as the rabbit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + size += (16 * 1024); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java index 53ad7b4..458b19a 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java @@ -43,4 +43,12 @@ public VectorMapJoinFastStringHashMultiSet( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + // adding 16KB constant memory for stringCommon as the rabbit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + long size = (16 * 1024); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java index 723c729..5615def 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java @@ -43,4 +43,12 @@ public VectorMapJoinFastStringHashSet( super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount); stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + // adding 16KB constant memory for stringCommon as the rabbit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + long size = (16 * 1024); + return super.getEstimatedMemorySize() + size; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java index 05f1cf1..2fe4b93 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java @@ -19,6 +19,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -26,7 +27,6 @@ import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; -import org.apache.hadoop.hive.ql.exec.tez.HashTableLoader; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -38,7 +38,6 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; -import org.apache.tez.runtime.library.api.KeyValueReader; /** * HashTableLoader for Tez constructs the hashtable from records read from @@ -46,7 +45,7 @@ */ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContainer { - private static final Logger LOG = LoggerFactory.getLogger(HashTableLoader.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastTableContainer.class.getName()); private final MapJoinDesc desc; private final Configuration hconf; @@ -219,6 +218,17 @@ public int size() { } @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += vectorMapJoinFastHashTable.getEstimatedMemorySize(); + size += (4 * jdm.primitive1()); + size += (2 * jdm.object()); + size += (jdm.primitive2()); + return size; + } + + @Override public void setSerde(MapJoinObjectSerDeContext keyCtx, 
MapJoinObjectSerDeContext valCtx) throws SerDeException { // Do nothing in this case. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java index f9c5b34..3cd06e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult; @@ -30,7 +31,7 @@ // Supports random access. -public class VectorMapJoinFastValueStore { +public class VectorMapJoinFastValueStore implements MemoryEstimate { private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastValueStore.class.getName()); @@ -113,6 +114,11 @@ public WriteBuffers writeBuffers() { return writeBuffers; } + @Override + public long getEstimatedMemorySize() { + return writeBuffers == null ? 
0 : writeBuffers.getEstimatedMemorySize(); + } + public static class HashMapResult extends VectorMapJoinHashMapResult { private VectorMapJoinFastValueStore valueStore; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java index c7e585c..9cc9ad4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.io.BytesWritable; @@ -28,7 +29,7 @@ * Root interface for a vector map join hash table (which could be a hash map, hash multi-set, or * hash set). */ -public interface VectorMapJoinHashTable { +public interface VectorMapJoinHashTable extends MemoryEstimate { /* diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java index 93a89d7..1560807 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java @@ -75,4 +75,9 @@ public VectorMapJoinOptimizedHashSet( MapJoinTableContainer originalTableContainer, ReusableGetAdaptor hashMapRowGetter) { super(originalTableContainer, hashMapRowGetter); } + + @Override + public long getEstimatedMemorySize() { + return super.getEstimatedMemorySize(); + } } \ No newline at end of file diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java index 5fe7861..5275e1a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -96,4 +97,12 @@ public VectorMapJoinOptimizedHashTable( public int size() { return originalTableContainer.size(); } + + @Override + public long getEstimatedMemorySize() { + long size = 0; + size += originalTableContainer == null ? 0 : originalTableContainer.getEstimatedMemorySize(); + size += (2 * JavaDataModel.get().object()); + return size; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java index f921b9c..c09d9c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java @@ -60,4 +60,12 @@ public VectorMapJoinOptimizedStringHashSet(boolean isOuterJoin, super(originalTableContainer, hashMapRowGetter); stringCommon = new VectorMapJoinOptimizedStringCommon(isOuterJoin); } + + @Override + public long getEstimatedMemorySize() { + // adding 16KB constant memory for stringCommon as the rabbit hole is deep to implement + // MemoryEstimate interface, also it is constant overhead + long size = (16 * 1024); + return super.getEstimatedMemorySize() + size; 
+ } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index ad77e87..d0fdb52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -100,6 +100,7 @@ // adjust noconditional task size threshold for LLAP maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf); + joinOp.getConf().setNoConditionalTaskSize(maxSize); TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf); if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) { @@ -280,7 +281,7 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont joinOp.getConf().getBaseSrc(), joinOp).getSecond(), null, joinDesc.getExprs(), null, null, joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(), - joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null); + joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, joinDesc.getNoConditionalTaskSize()); mapJoinDesc.setNullSafes(joinDesc.getNullSafes()); mapJoinDesc.setFilterMap(joinDesc.getFilterMap()); mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java index b2893e7..85d46f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java @@ -59,7 +59,6 @@ import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.parse.GenMapRedWalker; -import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; 
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -80,8 +79,6 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import com.clearspring.analytics.util.Lists; - /** * Implementation of one of the rule-based map join optimization. User passes hints to specify * map-joins and during this optimization, all user specified map joins are converted to MapJoins - @@ -434,7 +431,8 @@ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf, smbJoinDesc.getValueTblDescs(), smbJoinDesc.getValueTblDescs(), smbJoinDesc.getOutputColumnNames(), bigTablePos, smbJoinDesc.getConds(), - smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix()); + smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix(), + smbJoinDesc.getNoConditionalTaskSize()); mapJoinDesc.setStatistics(smbJoinDesc.getStatistics()); @@ -1187,7 +1185,7 @@ public static MapJoinDesc getMapJoinDesc(HiveConf hconf, MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op - .getConf().getNoOuterJoin(), dumpFilePrefix); + .getConf().getNoOuterJoin(), dumpFilePrefix, op.getConf().getNoConditionalTaskSize()); mapJoinDescriptor.setStatistics(op.getConf().getStatistics()); mapJoinDescriptor.setTagOrder(tagOrder); mapJoinDescriptor.setNullSafes(desc.getNullSafes()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java index d375d1b..40c0f3b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java @@ -1005,7 +1005,7 @@ private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] 
joinExpressio // 4. We create the join operator with its descriptor JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns, - filters, joinExpressions); + filters, joinExpressions, 0); desc.setSemiJoinHints(semiJoinHints); desc.setReversedExprs(reversedExprs); desc.setFilterMap(filterMap); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index 93b8a5d..f78bd7c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -281,7 +281,8 @@ public static void processSkewJoin(JoinOperator joinOp, MapJoinDesc mapJoinDescriptor = new MapJoinDesc(newJoinKeys, keyTblDesc, newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor .getOutputColumnNames(), i, joinDescriptor.getConds(), - joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix); + joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix, + joinDescriptor.getNoConditionalTaskSize()); mapJoinDescriptor.setTagOrder(tags); mapJoinDescriptor.setHandleSkewJoin(false); mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index 405c3ca..c970611 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -240,7 +240,8 @@ public static void processSkewJoin(JoinOperator joinOp, Task semiJoinHints; + // non-transient field, used at runtime to kill a task if it exceeded memory limits when running in LLAP + protected long noConditionalTaskSize; + 
public JoinDesc() { } public JoinDesc(final Map> exprs, List outputColumnNames, final boolean noOuterJoin, final JoinCondDesc[] conds, final Map> filters, - ExprNodeDesc[][] joinKeys) { + ExprNodeDesc[][] joinKeys, final long noConditionalTaskSize) { this.exprs = exprs; this.outputColumnNames = outputColumnNames; this.noOuterJoin = noOuterJoin; this.conds = conds; this.filters = filters; this.joinKeys = joinKeys; - + this.noConditionalTaskSize = noConditionalTaskSize; resetOrder(); } @@ -152,6 +155,7 @@ public Object clone() { ret.setHandleSkewJoin(handleSkewJoin); ret.setSkewKeyDefinition(getSkewKeyDefinition()); ret.setTagOrder(getTagOrder().clone()); + ret.setNoConditionalTaskSize(getNoConditionalTaskSize()); if (getKeyTableDesc() != null) { ret.setKeyTableDesc((TableDesc) getKeyTableDesc().clone()); } @@ -203,6 +207,7 @@ public JoinDesc(JoinDesc clone) { this.residualFilterExprs = clone.residualFilterExprs; this.statistics = clone.statistics; this.semiJoinHints = clone.semiJoinHints; + this.noConditionalTaskSize = clone.noConditionalTaskSize; } public Map> getExprs() { @@ -700,4 +705,12 @@ public void setSemiJoinHints(Map semiJoinHints) { return semiJoinHints; } + + public long getNoConditionalTaskSize() { + return noConditionalTaskSize; + } + + public void setNoConditionalTaskSize(final long noConditionalTaskSize) { + this.noConditionalTaskSize = noConditionalTaskSize; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index 940630c..8da85d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -109,11 +109,12 @@ public MapJoinDesc(MapJoinDesc clone) { } public MapJoinDesc(final Map> keys, - final TableDesc keyTblDesc, final Map> values, - final List valueTblDescs,final List valueFilteredTblDescs, List outputColumnNames, - final int posBigTable, final JoinCondDesc[] conds, - final Map> 
filters, boolean noOuterJoin, String dumpFilePrefix) { - super(values, outputColumnNames, noOuterJoin, conds, filters, null); + final TableDesc keyTblDesc, final Map> values, + final List valueTblDescs, final List valueFilteredTblDescs, List outputColumnNames, + final int posBigTable, final JoinCondDesc[] conds, + final Map> filters, boolean noOuterJoin, String dumpFilePrefix, + final long noConditionalTaskSize) { + super(values, outputColumnNames, noOuterJoin, conds, filters, null, noConditionalTaskSize); vectorDesc = null; this.keys = keys; this.keyTblDesc = keyTblDesc; @@ -585,5 +586,4 @@ public SMBJoinOperatorExplainVectorization getSMBJoinVectorization() { } return new SMBJoinOperatorExplainVectorization((SMBJoinDesc) this, vectorDesc); } - } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java index a4ecd9f..049547d 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java @@ -21,6 +21,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import org.apache.hadoop.hive.common.MemoryEstimate; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; import org.apache.hadoop.io.WritableUtils; @@ -31,7 +33,7 @@ * The structure storing arbitrary amount of data as a set of fixed-size byte buffers. * Maintains read and write pointers for convenient single-threaded writing/reading. 
*/ -public final class WriteBuffers implements RandomAccessOutput { +public final class WriteBuffers implements RandomAccessOutput, MemoryEstimate { private final ArrayList writeBuffers = new ArrayList(1); /** Buffer size in writeBuffers */ private final int wbSize; @@ -39,7 +41,7 @@ private final long offsetMask; private final long maxSize; - public static class Position { + public static class Position implements MemoryEstimate { private byte[] buffer = null; private int bufferIndex = 0; private int offset = 0; @@ -47,6 +49,14 @@ public void clear() { buffer = null; bufferIndex = offset = -1; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + int memSize = buffer == null ? 0 : jdm.lengthForByteArrayOfSize(buffer.length); + memSize += (2 * jdm.primitive1()); + return memSize; + } } Position writePos = new Position(); // Position where we'd write @@ -610,6 +620,17 @@ public long size() { return writeBuffers.size() * (long) wbSize; } + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += writeBuffers == null ? 0 : jdm.arrayList() + (writeBuffers.size() * jdm.lengthForByteArrayOfSize(wbSize)); + size += (3 * jdm.primitive2()); + size += writePos == null ? 0 : writePos.getEstimatedMemorySize(); + size += unsafeReadPos == null ? 0 : unsafeReadPos.getEstimatedMemorySize(); + return size; + } + /** THIS METHOD IS NOT THREAD-SAFE. Use only at load time (or be mindful of thread safety). */ public Position getUnsafeReadPosition() { return unsafeReadPos;