diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index f2f3c09..327e227 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -87,6 +87,7 @@ private boolean lastPartitionInMem; // only one (last one) partition is left in memory private final int memoryCheckFrequency; // how often (# of rows apart) to check if memory is full private HybridHashTableConf nwayConf; // configuration for n-way join + private int writeBufferSize; // write buffer size for BytesBytesMultiHashMap /** The OI used to deserialize values. We never deserialize keys. */ private LazyBinaryStructObjectInspector internalValueOi; @@ -282,7 +283,6 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac memoryCheckFrequency = memCheckFreq; this.nwayConf = nwayConf; - int writeBufferSize; int numPartitions; if (nwayConf == null) { // binary join numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize); @@ -404,6 +404,11 @@ public long refreshMemoryUsed() { for (HashPartition hp : hashPartitions) { if (hp.hashMap != null) { memUsed += hp.hashMap.memorySize(); + } else { + // also include the still-in-memory sidefile, before it has been truly spilled + if (hp.sidefileKVContainer != null) { + memUsed += hp.sidefileKVContainer.numRowsInReadBuffer() * tableRowSize; + } } } return memoryUsed = memUsed; @@ -459,15 +464,14 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper, bloom1.addLong(keyHash); - if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { + if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { // destination on disk KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer(); kvContainer.add((HiveKey) 
currentKey, (BytesWritable) currentValue); - } else { - hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation - totalInMemRowCount++; - - if ((totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0 && // check periodically - !lastPartitionInMem) { // If this is the only partition in memory, proceed without check + } else { // destination in memory + if (!lastPartitionInMem && // If this is the only partition in memory, proceed without check + (hashPartition.size() == 0 || // Destination partition being empty indicates a write buffer + // will be allocated, thus need to check if memory is full + (totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0)) { // check periodically if (isMemoryFull()) { if ((numPartitionsSpilled == hashPartitions.length - 1) ) { LOG.warn("This LAST partition in memory won't be spilled!"); @@ -477,6 +481,13 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper, int biggest = biggestPartition(); spillPartition(biggest); this.setSpill(true); + if (partitionId == biggest) { // destination hash partition has just been spilled + KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer(); + kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue); + } else { // still in memory + hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation + totalInMemRowCount++; + } } else { // n-way join LOG.info("N-way spilling: spill tail partition from previously loaded small tables"); memoryThreshold += nwayConf.spill(); @@ -484,7 +495,13 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper, } numPartitionsSpilled++; } + } else { // We checked and found memory is not full + hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation + totalInMemRowCount++; } + } else { // Not time for checking memory yet + hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to 
avoid recalculation + totalInMemRowCount++; } } @@ -511,11 +528,13 @@ public boolean isHashMapSpilledOnCreation(int partitionId) { } /** - * Check if the memory threshold is reached + * Check if the memory threshold is about to be reached. + * Since all the write buffers will be lazily allocated in BytesBytesMultiHashMap, we need to + * consider those as well. * @return true if memory is full, false if not */ private boolean isMemoryFull() { - return refreshMemoryUsed() >= memoryThreshold; + return refreshMemoryUsed() + writeBufferSize * hashPartitions.length >= memoryThreshold; } /** @@ -558,6 +577,7 @@ public long spillPartition(int partitionId) throws IOException { new com.esotericsoftware.kryo.io.Output(outputStream); Kryo kryo = SerializationUtilities.borrowKryo(); try { + LOG.info("Trying to spill hash partition " + partitionId + " ..."); kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap output.close(); outputStream.close(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java index d403c58..ac101dd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java @@ -216,6 +216,10 @@ public boolean hasNext() { return row; } + public int numRowsInReadBuffer() { + return rowsInReadBuffer; + } + public int size() { return rowsInReadBuffer + rowsOnDisk; }