diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index da171b1..d7995f9 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -767,6 +767,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true,
         "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" +
         "because memory-optimized hashtable cannot be serialized."),
+    HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT("hive.mapjoin.optimized.hashtable.probe.percent",
+        (float) 0.5, "Probing space percentage of the optimized hashtable"),
     HIVEUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", true, "Whether to use hybrid" +
         "grace hash join as the join method for mapjoin. Tez only."),
     HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ("hive.mapjoin.hybridgrace.memcheckfrequency", 1024, "For " +
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 3bba890..77c7ead 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -153,9 +153,11 @@
   /** 8 Gb of refs is the max capacity if memory limit is not specified. If someone has 100s of
    * Gbs of memory (this might happen pretty soon) we'd need to string together arrays anyway. */
   private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
+  /** Make sure maxCapacity has a lower limit */
+  private final static int DEFAULT_MIN_MAX_CAPACITY = 16 * 1024 * 1024;

   public BytesBytesMultiHashMap(int initialCapacity,
-      float loadFactor, int wbSize, long memUsage) {
+      float loadFactor, int wbSize, long maxProbeSize) {
     if (loadFactor < 0 || loadFactor > 1) {
       throw new AssertionError("Load factor must be between (0, 1].");
     }
@@ -163,8 +165,11 @@ public BytesBytesMultiHashMap(int initialCapacity,
     initialCapacity = (Long.bitCount(initialCapacity) == 1)
         ? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
     // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
-    int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
-        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+    int maxCapacity = (maxProbeSize <= 0) ? DEFAULT_MAX_CAPACITY
+        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, maxProbeSize / 8);
+    if (maxCapacity < DEFAULT_MIN_MAX_CAPACITY) {
+      maxCapacity = DEFAULT_MIN_MAX_CAPACITY;
+    }
     if (maxCapacity < initialCapacity || initialCapacity <= 0) {
       // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
       initialCapacity = (Long.bitCount(maxCapacity) == 1)
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index ff64f52..9a9b163 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -107,7 +107,7 @@
     Path hashMapLocalPath;              // Local file system path for spilled hashMap
     boolean hashMapOnDisk;              // Status of hashMap. true: on disk, false: in memory
     boolean hashMapSpilledOnCreation;   // When there's no enough memory, cannot create hashMap
-    int threshold;                      // Used to create an empty BytesBytesMultiHashMap
+    int initialCapacity;                // Used to create an empty BytesBytesMultiHashMap
     float loadFactor;                   // Same as above
     int wbSize;                         // Same as above
     int rowsOnDisk;                     // How many rows saved to the on-disk hashmap (if on disk)
@@ -115,17 +115,17 @@
     /* It may happen that there's not enough memory to instantiate a hashmap for the partition.
      * In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled".
      */
-    public HashPartition(int threshold, float loadFactor, int wbSize, long memUsage,
+    public HashPartition(int initialCapacity, float loadFactor, int wbSize, long maxProbeSize,
                          boolean createHashMap) {
       if (createHashMap) {
-        // Hash map should be at least the size of our designated wbSize
-        memUsage = Math.max(memUsage, wbSize);
-        hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, memUsage);
+        // Probe space should be at least equal to the size of our designated wbSize
+        maxProbeSize = Math.max(maxProbeSize, wbSize);
+        hashMap = new BytesBytesMultiHashMap(initialCapacity, loadFactor, wbSize, maxProbeSize);
       } else {
         hashMapSpilledOnCreation = true;
         hashMapOnDisk = true;
       }
-      this.threshold = threshold;
+      this.initialCapacity = initialCapacity;
       this.loadFactor = loadFactor;
       this.wbSize = wbSize;
     }
@@ -138,18 +138,18 @@ public BytesBytesMultiHashMap getHashMapFromMemory() {
     /* Restore the hashmap from disk by deserializing it.
      * Currently Kryo is used for this purpose.
      */
-    public BytesBytesMultiHashMap getHashMapFromDisk(int initialCapacity)
+    public BytesBytesMultiHashMap getHashMapFromDisk(int rowCount)
         throws IOException, ClassNotFoundException {
       if (hashMapSpilledOnCreation) {
-        return new BytesBytesMultiHashMap(Math.max(threshold, initialCapacity) , loadFactor, wbSize, -1);
+        return new BytesBytesMultiHashMap(Math.max(this.initialCapacity, rowCount) , loadFactor, wbSize, -1);
       } else {
         InputStream inputStream = Files.newInputStream(hashMapLocalPath);
         com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
         Kryo kryo = Utilities.runtimeSerializationKryo.get();
         BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);

-        if (initialCapacity > 0) {
-          restoredHashMap.expandAndRehashToTarget(initialCapacity);
+        if (rowCount > 0) {
+          restoredHashMap.expandAndRehashToTarget(rowCount);
         }

         // some bookkeeping
@@ -245,12 +245,13 @@ public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryA
         HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
         HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
         HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
+        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
         estimatedTableSize, keyCount, memoryAvailable, nwayConf);
   }

   private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor,
-      int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, long estimatedTableSize,
-      long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
+      int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, float probePercent,
+      long estimatedTableSize, long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
       throws SerDeException, IOException {

     directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter();
@@ -258,14 +259,13 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
         keyCountAdj, threshold, loadFactor, keyCount);

     memoryThreshold = memoryAvailable;
-    tableRowSize = estimatedTableSize / (keyCount != 0 ? keyCount : 1);
+    tableRowSize = estimatedTableSize / (newKeyCount != 0 ? newKeyCount : 1);
     memoryCheckFrequency = memCheckFreq;

     this.nwayConf = nwayConf;
     int numPartitions;
     if (nwayConf == null) { // binary join
-      numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize,
-          nwayConf);
+      numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize);
       writeBufferSize = (int)(estimatedTableSize / numPartitions);
     } else { // n-way join
       // It has been calculated in HashTableLoader earlier, so just need to retrieve that number
@@ -302,21 +302,31 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
     int numPartitionsSpilledOnCreation = 0;
     memoryUsed = 0;
     int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
+    // maxCapacity is a percentage of memoryThreshold, based on the ratio of the 8-byte ref (long)
+    // to the estimated row size
+    float probePercentage = (float) 8 / (tableRowSize + 8); // long_size / (tableRowSize + long_size)
+    if (probePercentage == 1) {
+      probePercentage = probePercent;
+    }
+    int maxCapacity = (int)(memoryThreshold * probePercentage);
     for (int i = 0; i < numPartitions; i++) {
       if (this.nwayConf == null ||                          // binary join
           nwayConf.getLoadedContainerList().size() == 0) {  // n-way join, first (biggest) small table
         if (i == 0) { // We unconditionally create a hashmap for the first hash partition
-          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold, true);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, true);
         } else {
-          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold,
-              memoryUsed + writeBufferSize < memoryThreshold);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, memoryUsed + writeBufferSize < memoryThreshold);
         }
-      } else { // n-way join
+      } else { // n-way join, all later small tables
         // For all later small tables, follow the same pattern of the previously loaded tables.
         if (this.nwayConf.doSpillOnCreation(i)) {
-          hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, false);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, false);
         } else {
-          hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, true);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, true);
         }
       }
@@ -545,11 +555,10 @@ public long spillPartition(int partitionId) throws IOException {
    * @param dataSize total data size for the table
    * @param minNumParts minimum required number of partitions
    * @param minWbSize minimum required write buffer size
-   * @param nwayConf the n-way join configuration
    * @return number of partitions needed
    */
   public static int calcNumPartitions(long memoryThreshold, long dataSize, int minNumParts,
-      int minWbSize, HybridHashTableConf nwayConf) throws IOException {
+      int minWbSize) throws IOException {
     int numPartitions = minNumParts;

     if (memoryThreshold < minNumParts * minWbSize) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 2b6571b..9310650 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -128,11 +128,9 @@ public void load(MapJoinTableContainer[] mapJoinTables,
       long memory = tableMemorySizes.get(biggest);
       int numPartitions = 0;
       try {
-        numPartitions = HybridHashTableContainer.calcNumPartitions(memory,
-            maxSize,
+        numPartitions = HybridHashTableContainer.calcNumPartitions(memory, maxSize,
             HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
-            HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
-            nwayConf);
+            HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE));
       } catch (IOException e) {
         throw new HiveException(e);
       }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 05d9359..15308b4 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -61,7 +61,6 @@ public WriteBuffers(int wbSize, long maxSize) {
     this.offsetMask = this.wbSize - 1;
     this.maxSize = maxSize;
     writePos.bufferIndex = -1;
-    nextBufferToWrite();
   }

   public int readVInt() {
@@ -202,11 +201,17 @@ public void write(int b) {

   @Override
   public void write(byte[] b) {
+    if (writePos.bufferIndex == -1) {
+      nextBufferToWrite();
+    }
     write(b, 0, b.length);
   }

   @Override
   public void write(byte[] b, int off, int len) {
+    if (writePos.bufferIndex == -1) {
+      nextBufferToWrite();
+    }
     int srcOffset = 0;
     while (srcOffset < len) {
       int toWrite = Math.min(len - srcOffset, wbSize - writePos.offset);
@@ -355,6 +360,9 @@ private void clearState() {

   public long getWritePoint() {
+    if (writePos.bufferIndex == -1) {
+      nextBufferToWrite();
+    }
     return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
   }
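A minimal standalone sketch (not part of the patch) of the sizing math introduced above: each HashPartition's probe space is a slice of memoryThreshold proportional to the share of an entry taken by its 8-byte ref, falling back to hive.mapjoin.optimized.hashtable.probe.percent when there is no row-size estimate, and BytesBytesMultiHashMap then clamps the resulting ref-array cap between the new DEFAULT_MIN_MAX_CAPACITY and the existing DEFAULT_MAX_CAPACITY. The class name, helper methods, and the sample figures in main are illustrative only; the constants and formulas mirror the hunks above.

// Sketch only: hypothetical class, not Hive code.
public class ProbeSpaceSizing {

  // Same bounds as BytesBytesMultiHashMap after this patch.
  private static final int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;   // 1G refs
  private static final int DEFAULT_MIN_MAX_CAPACITY = 16 * 1024 * 1024; // 16M refs

  /** Probe-space budget in bytes, as computed in the HybridHashTableContainer constructor. */
  static long maxProbeSize(long memoryThreshold, long tableRowSize, float probePercentFromConf) {
    // long_size / (tableRowSize + long_size): share of each entry that is the 8-byte ref
    float probePercentage = (float) 8 / (tableRowSize + 8);
    if (probePercentage == 1) {        // tableRowSize == 0, i.e. no estimate; use the config value
      probePercentage = probePercentFromConf;
    }
    return (long) (memoryThreshold * probePercentage);
  }

  /** Ref-array cap in entries, as clamped in the BytesBytesMultiHashMap constructor. */
  static int maxCapacity(long maxProbeSize) {
    int maxCapacity = (maxProbeSize <= 0) ? DEFAULT_MAX_CAPACITY
        : (int) Math.min((long) DEFAULT_MAX_CAPACITY, maxProbeSize / 8); // 8 bytes per ref
    return Math.max(maxCapacity, DEFAULT_MIN_MAX_CAPACITY);
  }

  public static void main(String[] args) {
    long memoryThreshold = 256L << 20; // 256 MB budget for the small table (illustrative)
    long tableRowSize = 56;            // estimated bytes per row (illustrative)
    long probeBytes = maxProbeSize(memoryThreshold, tableRowSize, 0.5f);
    System.out.println("probe bytes = " + probeBytes
        + ", ref cap = " + maxCapacity(probeBytes) + " entries");
  }
}

With these illustrative numbers the probe share is 8 / 64 = 12.5%, i.e. 32 MB of probe space and a 4M-entry ref cap, which the new 16M-entry floor then raises; the floor keeps a tiny estimate from starving the hash table.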
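The WriteBuffers hunks defer allocation of the first write buffer from the constructor to the first use of the buffer, so a HashPartition that spills on creation never pays for an empty wbSize byte[]. A simplified, self-contained sketch of that lazy-allocation pattern follows; the class below is illustrative and is not the real WriteBuffers internals.

import java.util.ArrayList;
import java.util.List;

// Sketch only: illustrative stand-in for the lazy first-buffer allocation, not Hive code.
class LazyWriteBuffers {
  private final int wbSize;
  private final List<byte[]> buffers = new ArrayList<>();
  private int bufferIndex = -1; // -1 means no buffer allocated yet, as in the patched WriteBuffers
  private int offset;

  LazyWriteBuffers(int wbSize) {
    this.wbSize = wbSize;       // constructor no longer allocates; buffer creation is deferred
  }

  private void nextBufferToWrite() {
    buffers.add(new byte[wbSize]);
    bufferIndex++;
    offset = 0;
  }

  void write(byte b) {
    if (bufferIndex == -1 || offset == wbSize) {
      nextBufferToWrite();      // allocate on first use, or when the current buffer is full
    }
    buffers.get(bufferIndex)[offset++] = b;
  }

  long getWritePoint() {
    if (bufferIndex == -1) {
      nextBufferToWrite();      // callers expect a valid write position, so allocate here too
    }
    return (long) bufferIndex * wbSize + offset;
  }

  int allocatedBytes() {
    return buffers.size() * wbSize;
  }
}

A freshly constructed instance reports allocatedBytes() == 0 until something is written, which is the memory the patch saves for partitions that are spilled at creation time.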