diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 15cafdd..370c99e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -375,6 +375,9 @@ public void process(Object row, int tag) throws HiveException {
           joinResult = adaptor.setFromOther(firstSetKey);
         }
         MapJoinRowContainer rowContainer = adaptor.getCurrentRows();
+        if (joinResult != JoinUtil.JoinResult.MATCH) {
+          assert (rowContainer == null || !rowContainer.hasRows()) : "Expecting an empty result set for no match";
+        }
         if (rowContainer != null && unwrapContainer[pos] != null) {
           Object[] currentKey = firstSetKey.getCurrentKey();
           rowContainer = unwrapContainer[pos].setInternal(rowContainer, currentKey);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index e338a31..146cecb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
@@ -55,6 +56,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.BloomFilter;
 
 import com.esotericsoftware.kryo.Kryo;
 
@@ -91,6 +93,18 @@
   private boolean[] sortableSortOrders;
   private MapJoinBytesTableContainer.KeyValueHelper writeHelper;
   private MapJoinBytesTableContainer.DirectKeyValueWriter directWriteHelper;
+  /*
+   * This is not a real bloom filter, but a cheap version of the
+   * one-memory-access (bloom-1) filters.
+   *
+   * In several cases, we'll have map-join spills because the value columns are
+   * a few hundred columns of Text each, while there are very few keys in total
+   * (a few thousand).
+   *
+   * This is a cheap exit option to prevent spilling the big table in such a
+   * scenario.
+   */
+  private transient final BloomFilter bloom1;
 
   private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
@@ -273,7 +287,14 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
     // Cap WriteBufferSize to avoid large preallocations
     writeBufferSize = writeBufferSize < minWbSize ?
         minWbSize : Math.min(maxWbSize, writeBufferSize);
-    LOG.info("Write buffer size: " + writeBufferSize);
+
+    this.bloom1 = new BloomFilter(newKeyCount);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info(String.format("Using a bloom-1 filter for %d keys of size %d bytes",
+          newKeyCount, bloom1.sizeInBytes()));
+      LOG.info("Write buffer size: " + writeBufferSize);
+    }
 
     hashPartitions = new HashPartition[numPartitions];
     int numPartitionsSpilledOnCreation = 0;
@@ -395,6 +416,8 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
     int partitionId = keyHash & (hashPartitions.length - 1);
     HashPartition hashPartition = hashPartitions[partitionId];
 
+    bloom1.addLong(keyHash);
+
     if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) {
       KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
       kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
@@ -770,16 +793,29 @@ public ReusableRowContainer() {
      */
     public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException {
       int keyHash = WriteBuffers.murmurHash(output.getData(), 0, output.getLength());
+
+      if (!bloom1.testLong(keyHash)) {
+        /*
+         * If the keyHash is missing in the bloom filter, then the value cannot
+         * exist in any of the spilled containers - return NOMATCH.
+         */
+        dummyRow = null;
+        aliasFilter = (byte) 0xff;
+        hashMapResult.forget();
+        return JoinUtil.JoinResult.NOMATCH;
+      }
+
       partitionId = keyHash & (hashPartitions.length - 1);
 
-      // If the target hash table is on disk, spill this row to disk as well to be processed later
+      // If the target hash table is on disk, spill this row to disk as well to
+      // be processed later
       if (isOnDisk(partitionId)) {
         toSpillPartitionId = partitionId;
         hashMapResult.forget();
         return JoinUtil.JoinResult.SPILL;
-      }
-      else {
-        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0, output.getLength(), hashMapResult);
+      } else {
+        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(
+            output.getData(), 0, output.getLength(), hashMapResult);
         dummyRow = null;
         if (hashMapResult.hasRows()) {
           return JoinUtil.JoinResult.MATCH;
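
For context, a minimal sketch of the bloom-1 fast path this patch wires into the hybrid map-join. The class name and sample hash values are made up for illustration; the BloomFilter calls (single-argument constructor, addLong, testLong, sizeInBytes) are the same org.apache.hive.common.util.BloomFilter API the patch itself uses.

// Illustrative sketch only (not part of the patch): shows how the bloom-1
// pre-check decides between NOMATCH and the normal partition lookup.
// Class name and sample hashes are hypothetical.
import org.apache.hive.common.util.BloomFilter;

public class Bloom1SpillCheckSketch {
  public static void main(String[] args) {
    // Sized for the expected number of small-table keys, analogous to
    // new BloomFilter(newKeyCount) in the HybridHashTableContainer constructor.
    BloomFilter bloom1 = new BloomFilter(10000);

    // Build side: every key hash that goes into the hash table is registered,
    // mirroring bloom1.addLong(keyHash) in internalPutRow().
    long buildKeyHash = 0x1234L;   // stand-in for WriteBuffers.murmurHash(...)
    bloom1.addLong(buildKeyHash);

    // Probe side: a hash the filter has never seen cannot match any partition,
    // in memory or spilled, so the row can be answered NOMATCH immediately
    // instead of being spilled to disk and re-processed later.
    long probeKeyHash = 0x5678L;
    if (!bloom1.testLong(probeKeyHash)) {
      System.out.println("NOMATCH - big-table row skipped without spilling");
    } else {
      System.out.println("Maybe present - fall through to the partition lookup");
    }

    System.out.println("bloom-1 size: " + bloom1.sizeInBytes() + " bytes");
  }
}

Note that the filter is keyed on the same murmur hash that already determines the partition id, so the probe-side check adds no extra hashing, only a single filter lookup before the partitionId computation and the on-disk check.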