diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index da171b1..d7995f9 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -767,6 +767,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true,
         "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" +
         "because memory-optimized hashtable cannot be serialized."),
+    HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT("hive.mapjoin.optimized.hashtable.probe.percent",
+        (float) 0.5, "Probing space percentage of the optimized hashtable"),
     HIVEUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", true, "Whether to use hybrid" +
         "grace hash join as the join method for mapjoin. Tez only."),
     HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ("hive.mapjoin.hybridgrace.memcheckfrequency", 1024, "For " +
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 3bba890..0037ff9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -153,9 +153,19 @@
   /** 8 Gb of refs is the max capacity if memory limit is not specified. If someone has 100s of
    * Gbs of memory (this might happen pretty soon) we'd need to string together arrays anyway. */
   private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
+  /** Make sure maxCapacity has a lower limit. */
+  private final static int DEFAULT_MIN_MAX_CAPACITY = 10 * 1024 * 1024;
 
-  public BytesBytesMultiHashMap(int initialCapacity,
-      float loadFactor, int wbSize, long memUsage) {
+  /**
+   * Allocate the probing ref array and, unless wbOnDemand is set, the underlying write buffers.
+   * @param initialCapacity initial estimated size of the probe array
+   * @param loadFactor load factor
+   * @param wbSize write buffer size
+   * @param maxProbeSize the maximum size of the probe array, in bytes
+   * @param wbOnDemand whether to allocate the write buffers lazily (on demand) instead of upfront
+   */
+  public BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize,
+      long maxProbeSize, boolean wbOnDemand) {
     if (loadFactor < 0 || loadFactor > 1) {
       throw new AssertionError("Load factor must be between (0, 1].");
     }
@@ -163,8 +173,11 @@ public BytesBytesMultiHashMap(int initialCapacity,
     initialCapacity = (Long.bitCount(initialCapacity) == 1)
         ? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
     // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
-    int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
-        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+    int maxCapacity = (maxProbeSize <= 0) ? DEFAULT_MAX_CAPACITY
+        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, maxProbeSize / 8);
+    if (maxCapacity < DEFAULT_MIN_MAX_CAPACITY) {
+      maxCapacity = DEFAULT_MIN_MAX_CAPACITY;
+    }
     if (maxCapacity < initialCapacity || initialCapacity <= 0) {
       // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
       initialCapacity = (Long.bitCount(maxCapacity) == 1)
@@ -175,13 +188,15 @@ public BytesBytesMultiHashMap(int initialCapacity,
     startingHashBitCount = 63 - Long.numberOfLeadingZeros(initialCapacity);
     this.loadFactor = loadFactor;
     refs = new long[initialCapacity];
-    writeBuffers = new WriteBuffers(wbSize, MAX_WB_SIZE);
+    if (!wbOnDemand) {
+      writeBuffers = new WriteBuffers(wbSize, MAX_WB_SIZE);
+    }
     resizeThreshold = (int)(initialCapacity * this.loadFactor);
   }
 
   @VisibleForTesting
   BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
-    this(initialCapacity, loadFactor, wbSize, -1);
+    this(initialCapacity, loadFactor, wbSize, -1, false);
   }
 
   /**
@@ -538,16 +553,21 @@ public int getNumValues() {
    * @return number of bytes
    */
  public long memorySize() {
-    return writeBuffers.size() + refs.length * 8 + 100;
+    long size = refs.length * 8 + 100;
+    return writeBuffers == null ? size : size + writeBuffers.size();
   }
 
   public void seal() {
-    writeBuffers.seal();
+    if (writeBuffers != null) {
+      writeBuffers.seal();
+    }
   }
 
   public void clear() {
     // This will make the object completely unusable. Semantics of clear are not defined...
-    this.writeBuffers.clear();
+    if (this.writeBuffers != null) {
+      this.writeBuffers.clear();
+    }
     this.refs = new long[1];
     this.keysAssigned = 0;
     this.numValues = 0;
@@ -573,6 +593,21 @@ private static void validateCapacity(long capacity) {
     }
   }
 
+  public WriteBuffers getWriteBuffers() {
+    return writeBuffers;
+  }
+
+  /**
+   * Allocate the write buffers, for maps created with write buffers on demand.
+   * @param wbSize write buffer size
+   */
+  public void allocateWriteBuffers(int wbSize) {
+    if (writeBuffers != null) {
+      throw new AssertionError("The write buffers have already been allocated!");
+    }
+    writeBuffers = new WriteBuffers(wbSize, MAX_WB_SIZE);
+  }
+
   /**
    * Finds the slot to use for writing, based on the key bytes already written to buffers.
    * @param keyOffset Offset to the key.
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index ff64f52..d27e617 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -107,7 +107,7 @@
     Path hashMapLocalPath;              // Local file system path for spilled hashMap
     boolean hashMapOnDisk;              // Status of hashMap. true: on disk, false: in memory
     boolean hashMapSpilledOnCreation;   // When there's no enough memory, cannot create hashMap
-    int threshold;                      // Used to create an empty BytesBytesMultiHashMap
+    int initialCapacity;                // Used to create an empty BytesBytesMultiHashMap
     float loadFactor;                   // Same as above
     int wbSize;                         // Same as above
     int rowsOnDisk;                     // How many rows saved to the on-disk hashmap (if on disk)
@@ -115,17 +115,17 @@
     /* It may happen that there's not enough memory to instantiate a hashmap for the partition.
      * In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled".
      */
-    public HashPartition(int threshold, float loadFactor, int wbSize, long memUsage,
+    public HashPartition(int initialCapacity, float loadFactor, int wbSize, long maxProbeSize,
                          boolean createHashMap) {
       if (createHashMap) {
-        // Hash map should be at least the size of our designated wbSize
-        memUsage = Math.max(memUsage, wbSize);
-        hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, memUsage);
+        // Probe space should be at least as large as our designated wbSize
+        maxProbeSize = Math.max(maxProbeSize, wbSize);
+        hashMap = new BytesBytesMultiHashMap(initialCapacity, loadFactor, wbSize, maxProbeSize, true);
       } else {
         hashMapSpilledOnCreation = true;
         hashMapOnDisk = true;
       }
-      this.threshold = threshold;
+      this.initialCapacity = initialCapacity;
       this.loadFactor = loadFactor;
       this.wbSize = wbSize;
     }
@@ -138,18 +138,18 @@ public BytesBytesMultiHashMap getHashMapFromMemory() {
     /* Restore the hashmap from disk by deserializing it.
      * Currently Kryo is used for this purpose.
      */
-    public BytesBytesMultiHashMap getHashMapFromDisk(int initialCapacity)
+    public BytesBytesMultiHashMap getHashMapFromDisk(int rowCount)
         throws IOException, ClassNotFoundException {
       if (hashMapSpilledOnCreation) {
-        return new BytesBytesMultiHashMap(Math.max(threshold, initialCapacity) , loadFactor, wbSize, -1);
+        return new BytesBytesMultiHashMap(Math.max(this.initialCapacity, rowCount), loadFactor, wbSize, -1, false);
       } else {
         InputStream inputStream = Files.newInputStream(hashMapLocalPath);
         com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
         Kryo kryo = Utilities.runtimeSerializationKryo.get();
         BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
 
-        if (initialCapacity > 0) {
-          restoredHashMap.expandAndRehashToTarget(initialCapacity);
+        if (rowCount > 0) {
+          restoredHashMap.expandAndRehashToTarget(rowCount);
         }
 
         // some bookkeeping
@@ -245,12 +245,13 @@ public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryA
         HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
         HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
         HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
+        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
         estimatedTableSize, keyCount, memoryAvailable, nwayConf);
   }
 
   private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor,
-      int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, long estimatedTableSize,
-      long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
+      int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, float probePercent,
+      long estimatedTableSize, long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
       throws SerDeException, IOException {
 
     directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter();
@@ -258,14 +259,13 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
         keyCountAdj, threshold, loadFactor, keyCount);
 
     memoryThreshold = memoryAvailable;
-    tableRowSize = estimatedTableSize / (keyCount != 0 ? keyCount : 1);
+    tableRowSize = estimatedTableSize / (newKeyCount != 0 ? newKeyCount : 1);
     memoryCheckFrequency = memCheckFreq;
 
     this.nwayConf = nwayConf;
     int numPartitions;
     if (nwayConf == null) { // binary join
-      numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize,
-          nwayConf);
+      numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize);
       writeBufferSize = (int)(estimatedTableSize / numPartitions);
     } else {                // n-way join
       // It has been calculated in HashTableLoader earlier, so just need to retrieve that number
@@ -302,21 +302,31 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
     int numPartitionsSpilledOnCreation = 0;
     memoryUsed = 0;
     int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
+    // maxCapacity is the share of memoryThreshold reserved for the probe (ref) array,
+    // i.e. the fraction of each row's footprint taken by its 8-byte ref
+    float probePercentage = (float) 8 / (tableRowSize + 8); // long_size / (tableRowSize + long_size)
+    if (probePercentage == 1) {
+      probePercentage = probePercent;
+    }
+    int maxCapacity = (int)(memoryThreshold * probePercentage);
     for (int i = 0; i < numPartitions; i++) {
       if (this.nwayConf == null ||                          // binary join
           nwayConf.getLoadedContainerList().size() == 0) {  // n-way join, first (biggest) small table
         if (i == 0) { // We unconditionally create a hashmap for the first hash partition
-          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold, true);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, true);
         } else {
-          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold,
-              memoryUsed + writeBufferSize < memoryThreshold);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, memoryUsed + writeBufferSize < memoryThreshold);
         }
-      } else {                                              // n-way join
+      } else {                          // n-way join, all later small tables
         // For all later small tables, follow the same pattern of the previously loaded tables.
         if (this.nwayConf.doSpillOnCreation(i)) {
-          hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, false);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, false);
        } else {
-          hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, true);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, true);
         }
       }
 
@@ -422,6 +432,10 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
       KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
       kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
     } else {
+      if (hashPartition.hashMap.getWriteBuffers() == null) {
+        // Allocate the write buffers only when needed, i.e. now
+        hashPartition.hashMap.allocateWriteBuffers(writeBufferSize);
+      }
       hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation
       totalInMemRowCount++;
 
@@ -545,11 +559,10 @@ public long spillPartition(int partitionId) throws IOException {
    * @param dataSize total data size for the table
    * @param minNumParts minimum required number of partitions
    * @param minWbSize minimum required write buffer size
-   * @param nwayConf the n-way join configuration
   * @return number of partitions needed
    */
   public static int calcNumPartitions(long memoryThreshold, long dataSize, int minNumParts,
-      int minWbSize, HybridHashTableConf nwayConf) throws IOException {
+      int minWbSize) throws IOException {
     int numPartitions = minNumParts;
 
     if (memoryThreshold < minNumParts * minWbSize) {
@@ -803,6 +816,10 @@ public ReusableRowContainer() {
         return JoinUtil.JoinResult.SPILL;
       }
       else {
+        if (hashPartitions[partitionId].hashMap.getWriteBuffers() == null) {
+          aliasFilter = (byte) 0xff;
+          return JoinUtil.JoinResult.NOMATCH;
+        }
         aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0, output.getLength(), hashMapResult);
         dummyRow = null;
         if (hashMapResult.hasRows()) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index 5df8e2b..5e2c40e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -99,7 +99,7 @@ private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadF
       throws SerDeException {
     int newThreshold = HashMapWrapper.calculateTableSize(
         keyCountAdj, threshold, loadFactor, keyCount);
-    hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage);
+    hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, false);
     directWriteHelper = new DirectKeyValueWriter();
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 2b6571b..9310650 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -128,11 +128,9 @@ public void load(MapJoinTableContainer[] mapJoinTables,
       long memory = tableMemorySizes.get(biggest);
       int numPartitions = 0;
       try {
-        numPartitions = HybridHashTableContainer.calcNumPartitions(memory,
-            maxSize,
+        numPartitions = HybridHashTableContainer.calcNumPartitions(memory, maxSize,
            HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
-            HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
-            nwayConf);
+            HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE));
       } catch (IOException e) {
         throw new HiveException(e);
       }
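
For reviewers, a minimal standalone sketch (not part of the patch) of how the pieces above are meant to fit together: the probe-space cap derived from hive.mapjoin.optimized.hashtable.probe.percent and the estimated row size, followed by on-demand write-buffer allocation. The ProbeSpaceSketch class and all numeric values are hypothetical; only the BytesBytesMultiHashMap constructor, getWriteBuffers() and allocateWriteBuffers() come from this patch, and the Hive jars are assumed to be on the classpath.

import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap;

public class ProbeSpaceSketch {
  public static void main(String[] args) {
    long memoryThreshold = 256L * 1024 * 1024; // hypothetical per-table memory budget, in bytes
    long tableRowSize = 120;                   // hypothetical estimated bytes per row
    float probePercent = 0.5f;                 // hive.mapjoin.optimized.hashtable.probe.percent default

    // Fraction of each row's footprint taken by its 8-byte ref, as in the container constructor above.
    float probePercentage = (float) 8 / (tableRowSize + 8);
    if (probePercentage == 1) {
      probePercentage = probePercent;          // row-size estimate was 0: fall back to the configured value
    }
    int maxCapacity = (int) (memoryThreshold * probePercentage); // bytes reserved for the probe array

    // wbOnDemand == true: only the ref array is allocated now; write buffers come later.
    int wbSize = 8 * 1024 * 1024;              // hypothetical write buffer size
    BytesBytesMultiHashMap map =
        new BytesBytesMultiHashMap(1024, 0.75f, wbSize, maxCapacity, true);

    // Mirrors internalPutRow(): allocate the write buffers only when the first row actually arrives.
    if (map.getWriteBuffers() == null) {
      map.allocateWriteBuffers(wbSize);
    }
    System.out.println("probe bytes reserved: " + maxCapacity);
  }
}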