diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 02d61eb..ae871b8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -380,6 +380,10 @@ public void process(Object row, int tag) throws HiveException {
         joinResult = adaptor.setFromOther(firstSetKey);
       }
       MapJoinRowContainer rowContainer = adaptor.getCurrentRows();
+      if (joinResult != JoinUtil.JoinResult.MATCH) {
+        assert (rowContainer == null || !rowContainer.hasRows()) :
+          "Expecting an empty result set for no match";
+      }
       if (rowContainer != null && unwrapContainer[pos] != null) {
         Object[] currentKey = firstSetKey.getCurrentKey();
         rowContainer = unwrapContainer[pos].setInternal(rowContainer, currentKey);
@@ -388,9 +392,9 @@
       if (rowContainer == null || firstSetKey.hasAnyNulls(fieldCount, nullsafes)) {
         if (!noOuterJoin) {
           // For Hybrid Grace Hash Join, during the 1st round processing,
-          // we only keep the LEFT side if the row is not spilled
-          if (!conf.isHybridHashJoin() || hybridMapJoinLeftover
-              || (!hybridMapJoinLeftover && joinResult != JoinUtil.JoinResult.SPILL)) {
+          // we only keep the LEFT side if the row is neither spilled nor filtered out by the BloomFilter
+          if (!conf.isHybridHashJoin() || hybridMapJoinLeftover ||
+              (joinResult != JoinUtil.JoinResult.SPILL)) {
             joinNeeded = true;
             storage[pos] = dummyObjVectors[pos];
           }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index 52c02ae..625ba39 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
@@ -55,6 +56,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.BloomFilter;
 
 import com.esotericsoftware.kryo.Kryo;
 
@@ -90,6 +92,18 @@
   private boolean[] sortableSortOrders;
   private MapJoinBytesTableContainer.KeyValueHelper writeHelper;
   private MapJoinBytesTableContainer.DirectKeyValueWriter directWriteHelper;
+  /*
+   * This is not a real bloom filter, but a cheap version of the one-memory-access
+   * bloom filters.
+   *
+   * In several cases we'll have map-join spills because the values are a few
+   * hundred Text columns each, while there are very few keys in total (a few
+   * thousand).
+   *
+   * This is a cheap exit option to prevent spilling the big-table in such a
+   * scenario.
+   */
+  private transient final BloomFilter bloom1;
 
   private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
@@ -239,11 +253,11 @@ public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryA
       throws SerDeException, IOException {
     this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
-        HiveConf.getFloatVar(hconf,HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
-        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ),
-        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
-        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
-        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
+        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ),
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
         HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
         estimatedTableSize, keyCount, memoryAvailable, nwayConf);
   }
@@ -296,7 +310,14 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
     // Cap WriteBufferSize to avoid large preallocations
     writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize, writeBufferSize);
-    LOG.info("Write buffer size: " + writeBufferSize);
+
+    this.bloom1 = new BloomFilter(newKeyCount);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info(String.format("Using a bloom-1 filter for %d keys of size %d bytes",
+          newKeyCount, bloom1.sizeInBytes()));
+      LOG.info("Write buffer size: " + writeBufferSize);
+    }
 
     hashPartitions = new HashPartition[numPartitions];
     int numPartitionsSpilledOnCreation = 0;
@@ -430,6 +451,8 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
     int partitionId = keyHash & (hashPartitions.length - 1);
     HashPartition hashPartition = hashPartitions[partitionId];
 
+    bloom1.addLong(keyHash);
+
     if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) {
       KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
       kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
@@ -806,6 +829,18 @@ public ReusableRowContainer() {
      */
     public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException {
       int keyHash = WriteBuffers.murmurHash(output.getData(), 0, output.getLength());
+
+      if (!bloom1.testLong(keyHash)) {
+        /*
+         * If the keyHash is missing in the bloom filter, then the key cannot
+         * exist in any of the spilled partitions - return NOMATCH.
+         */
+        dummyRow = null;
+        aliasFilter = (byte) 0xff;
+        hashMapResult.forget();
+        return JoinResult.NOMATCH;
+      }
+
       partitionId = keyHash & (hashPartitions.length - 1);
 
       // If the target hash table is on disk, spill this row to disk as well to be processed later
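For readers unfamiliar with the trick above, the following is a minimal standalone sketch (not Hive code) of the bloom-1 cheap-exit pattern the patch wires into the build path (internalPutRow) and the probe path (setFromOutput). Bloom1Sketch, CheapBloom, and probe are hypothetical names standing in for org.apache.hive.common.util.BloomFilter and JoinUtil.JoinResult; it assumes a single hash over a power-of-two bitset, which is what a one-memory-access ("bloom-1") filter amounts to.

import java.util.BitSet;

public class Bloom1Sketch {

  // Hypothetical local mirror of JoinUtil.JoinResult's three-way answer.
  enum JoinResult { MATCH, NOMATCH, SPILL }

  // Hypothetical stand-in for org.apache.hive.common.util.BloomFilter:
  // one hash, one bitset probe, so one memory access per test.
  static final class CheapBloom {
    private final BitSet bits;
    private final int mask;

    CheapBloom(int expectedKeys) {
      // ~16 bits per expected key, rounded to a power of two so the
      // hash can be masked instead of reduced modulo the size.
      long want = (long) Integer.highestOneBit(Math.max(expectedKeys, 1)) << 4;
      int size = (int) Math.min(1L << 27, Math.max(16L, want));
      this.bits = new BitSet(size);
      this.mask = size - 1;
    }

    void addLong(long hash) {               // build side: internalPutRow() analogue
      bits.set((int) hash & mask);
    }

    boolean testLong(long hash) {           // probe side: false positives possible,
      return bits.get((int) hash & mask);   // false negatives impossible
    }
  }

  // Probe-side use, mirroring ReusableRowContainer.setFromOutput(): a negative
  // answer is definitive, so the big-table row can be dropped immediately
  // instead of being spilled alongside the on-disk partitions.
  static JoinResult probe(CheapBloom bloom1, long keyHash) {
    if (!bloom1.testLong(keyHash)) {
      return JoinResult.NOMATCH;            // cheap exit: key was never inserted
    }
    // fall through to the real partition lookup, which may still SPILL
    return JoinResult.MATCH;
  }

  public static void main(String[] args) {
    CheapBloom bloom1 = new CheapBloom(4096);
    bloom1.addLong(42L);                    // build-side insert
    System.out.println(probe(bloom1, 42L)); // MATCH
    System.out.println(probe(bloom1, 999L)); // NOMATCH
  }
}

The property the patch relies on is the asymmetry of testLong(): a false positive is harmless (the row simply takes the normal lookup-or-spill path), while the guaranteed absence of false negatives makes the early NOMATCH safe without ever touching the spilled partitions.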