diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 15cafdd..a40f0a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -55,6 +55,8 @@ import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -176,8 +178,7 @@ protected HashTableLoader getHashTableLoader(Configuration hconf) { } }); result.add(future); - } else if (mapContext == null || mapContext.getLocalWork() == null - || mapContext.getLocalWork().getInputFileChangeSensitive() == false) { + } else if (!isInputFileChangeSensitive(mapContext)) { loadHashTable(mapContext, mrContext); hashTblInitedOnce = true; } @@ -276,9 +277,7 @@ public void generateMapMetaData() throws HiveException { ExecMapperContext mapContext, MapredContext mrContext) throws HiveException { loadCalled = true; - if (this.hashTblInitedOnce - && ((mapContext == null) || (mapContext.getLocalWork() == null) || (mapContext - .getLocalWork().getInputFileChangeSensitive() == false))) { + if (canSkipReload(mapContext)) { // no need to reload return new ImmutablePair( mapJoinTables, mapJoinTableSerdes); @@ -306,6 +305,11 @@ public void generateMapMetaData() throws HiveException { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); + if (canSkipJoinProcessing(mapContext)) { + LOG.info("Skipping big table join processing for " + this.toString()); + this.setDone(true); + } + return pair; } @@ -611,7 +615,7 @@ protected void reloadHashTable(byte pos, int partitionId) } container.setTotalInMemRowCount(container.getTotalInMemRowCount() - + restoredHashMap.getNumValues() + kvContainer.size()); + + restoredHashMap.getNumValues()); kvContainer.clear(); spilledMapJoinTables[pos] = new MapJoinBytesTableContainer(restoredHashMap); @@ -656,4 +660,47 @@ static public String getOperatorName() { public OperatorType getType() { return OperatorType.MAPJOIN; } + + protected boolean isInputFileChangeSensitive(ExecMapperContext mapContext) { + return !(mapContext == null + || mapContext.getLocalWork() == null + || mapContext.getLocalWork().getInputFileChangeSensitive() == false); + } + + protected boolean canSkipReload(ExecMapperContext mapContext) { + return (this.hashTblInitedOnce && !isInputFileChangeSensitive(mapContext)); + } + + // If the loaded hash table is empty, for some conditions we can skip processing the big table rows. + protected boolean canSkipJoinProcessing(ExecMapperContext mapContext) { + if (!canSkipReload(mapContext)) { + return false; + } + + JoinCondDesc[] joinConds = getConf().getConds(); + if (joinConds.length > 0) { + for (JoinCondDesc joinCond : joinConds) { + if (joinCond.getType() != JoinDesc.INNER_JOIN) { + return false; + } + } + } else { + return false; + } + + boolean skipJoinProcessing = false; + for (int idx = 0; idx < mapJoinTables.length; ++idx) { + if (idx == getConf().getPosBigTable()) { + continue; + } + MapJoinTableContainer mapJoinTable = mapJoinTables[idx]; + if (mapJoinTable.size() == 0) { + // If any table is empty, an inner join involving the tables should yield 0 rows. + LOG.info("Hash table number " + idx + " is empty"); + skipJoinProcessing = true; + break; + } + } + return skipJoinProcessing; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index e338a31..0a6461f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -110,6 +110,7 @@ int threshold; // Used to create an empty BytesBytesMultiHashMap float loadFactor; // Same as above int wbSize; // Same as above + int rowsOnDisk; // How many rows saved to the on-disk hashmap (if on disk) /* It may happen that there's not enough memory to instantiate a hashmap for the partition. * In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled". @@ -149,6 +150,10 @@ public BytesBytesMultiHashMap getHashMapFromDisk(int initialCapacity) restoredHashMap.expandAndRehashToTarget(initialCapacity); } + // some bookkeeping + rowsOnDisk = 0; + hashMapOnDisk = false; + input.close(); inputStream.close(); Files.delete(hashMapLocalPath); @@ -197,6 +202,8 @@ public void clear() { } catch (Throwable ignored) { } hashMapLocalPath = null; + rowsOnDisk = 0; + hashMapOnDisk = false; } if (sidefileKVContainer != null) { @@ -214,6 +221,16 @@ public void clear() { matchfileRowBytesContainer = null; } } + + public int size() { + if (isHashMapOnDisk()) { + // Rows are in a combination of the on-disk hashmap and the sidefile + return rowsOnDisk + (sidefileKVContainer != null ? sidefileKVContainer.size() : 0); + } else { + // All rows should be in the in-memory hashmap + return hashMap.size(); + } + } } public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable, @@ -507,6 +524,7 @@ public long spillPartition(int partitionId) throws IOException { memoryUsed -= memFreed; LOG.info("Memory usage after spilling: " + memoryUsed); + partition.rowsOnDisk = inMemRowCount; totalInMemRowCount -= inMemRowCount; partition.hashMap.clear(); return memFreed; @@ -959,4 +977,13 @@ public void dumpStats() { numPartitionsInMem + " partitions in memory have been processed; " + numPartitionsOnDisk + " partitions have been spilled to disk and will be processed next."); } + + @Override + public int size() { + int totalSize = 0; + for (HashPartition hashPartition : hashPartitions) { + totalSize += hashPartition.size(); + } + return totalSize; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index 83a1521..5df8e2b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -736,4 +736,9 @@ public void dumpMetrics() { public boolean hasSpill() { return false; } + + @Override + public int size() { + return hashMap.size(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index 9d8cbcb..869aefd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -106,4 +106,9 @@ MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey, * This is only applicable for HybridHashTableContainer. */ boolean hasSpill(); + + /** + * Return the size of the hash table + */ + int size(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java index fbe6b4c..666d666 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java @@ -65,4 +65,9 @@ public VectorMapJoinFastHashTable( this.loadFactor = loadFactor; this.writeBuffersSize = writeBuffersSize; } + + @Override + public int size() { + return keysAssigned; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java index 4b1d6f6..f2080f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java @@ -213,6 +213,11 @@ public boolean hasSpill() { return false; } + @Override + public int size() { + return VectorMapJoinFastHashTable.size(); + } + /* @Override public com.esotericsoftware.kryo.io.Output getHybridBigTableSpillOutput(int partitionId) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java index 7e219ec..c7e585c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java @@ -40,4 +40,8 @@ void putRow(BytesWritable currentKey, BytesWritable currentValue) throws SerDeException, HiveException, IOException; + /** + * Get hash table size + */ + int size(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java index a2d4e4c..b2b86d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java @@ -92,4 +92,8 @@ public VectorMapJoinOptimizedHashTable( adapatorDirectAccess = (ReusableGetAdaptorDirectAccess) hashMapRowGetter; } + @Override + public int size() { + return originalTableContainer.size(); + } }