diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 15cafdd..31a0c24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -55,6 +55,8 @@ import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -176,8 +178,7 @@ protected HashTableLoader getHashTableLoader(Configuration hconf) { } }); result.add(future); - } else if (mapContext == null || mapContext.getLocalWork() == null - || mapContext.getLocalWork().getInputFileChangeSensitive() == false) { + } else if (!isInputFileChangeSensitive(mapContext)) { loadHashTable(mapContext, mrContext); hashTblInitedOnce = true; } @@ -276,9 +277,7 @@ public void generateMapMetaData() throws HiveException { ExecMapperContext mapContext, MapredContext mrContext) throws HiveException { loadCalled = true; - if (this.hashTblInitedOnce - && ((mapContext == null) || (mapContext.getLocalWork() == null) || (mapContext - .getLocalWork().getInputFileChangeSensitive() == false))) { + if (canSkipReload(mapContext)) { // no need to reload return new ImmutablePair( mapJoinTables, mapJoinTableSerdes); @@ -306,6 +305,11 @@ public void generateMapMetaData() throws HiveException { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); + if (canSkipJoinProcessing(mapContext)) { + LOG.info("Skipping big table join processing for " + this.toString()); + this.setDone(true); + } + return pair; } @@ -656,4 +660,47 @@ static public String getOperatorName() { public OperatorType getType() { return OperatorType.MAPJOIN; } + + protected boolean isInputFileChangeSensitive(ExecMapperContext mapContext) { + return !(mapContext == null + || mapContext.getLocalWork() == null + || mapContext.getLocalWork().getInputFileChangeSensitive() == false); + } + + protected boolean canSkipReload(ExecMapperContext mapContext) { + return (this.hashTblInitedOnce && !isInputFileChangeSensitive(mapContext)); + } + + // If the loaded hash table is empty, for some conditions we can skip processing the big table rows. + protected boolean canSkipJoinProcessing(ExecMapperContext mapContext) { + if (!canSkipReload(mapContext)) { + return false; + } + + JoinCondDesc[] joinConds = getConf().getConds(); + if (joinConds.length > 0) { + for (JoinCondDesc joinCond : joinConds) { + if (joinCond.getType() != JoinDesc.INNER_JOIN) { + return false; + } + } + } else { + return false; + } + + boolean skipJoinProcessing = false; + for (int idx = 0; idx < mapJoinTables.length; ++idx) { + if (idx == getConf().getPosBigTable()) { + continue; + } + MapJoinTableContainer mapJoinTable = mapJoinTables[idx]; + if (mapJoinTable.size() == 0) { + // If any table is empty, an inner join involving the tables should yield 0 rows. + LOG.info("Hash table number " + idx + " is empty"); + skipJoinProcessing = true; + break; + } + } + return skipJoinProcessing; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index e338a31..3cf29bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -959,4 +959,16 @@ public void dumpStats() { numPartitionsInMem + " partitions in memory have been processed; " + numPartitionsOnDisk + " partitions have been spilled to disk and will be processed next."); } + + @Override + public int size() { + // Size should be in-memory rows + any spilled rows + int totalSize = getTotalInMemRowCount(); + if (hasSpill()) { + for (HashPartition hashPartition : hashPartitions) { + totalSize += hashPartition.getSidefileKVContainer().size(); + } + } + return totalSize; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index 83a1521..5df8e2b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -736,4 +736,9 @@ public void dumpMetrics() { public boolean hasSpill() { return false; } + + @Override + public int size() { + return hashMap.size(); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index 9d8cbcb..869aefd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -106,4 +106,9 @@ MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey, * This is only applicable for HybridHashTableContainer. */ boolean hasSpill(); + + /** + * Return the size of the hash table + */ + int size(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java index fbe6b4c..666d666 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java @@ -65,4 +65,9 @@ public VectorMapJoinFastHashTable( this.loadFactor = loadFactor; this.writeBuffersSize = writeBuffersSize; } + + @Override + public int size() { + return keysAssigned; + } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java index 4b1d6f6..f2080f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java @@ -213,6 +213,11 @@ public boolean hasSpill() { return false; } + @Override + public int size() { + return VectorMapJoinFastHashTable.size(); + } + /* @Override public com.esotericsoftware.kryo.io.Output getHybridBigTableSpillOutput(int partitionId) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java index 7e219ec..c7e585c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java @@ -40,4 +40,8 @@ void putRow(BytesWritable currentKey, BytesWritable currentValue) throws SerDeException, HiveException, IOException; + /** + * Get hash table size + */ + int size(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java index a2d4e4c..b2b86d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java @@ -92,4 +92,8 @@ public VectorMapJoinOptimizedHashTable( adapatorDirectAccess = (ReusableGetAdaptorDirectAccess) hashMapRowGetter; } + @Override + public int size() { + return originalTableContainer.size(); + } }