diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a00079..f83193a 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -767,6 +767,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true,
         "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" +
         "because memory-optimized hashtable cannot be serialized."),
+    HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT("hive.mapjoin.optimized.hashtable.probe.percent",
+        (float) 0.5, "Probing space percentage of the optimized hashtable"),
     HIVEUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", true, "Whether to use hybrid" +
         "grace hash join as the join method for mapjoin. Tez only."),
     HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ("hive.mapjoin.hybridgrace.memcheckfrequency", 1024, "For " +
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 3bba890..77c7ead 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -153,9 +153,11 @@
   /** 8 Gb of refs is the max capacity if memory limit is not specified. If someone has 100s of
    * Gbs of memory (this might happen pretty soon) we'd need to string together arrays anyway. */
   private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
+  /** Make sure maxCapacity has a lower limit */
+  private final static int DEFAULT_MIN_MAX_CAPACITY = 16 * 1024 * 1024;
 
   public BytesBytesMultiHashMap(int initialCapacity,
-      float loadFactor, int wbSize, long memUsage) {
+      float loadFactor, int wbSize, long maxProbeSize) {
     if (loadFactor < 0 || loadFactor > 1) {
       throw new AssertionError("Load factor must be between (0, 1].");
     }
@@ -163,8 +165,11 @@ public BytesBytesMultiHashMap(int initialCapacity,
     initialCapacity = (Long.bitCount(initialCapacity) == 1)
         ? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
     // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
-    int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
-        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+    int maxCapacity = (maxProbeSize <= 0) ? DEFAULT_MAX_CAPACITY
+        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, maxProbeSize / 8);
+    if (maxCapacity < DEFAULT_MIN_MAX_CAPACITY) {
+      maxCapacity = DEFAULT_MIN_MAX_CAPACITY;
+    }
     if (maxCapacity < initialCapacity || initialCapacity <= 0) {
       // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
       initialCapacity = (Long.bitCount(maxCapacity) == 1)
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index ff64f52..52c02ae 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -76,7 +76,6 @@
   private int totalInMemRowCount = 0; // total number of small table rows in memory
   private long memoryThreshold; // the max memory limit that can be allocated
   private long memoryUsed; // the actual memory used
-  private int writeBufferSize; // write buffer size for this HybridHashTableContainer
   private final long tableRowSize; // row size of the small table
   private boolean isSpilled; // whether there's any spilled partition
   private int toSpillPartitionId; // the partition into which to spill the big table row;
@@ -107,7 +106,7 @@
     Path hashMapLocalPath; // Local file system path for spilled hashMap
     boolean hashMapOnDisk; // Status of hashMap. true: on disk, false: in memory
     boolean hashMapSpilledOnCreation; // When there's no enough memory, cannot create hashMap
-    int threshold; // Used to create an empty BytesBytesMultiHashMap
+    int initialCapacity; // Used to create an empty BytesBytesMultiHashMap
     float loadFactor; // Same as above
     int wbSize; // Same as above
     int rowsOnDisk; // How many rows saved to the on-disk hashmap (if on disk)
@@ -115,17 +114,17 @@
     /* It may happen that there's not enough memory to instantiate a hashmap for the partition.
      * In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled".
      */
-    public HashPartition(int threshold, float loadFactor, int wbSize, long memUsage,
+    public HashPartition(int initialCapacity, float loadFactor, int wbSize, long maxProbeSize,
         boolean createHashMap) {
       if (createHashMap) {
-        // Hash map should be at least the size of our designated wbSize
-        memUsage = Math.max(memUsage, wbSize);
-        hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, memUsage);
+        // Probe space should be at least equal to the size of our designated wbSize
+        maxProbeSize = Math.max(maxProbeSize, wbSize);
+        hashMap = new BytesBytesMultiHashMap(initialCapacity, loadFactor, wbSize, maxProbeSize);
       } else {
         hashMapSpilledOnCreation = true;
         hashMapOnDisk = true;
       }
-      this.threshold = threshold;
+      this.initialCapacity = initialCapacity;
       this.loadFactor = loadFactor;
       this.wbSize = wbSize;
     }
@@ -138,18 +137,18 @@ public BytesBytesMultiHashMap getHashMapFromMemory() {
     /* Restore the hashmap from disk by deserializing it.
      * Currently Kryo is used for this purpose.
      */
-    public BytesBytesMultiHashMap getHashMapFromDisk(int initialCapacity)
+    public BytesBytesMultiHashMap getHashMapFromDisk(int rowCount)
         throws IOException, ClassNotFoundException {
       if (hashMapSpilledOnCreation) {
-        return new BytesBytesMultiHashMap(Math.max(threshold, initialCapacity) , loadFactor, wbSize, -1);
+        return new BytesBytesMultiHashMap(rowCount, loadFactor, wbSize, -1);
      } else {
        InputStream inputStream = Files.newInputStream(hashMapLocalPath);
        com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
        Kryo kryo = Utilities.runtimeSerializationKryo.get();
        BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
 
-        if (initialCapacity > 0) {
-          restoredHashMap.expandAndRehashToTarget(initialCapacity);
+        if (rowCount > 0) {
+          restoredHashMap.expandAndRehashToTarget(rowCount);
        }
 
        // some bookkeeping
@@ -237,7 +236,7 @@ public int size() {
 
   public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable,
                                   long estimatedTableSize, HybridHashTableConf nwayConf)
-      throws SerDeException, IOException {
+          throws SerDeException, IOException {
     this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
        HiveConf.getFloatVar(hconf,HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
@@ -245,12 +244,13 @@ public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryA
        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
+        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
        estimatedTableSize, keyCount, memoryAvailable, nwayConf);
   }
 
   private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor,
-      int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, long estimatedTableSize,
-      long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
+      int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, float probePercent,
+      long estimatedTableSize, long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
       throws SerDeException, IOException {
 
     directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter();
@@ -262,10 +262,10 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
     memoryCheckFrequency = memCheckFreq;
 
     this.nwayConf = nwayConf;
+    int writeBufferSize;
     int numPartitions;
     if (nwayConf == null) { // binary join
-      numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize,
-          nwayConf);
+      numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize);
       writeBufferSize = (int)(estimatedTableSize / numPartitions);
     } else { // n-way join
       // It has been calculated in HashTableLoader earlier, so just need to retrieve that number
@@ -302,21 +302,33 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
     int numPartitionsSpilledOnCreation = 0;
     memoryUsed = 0;
     int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
+    // maxCapacity should be calculated based on a percentage of memoryThreshold, which is to divide
+    // row size using long size
+    float probePercentage = (float) 8 / (tableRowSize + 8); // long_size / tableRowSize + long_size
+    if (probePercentage == 1) {
+      probePercentage = probePercent;
+    }
+    int maxCapacity = (int)(memoryThreshold * probePercentage);
     for (int i = 0; i < numPartitions; i++) {
       if (this.nwayConf == null || // binary join
           nwayConf.getLoadedContainerList().size() == 0) { // n-way join, first (biggest) small table
         if (i == 0) { // We unconditionally create a hashmap for the first hash partition
-          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold, true);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, true);
         } else {
-          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold,
-              memoryUsed + writeBufferSize < memoryThreshold);
+          // To check whether we have enough memory to allocate for another hash partition,
+          // we need to get the size of the first hash partition to get an idea.
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, memoryUsed + hashPartitions[0].hashMap.memorySize() < memoryThreshold);
         }
-      } else { // n-way join
+      } else { // n-way join, all later small tables
         // For all later small tables, follow the same pattern of the previously loaded tables.
         if (this.nwayConf.doSpillOnCreation(i)) {
-          hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, false);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, false);
         } else {
-          hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, true);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, true);
         }
       }
 
@@ -513,7 +525,8 @@ public long spillPartition(int partitionId) throws IOException {
 
     Path path = Files.createTempFile("partition-" + partitionId + "-", null);
     OutputStream outputStream = Files.newOutputStream(path);
-    com.esotericsoftware.kryo.io.Output output = new com.esotericsoftware.kryo.io.Output(outputStream);
+    com.esotericsoftware.kryo.io.Output output =
+        new com.esotericsoftware.kryo.io.Output(outputStream);
     Kryo kryo = Utilities.runtimeSerializationKryo.get();
     kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap
     output.close();
@@ -545,11 +558,10 @@ public long spillPartition(int partitionId) throws IOException {
    * @param dataSize total data size for the table
    * @param minNumParts minimum required number of partitions
    * @param minWbSize minimum required write buffer size
-   * @param nwayConf the n-way join configuration
    * @return number of partitions needed
    */
  public static int calcNumPartitions(long memoryThreshold, long dataSize, int minNumParts,
-      int minWbSize, HybridHashTableConf nwayConf) throws IOException {
+      int minWbSize) throws IOException {
    int numPartitions = minNumParts;
 
    if (memoryThreshold < minNumParts * minWbSize) {
@@ -803,7 +815,8 @@ public ReusableRowContainer() {
        return JoinUtil.JoinResult.SPILL;
      }
      else {
-        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0, output.getLength(), hashMapResult);
+        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0,
+            output.getLength(), hashMapResult);
        dummyRow = null;
        if (hashMapResult.hasRows()) {
          return JoinUtil.JoinResult.MATCH;
@@ -941,7 +954,8 @@ public void write(MapJoinObjectSerDeContext valueContext, ObjectOutputStream out
        return JoinUtil.JoinResult.SPILL;
      }
      else {
-        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(bytes, offset, length, hashMapResult);
+        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(bytes, offset, length,
+            hashMapResult);
        dummyRow = null;
        if (hashMapResult.hasRows()) {
          return JoinUtil.JoinResult.MATCH;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 2b6571b..f7d165a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -84,6 +84,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
 
     // Get the total available memory from memory manager
     long totalMapJoinMemory = desc.getMemoryNeeded();
+    LOG.info("Memory manager allocates " + totalMapJoinMemory + " bytes for the loading hashtable.");
     if (totalMapJoinMemory <= 0) {
       totalMapJoinMemory = HiveConf.getLongVar(
         hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -128,11 +129,9 @@ public void load(MapJoinTableContainer[] mapJoinTables,
       long memory = tableMemorySizes.get(biggest);
       int numPartitions = 0;
       try {
-        numPartitions = HybridHashTableContainer.calcNumPartitions(memory,
-            maxSize,
+        numPartitions = HybridHashTableContainer.calcNumPartitions(memory, maxSize,
             HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
-            HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
-            nwayConf);
+            HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE));
       } catch (IOException e) {
         throw new HiveException(e);
       }
diff --git serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 05d9359..62250ec 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -61,7 +61,6 @@ public WriteBuffers(int wbSize, long maxSize) {
     this.offsetMask = this.wbSize - 1;
     this.maxSize = maxSize;
     writePos.bufferIndex = -1;
-    nextBufferToWrite();
   }
 
   public int readVInt() {
@@ -207,6 +206,9 @@ public void write(byte[] b) {
 
   @Override
   public void write(byte[] b, int off, int len) {
+    if (writePos.bufferIndex == -1) {
+      nextBufferToWrite();
+    }
     int srcOffset = 0;
     while (srcOffset < len) {
       int toWrite = Math.min(len - srcOffset, wbSize - writePos.offset);
@@ -355,6 +357,9 @@ private void clearState() {
 
 
   public long getWritePoint() {
+    if (writePos.bufferIndex == -1) {
+      nextBufferToWrite();
+    }
     return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
   }
 
@@ -498,6 +503,9 @@ private boolean isAllInOneWriteBuffer(int length) {
   }
 
   public void seal() {
+    if (writePos.bufferIndex == -1) {
+      return;
+    }
     if (writePos.offset < (wbSize * 0.8)) { // arbitrary
       byte[] smallerBuffer = new byte[writePos.offset];
       System.arraycopy(writePos.buffer, 0, smallerBuffer, 0, writePos.offset);
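
For reference, the standalone sketch below pulls together the sizing arithmetic this patch spreads across HybridHashTableContainer and BytesBytesMultiHashMap: the probe space for a hash partition is a fraction of memoryThreshold derived from the estimated row size, falls back to the new hive.mapjoin.optimized.hashtable.probe.percent setting when the row size is unknown, and the resulting byte budget is divided by 8 (one long per ref) and clamped between DEFAULT_MIN_MAX_CAPACITY and DEFAULT_MAX_CAPACITY. This is an illustration only, not part of the patch; the class name, method names, and the main() driver are invented, and the real code takes these inputs from HiveConf and the memory manager.

// Illustrative sketch only, not part of the patch. Names and values are hypothetical.
public class ProbeSpaceSketch {

  // Mirrors the constants in BytesBytesMultiHashMap after this patch.
  private static final int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
  private static final int DEFAULT_MIN_MAX_CAPACITY = 16 * 1024 * 1024;

  /**
   * Probe-space budget in bytes for one hash partition, following the arithmetic added to
   * HybridHashTableContainer (the variable named maxCapacity there). The patch additionally
   * floors this at the partition's write buffer size inside HashPartition.
   */
  static long probeSpaceBytes(long memoryThreshold, long tableRowSize, float probePercent) {
    // Each ref costs 8 bytes (one long); estimate the refs' share of a row.
    float probePercentage = (float) 8 / (tableRowSize + 8);
    if (probePercentage == 1) {
      // Row size is unknown (0): fall back to hive.mapjoin.optimized.hashtable.probe.percent.
      probePercentage = probePercent;
    }
    return (long) (memoryThreshold * probePercentage);
  }

  /** Ref-array capacity derived from the probe space, as in the patched hash map constructor. */
  static int refArrayCapacity(long maxProbeSize) {
    int maxCapacity = (maxProbeSize <= 0) ? DEFAULT_MAX_CAPACITY
        : (int) Math.min((long) DEFAULT_MAX_CAPACITY, maxProbeSize / 8);
    return Math.max(maxCapacity, DEFAULT_MIN_MAX_CAPACITY);
  }

  public static void main(String[] args) {
    long memoryThreshold = 256L * 1024 * 1024; // hypothetical 256 MB budget for the loading table
    long tableRowSize = 56;                    // hypothetical estimated small-table row size
    float probePercent = 0.5f;                 // default of the new config property

    long probe = probeSpaceBytes(memoryThreshold, tableRowSize, probePercent);
    System.out.println("probe space = " + probe + " bytes, ref array capacity = "
        + refArrayCapacity(probe) + " entries");
  }
}

With these sample inputs, a 56-byte row gives a 12.5% probe share, so a 256 MB budget yields 32 MB of probe space; 32 MB / 8 is about 4M ref slots, which the new DEFAULT_MIN_MAX_CAPACITY floor raises to 16M, illustrating why the lower bound was added alongside the percentage-based cap.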