diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index e79fccd..ff292a3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -83,7 +83,6 @@
 
   private transient ObjectCache cache;
   protected HashTableLoader loader;
-  private boolean loadCalled;
 
   protected transient MapJoinTableContainer[] mapJoinTables;
   private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
@@ -206,7 +205,7 @@ protected void completeInitializationOp(Object[] os) throws HiveException {
       }
     }
 
-    if (!loadCalled && spilled) {
+    if (spilled) {
       // we can't use the cached table because it has spilled.
       loadHashTable(getExecContext(), MapredContext.get());
@@ -290,8 +289,6 @@ public void generateMapMetaData() throws HiveException {
 
   protected Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> loadHashTable(
       ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
-    loadCalled = true;
-
     if (canSkipReload(mapContext)) {
       // no need to reload
       return new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
@@ -588,6 +585,7 @@ private void continueProcess(int partitionId)
       throws HiveException, IOException, SerDeException, ClassNotFoundException {
     for (byte pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos != conf.getPosBigTable()) {
+        LOG.info("Going to reload hash partition " + partitionId);
         reloadHashTable(pos, partitionId);
       }
     }
@@ -612,6 +610,7 @@ protected void reloadHashTable(byte pos, int partitionId)
 
     // Merge the sidefile into the newly created hash table
     // This is where the spilling may happen again
+    LOG.info("Going to restore sidefile...");
     KeyValueContainer kvContainer = partition.getSidefileKVContainer();
     int rowCount = kvContainer.size();
     LOG.info("Hybrid Grace Hash Join: Number of rows restored from KeyValueContainer: " +
@@ -624,6 +623,7 @@ protected void reloadHashTable(byte pos, int partitionId)
       // as the initialCapacity which cannot be 0, we provide a reasonable
       // positive number here
     }
+    LOG.info("Going to restore hashmap...");
     BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(rowCount);
     rowCount += restoredHashMap.getNumValues();
     LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index bb35bae..233f66b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -315,12 +315,12 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
           } else {
             LOG.info("Total available memory was: " + memoryThreshold);
             memoryThreshold += memFreed;
-            LOG.info("Total available memory is: " + memoryThreshold);
           }
         }
         writeBufferSize = (int)(memoryThreshold / numPartitions);
       }
     }
+    LOG.info("Total available memory is: " + memoryThreshold);
 
     // Round to power of 2 here, as is required by WriteBuffers
     writeBufferSize = Integer.bitCount(writeBufferSize) == 1 ?
@@ -341,7 +341,7 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
 
     hashPartitions = new HashPartition[numPartitions];
     int numPartitionsSpilledOnCreation = 0;
-    memoryUsed = 0;
+    memoryUsed = bloom1.sizeInBytes();
     int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
     // maxCapacity should be calculated based on a percentage of memoryThreshold, which is to divide
     // row size using long size
@@ -356,6 +356,7 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
       if (i == 0) { // We unconditionally create a hashmap for the first hash partition
         hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
             maxCapacity, true, spillLocalDirs);
+        LOG.info("Each new partition will require memory: " + hashPartitions[0].hashMap.memorySize());
       } else {
         // To check whether we have enough memory to allocate for another hash partition,
         // we need to get the size of the first hash partition to get an idea.
@@ -381,8 +382,10 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
         if (this.nwayConf != null && this.nwayConf.getNextSpillPartition() == numPartitions - 1) {
           this.nwayConf.setNextSpillPartition(i - 1);
         }
+        LOG.info("Hash partition " + i + " is spilled on creation.");
       } else {
         memoryUsed += hashPartitions[i].hashMap.memorySize();
+        LOG.info("Hash partition " + i + " is created in memory. Total memory usage so far: " + memoryUsed);
       }
     }
@@ -419,8 +422,8 @@ public long getMemoryThreshold() {
    * Get the current memory usage by recalculating it.
    * @return current memory usage
    */
-  public long refreshMemoryUsed() {
-    long memUsed = 0;
+  private long refreshMemoryUsed() {
+    long memUsed = bloom1.sizeInBytes();
     for (HashPartition hp : hashPartitions) {
       if (hp.hashMap != null) {
         memUsed += hp.hashMap.memorySize();
@@ -544,6 +547,7 @@ public boolean isHashMapSpilledOnCreation(int partitionId) {
    * Check if the memory threshold is about to be reached.
    * Since all the write buffer will be lazily allocated in BytesBytesMultiHashMap, we need to
    * consider those as well.
+   * We also need to account for the next memoryCheckFrequency rows to be loaded.
    * @return true if memory is full, false if not
    */
   private boolean isMemoryFull() {
@@ -555,7 +559,8 @@ private boolean isMemoryFull() {
       }
     }
 
-    return refreshMemoryUsed() + writeBufferSize * numPartitionsInMem >= memoryThreshold;
+    return refreshMemoryUsed() + this.memoryCheckFrequency * getTableRowSize() +
+        writeBufferSize * numPartitionsInMem >= memoryThreshold;
   }
 
   /**
@@ -636,6 +641,7 @@ public long spillPartition(int partitionId) throws IOException {
     partition.rowsOnDisk = inMemRowCount;
     totalInMemRowCount -= inMemRowCount;
     partition.hashMap.clear();
+    partition.hashMap = null;
     return memFreed;
   }
@@ -657,8 +663,9 @@ public static int calcNumPartitions(long memoryThreshold, long dataSize, int min
     if (memoryThreshold < minNumParts * minWbSize) {
       LOG.warn("Available memory is not enough to create a HybridHashTableContainer!");
     }
-    if (memoryThreshold < dataSize) {
-      while (dataSize / numPartitions > memoryThreshold) {
+
+    if (memoryThreshold / 2 < dataSize) { // The divide-by-2 logic is consistent with MapJoinOperator.reloadHashTable
+      while (dataSize / numPartitions > memoryThreshold / 2) {
         numPartitions *= 2;
       }
     }
@@ -708,8 +715,10 @@ public int getToSpillPartitionId() {
 
   @Override
   public void clear() {
-    for (HashPartition hp : hashPartitions) {
+    for (int i = 0; i < hashPartitions.length; i++) {
+      HashPartition hp = hashPartitions[i];
       if (hp != null) {
+        LOG.info("Going to clear hash partition " + i);
         hp.clear();
       }
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index a742458..7b13e90 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -116,6 +116,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
     if (useHybridGraceHashJoin && mapJoinTables.length > 2) {
       // Create a Conf for n-way HybridHashTableContainers
       nwayConf = new HybridHashTableConf();
+      LOG.info("N-way join: " + (mapJoinTables.length - 1) + " small tables.");
 
       // Find the biggest small table; also calculate total data size of all small tables
       long maxSize = Long.MIN_VALUE; // the size of the biggest small table
@@ -212,6 +213,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,
               (Writable)kvReader.getCurrentKey(), (Writable)kvReader.getCurrentValue());
         }
         tableContainer.seal();
+        LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos);
         mapJoinTables[pos] = tableContainer;
       } catch (Exception e) {
         throw new HiveException(e);
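
Reviewer note (illustration only, not part of the patch): the divide-by-2 change in calcNumPartitions is the key sizing decision above, so here is a minimal standalone sketch of it, assuming each partition must later be reloadable into half of the memory budget while the other half is kept free for the rebuilt hash table in MapJoinOperator.reloadHashTable. The class name and main() driver are hypothetical; only the halving and doubling logic mirrors the patch.

public class CalcNumPartitionsSketch {
  // Partitions are doubled until each one is expected to fit within half of the
  // memory threshold, so a spilled partition (plus its rebuilt hash table) can
  // later be brought back into memory.
  static int calcNumPartitions(long memoryThreshold, long dataSize, int minNumParts, int minWbSize) {
    int numPartitions = minNumParts;
    if (memoryThreshold < (long) minNumParts * minWbSize) {
      System.err.println("Available memory is not enough for even the minimum write buffers.");
    }
    if (memoryThreshold / 2 < dataSize) {          // keep half of the budget free for reload
      while (dataSize / numPartitions > memoryThreshold / 2) {
        numPartitions *= 2;
      }
    }
    return numPartitions;
  }

  public static void main(String[] args) {
    // 1 GB of small-table data against a 256 MB threshold: each partition must
    // stay under 128 MB, so the sketch settles on 8 partitions.
    System.out.println(calcNumPartitions(256L << 20, 1L << 30, 4, 8 << 20));
  }
}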
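
A similar hedged sketch of the tightened isMemoryFull accounting: besides the memory already in use (which now includes the bloom filter), the check reserves room for the write buffers that in-memory partitions may still lazily allocate and for the rows that will arrive before the next check. The field names below are simplified stand-ins for the container's state, not the real member layout.

public class MemoryFullCheckSketch {
  long memoryThreshold;      // total memory budget for the container
  long memoryUsed;           // recomputed usage, including the bloom filter
  long tableRowSize;         // estimated size of one small-table row
  int writeBufferSize;       // per-partition write buffer, lazily allocated
  int numPartitionsInMem;    // partitions whose hash maps are still in memory
  int memoryCheckFrequency;  // rows processed between two memory checks

  // Leave headroom both for rows that will be loaded before the next check and
  // for write buffers the in-memory partitions have not allocated yet.
  boolean isMemoryFull() {
    return memoryUsed + (long) memoryCheckFrequency * tableRowSize
        + (long) writeBufferSize * numPartitionsInMem >= memoryThreshold;
  }

  public static void main(String[] args) {
    MemoryFullCheckSketch c = new MemoryFullCheckSketch();
    c.memoryThreshold = 256L << 20;   // 256 MB budget
    c.memoryUsed = 224L << 20;        // 224 MB already accounted for
    c.tableRowSize = 1024;            // ~1 KB per row
    c.writeBufferSize = 8 << 20;      // 8 MB write buffer per partition
    c.numPartitionsInMem = 4;         // up to 32 MB of buffers may still appear
    c.memoryCheckFrequency = 1024;    // check memory every 1024 rows (~1 MB more)
    System.out.println(c.isMemoryFull());  // true: 224 + 1 + 32 MB exceeds 256 MB
  }
}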