diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index f2f3c09..2cb2be8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -87,6 +87,7 @@
   private boolean lastPartitionInMem;     // only one (last one) partition is left in memory
   private final int memoryCheckFrequency; // how often (# of rows apart) to check if memory is full
   private HybridHashTableConf nwayConf;   // configuration for n-way join
+  private int writeBufferSize;            // write buffer size for BytesBytesMultiHashMap

   /** The OI used to deserialize values. We never deserialize keys. */
   private LazyBinaryStructObjectInspector internalValueOi;
@@ -282,7 +283,6 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
     memoryCheckFrequency = memCheckFreq;

     this.nwayConf = nwayConf;
-    int writeBufferSize;
     int numPartitions;
     if (nwayConf == null) { // binary join
       numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize);
@@ -404,6 +404,11 @@ public long refreshMemoryUsed() {
     for (HashPartition hp : hashPartitions) {
       if (hp.hashMap != null) {
         memUsed += hp.hashMap.memorySize();
+      } else {
+        // also include the still-in-memory sidefile, before it has been truly spilled
+        if (hp.sidefileKVContainer != null) {
+          memUsed += hp.sidefileKVContainer.numRowsInReadBuffer() * tableRowSize;
+        }
       }
     }
     return memoryUsed = memUsed;
@@ -452,6 +457,8 @@ public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentK
   private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
           Writable currentKey, Writable currentValue) throws SerDeException, IOException {

+    boolean putToSidefile = false; // by default we put row into partition in memory
+
     // Next, put row into corresponding hash partition
     int keyHash = keyValueHelper.getHashFromKey();
     int partitionId = keyHash & (hashPartitions.length - 1);
@@ -459,15 +466,13 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
       bloom1.addLong(keyHash);
     }

-    if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) {
-      KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
-      kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
-    } else {
-      hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation
-      totalInMemRowCount++;
-
-      if ((totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0 && // check periodically
-          !lastPartitionInMem) { // If this is the only partition in memory, proceed without check
+    if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { // destination on disk
+      putToSidefile = true;
+    } else { // destination in memory
+      if (!lastPartitionInMem &&        // If this is the only partition in memory, proceed without check
+          (hashPartition.size() == 0 || // Destination partition being empty indicates a write buffer
+                                        // will be allocated, thus need to check if memory is full
+          (totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0)) { // check periodically
         if (isMemoryFull()) {
           if ((numPartitionsSpilled == hashPartitions.length - 1) ) {
             LOG.warn("This LAST partition in memory won't be spilled!");
@@ -477,9 +482,16 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
             int biggest = biggestPartition();
             spillPartition(biggest);
             this.setSpill(true);
+            if (partitionId == biggest) { // destination hash partition has just been spilled
+              putToSidefile = true;
+            }
           } else { // n-way join
             LOG.info("N-way spilling: spill tail partition from previously loaded small tables");
+            int biggest = nwayConf.getNextSpillPartition();
             memoryThreshold += nwayConf.spill();
+            if (biggest != 0 && partitionId == biggest) { // destination hash partition has just been spilled
+              putToSidefile = true;
+            }
             LOG.info("Memory threshold has been increased to: " + memoryThreshold);
           }
           numPartitionsSpilled++;
@@ -488,6 +500,15 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
       }
     }

+    // Now we know where to put the row
+    if (putToSidefile) {
+      KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
+      kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
+    } else {
+      hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation
+      totalInMemRowCount++;
+    }
+
     return null; // there's no key to return
   }

@@ -511,11 +532,13 @@ public boolean isHashMapSpilledOnCreation(int partitionId) {
   }

   /**
-   * Check if the memory threshold is reached
+   * Check if the memory threshold is about to be reached.
+   * Since all the write buffers are lazily allocated in BytesBytesMultiHashMap, we need to
+   * account for them as well.
    * @return true if memory is full, false if not
    */
   private boolean isMemoryFull() {
-    return refreshMemoryUsed() >= memoryThreshold;
+    return refreshMemoryUsed() + writeBufferSize * hashPartitions.length >= memoryThreshold;
   }

   /**
@@ -558,6 +581,7 @@ public long spillPartition(int partitionId) throws IOException {
         new com.esotericsoftware.kryo.io.Output(outputStream);
     Kryo kryo = SerializationUtilities.borrowKryo();
     try {
+      LOG.info("Trying to spill hash partition " + partitionId + " ...");
       kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap
       output.close();
       outputStream.close();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
index d403c58..ac101dd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
@@ -216,6 +216,10 @@ public boolean hasNext() {
     return row;
   }

+  public int numRowsInReadBuffer() {
+    return rowsInReadBuffer;
+  }
+
   public int size() {
     return rowsInReadBuffer + rowsOnDisk;
   }
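
The restructured internalPutRow() above follows a decide-then-write pattern: the destination is chosen first, a spill triggered by the memory check may redirect it to the sidefile, and only then is the row written. A simplified, self-contained sketch of that control flow follows; PartitionRouter and all of its members are hypothetical stand-ins (with strings standing in for serialized rows), not Hive classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the decide-then-write flow in the patched
// internalPutRow(); every name here is an illustrative stand-in.
public class PartitionRouter {
  private final Map<Integer, List<String>> inMemory = new HashMap<>(); // stands in for hashMap
  private final Map<Integer, List<String>> sidefile = new HashMap<>(); // stands in for sidefileKVContainer
  private final boolean[] spilled;
  private final int maxInMemoryRows;
  private int inMemoryRows;

  public PartitionRouter(int numPartitions, int maxInMemoryRows) {
    this.spilled = new boolean[numPartitions];
    this.maxInMemoryRows = maxInMemoryRows;
  }

  public void putRow(int partitionId, String row) {
    // Step 1: rows bound for an already-spilled partition go to its sidefile.
    boolean putToSidefile = spilled[partitionId];

    // Step 2: if memory is full, spill the biggest in-memory partition; that
    // spill may redirect the current row before anything has been written.
    if (!putToSidefile && inMemoryRows >= maxInMemoryRows && !inMemory.isEmpty()) {
      int biggest = biggestPartition();
      spill(biggest);
      if (partitionId == biggest) {
        putToSidefile = true; // destination partition has just been spilled
      }
    }

    // Step 3: only now do we know where the row belongs.
    Map<Integer, List<String>> target = putToSidefile ? sidefile : inMemory;
    target.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(row);
    if (!putToSidefile) {
      inMemoryRows++;
    }
  }

  private int biggestPartition() {
    int biggest = -1, biggestSize = -1;
    for (Map.Entry<Integer, List<String>> e : inMemory.entrySet()) {
      if (e.getValue().size() > biggestSize) {
        biggest = e.getKey();
        biggestSize = e.getValue().size();
      }
    }
    return biggest;
  }

  private void spill(int partitionId) {
    inMemoryRows -= inMemory.get(partitionId).size();
    inMemory.remove(partitionId); // pretend these rows were written to disk
    spilled[partitionId] = true;
  }
}
```

The point of deferring the write is visible in step 2: in the old code the row was inserted into the hash map before the memory check ran, so a row landing in the partition chosen for spilling had already been placed in memory.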
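
The isMemoryFull() change is the other half of the patch: because BytesBytesMultiHashMap allocates its write buffers lazily, the check now reserves headroom for one not-yet-allocated write buffer per partition, so a lazy allocation can never push usage past the threshold after the check has already passed. A minimal sketch of that accounting, with MemoryBudget and its fields as illustrative names rather than Hive's API:

```java
// Illustrative sketch of the headroom-aware memory check; MemoryBudget and
// its members are hypothetical, not part of Hive.
public class MemoryBudget {
  private final long memoryThreshold; // total bytes the hash table may use
  private final int writeBufferSize;  // size of one lazily allocated write buffer
  private final int numPartitions;    // number of hash partitions

  public MemoryBudget(long memoryThreshold, int writeBufferSize, int numPartitions) {
    this.memoryThreshold = memoryThreshold;
    this.writeBufferSize = writeBufferSize;
    this.numPartitions = numPartitions;
  }

  // Mirrors the patched isMemoryFull(): memory counts as full once current
  // usage plus one pending write buffer per partition would cross the
  // threshold, so a lazy buffer allocation cannot overshoot the limit.
  public boolean isMemoryFull(long usedBytes) {
    return usedBytes + (long) writeBufferSize * numPartitions >= memoryThreshold;
  }

  public static void main(String[] args) {
    MemoryBudget budget = new MemoryBudget(64L << 20, 1 << 20, 16); // 64 MB limit, 1 MB buffers, 16 partitions
    System.out.println(budget.isMemoryFull(40L << 20)); // false: 40 MB + 16 MB headroom < 64 MB
    System.out.println(budget.isMemoryFull(50L << 20)); // true:  50 MB + 16 MB headroom > 64 MB
  }
}
```

This is also why the patched put path checks memory whenever the destination partition is empty, not just every memoryCheckFrequency rows: a put into an empty partition is exactly the case that triggers a fresh write-buffer allocation.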