diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f3e2168..2715349 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2154,7 +2154,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVE_JAR_DIRECTORY("hive.jar.directory", null,
         "This is the location hive in tez mode will look for to find a site wide \n" +
         "installed hive instance."),
-    HIVE_USER_INSTALL_DIR("hive.user.install.directory", "hdfs:///user/",
+    HIVE_USER_INSTALL_DIR("hive.user.install.directory", "/tmp/",
         "If hive (in tez mode only) cannot find a usable hive jar in \"hive.jar.directory\", \n" +
         "it will upload the hive jar to \"hive.user.install.directory/user.name\"\n" +
         "and use it to run queries."),
diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index aab3aab..648780e 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -908,6 +908,11 @@ public String cliInit(String tname, boolean recreate) throws Exception {
     HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
         "org.apache.hadoop.hive.ql.security.DummyAuthenticator");
     Utilities.clearWorkMap();
+    conf.setBoolean("tez.local.mode", true);
+    conf.set("fs.defaultFS", "file:///");
+    conf.setBoolean("tez.runtime.optimize.local.fetch", true);
+    conf.set("tez.staging-dir", "/tmp");
+    conf.setBoolean("tez.ignore.lib.uris", true);
     CliSessionState ss = createSessionState();
     assert ss != null;
     ss.in = System.in;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
index 0aaa51a..3964b7e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
@@ -55,7 +55,8 @@
   public static enum JoinResult {
     MATCH,   // A match is found
     NOMATCH, // No match is found, and the current row will be dropped
-    SPILL    // The current row has been spilled to disk, as the join is postponed
+    SPILL    // The current row has been spilled to disk, as the join is postponed
+    //FILTERED // The row is ruled out by the BloomFilter
   }
 
   public static List[] getObjectInspectorsFromEvaluators(
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 02d61eb..61377b4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -380,6 +380,9 @@ public void process(Object row, int tag) throws HiveException {
         joinResult = adaptor.setFromOther(firstSetKey);
       }
       MapJoinRowContainer rowContainer = adaptor.getCurrentRows();
+      if (joinResult != JoinUtil.JoinResult.MATCH) {
+        assert (rowContainer == null || !rowContainer.hasRows()) : "Expecting an empty result set for no match";
+      }
       if (rowContainer != null && unwrapContainer[pos] != null) {
         Object[] currentKey = firstSetKey.getCurrentKey();
         rowContainer = unwrapContainer[pos].setInternal(rowContainer, currentKey);
@@ -388,9 +391,9 @@ public void process(Object row, int tag) throws HiveException {
       if (rowContainer == null || firstSetKey.hasAnyNulls(fieldCount, nullsafes)) {
         if (!noOuterJoin) {
           // For Hybrid Grace Hash Join, during the 1st round processing,
-          // we only keep the LEFT side if the row is not spilled
-          if (!conf.isHybridHashJoin() || hybridMapJoinLeftover
-              || (!hybridMapJoinLeftover && joinResult != JoinUtil.JoinResult.SPILL)) {
+          // we only keep the LEFT side if the row is neither spilled nor filtered out by the BloomFilter
+          if (!conf.isHybridHashJoin() || hybridMapJoinLeftover ||
+              (joinResult != JoinUtil.JoinResult.SPILL)) {
             joinNeeded = true;
             storage[pos] = dummyObjVectors[pos];
           }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index 52c02ae..625ba39 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.JoinUtil.JoinResult;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
@@ -55,6 +56,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hive.common.util.BloomFilter;
 
 import com.esotericsoftware.kryo.Kryo;
 
@@ -90,6 +92,18 @@
   private boolean[] sortableSortOrders;
   private MapJoinBytesTableContainer.KeyValueHelper writeHelper;
   private MapJoinBytesTableContainer.DirectKeyValueWriter directWriteHelper;
+  /*
+   * This is not a real bloom filter, but a cheap approximation of the
+   * one-memory-access ("bloom-1") filters.
+   *
+   * In several cases, a map-join spills because each value row carries a few
+   * hundred Text columns, while there are very few keys in total (a few
+   * thousand).
+   *
+   * This provides a cheap early exit that avoids spilling big-table rows in
+   * such a scenario (see the sketch after this diff).
+   */
+  private final transient BloomFilter bloom1;
 
   private final List EMPTY_LIST = new ArrayList(0);
 
@@ -239,11 +253,11 @@ public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryA
       throws SerDeException, IOException {
     this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
-        HiveConf.getFloatVar(hconf,HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
-        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ),
-        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
-        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
-        HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
+        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ),
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
         HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
         estimatedTableSize, keyCount, memoryAvailable, nwayConf);
   }
@@ -296,7 +310,14 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
 
     // Cap WriteBufferSize to avoid large preallocations
     writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize, writeBufferSize);
-    LOG.info("Write buffer size: " + writeBufferSize);
+
+    this.bloom1 = new BloomFilter(newKeyCount);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info(String.format("Using a bloom-1 filter for %d keys of size %d bytes",
+          newKeyCount, bloom1.sizeInBytes()));
+      LOG.info("Write buffer size: " + writeBufferSize);
+    }
 
     hashPartitions = new HashPartition[numPartitions];
     int numPartitionsSpilledOnCreation = 0;
@@ -430,6 +451,8 @@ private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
     int partitionId = keyHash & (hashPartitions.length - 1);
     HashPartition hashPartition = hashPartitions[partitionId];
 
+    bloom1.addLong(keyHash);
+
     if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) {
       KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
       kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
@@ -806,6 +829,18 @@ public ReusableRowContainer() {
      */
     public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException {
       int keyHash = WriteBuffers.murmurHash(output.getData(), 0, output.getLength());
+
+      if (!bloom1.testLong(keyHash)) {
+        /*
+         * If the keyHash is missing in the bloom filter, then the key cannot
+         * exist in any partition, spilled or in-memory - return NOMATCH.
+         */
+        dummyRow = null;
+        aliasFilter = (byte) 0xff;
+        hashMapResult.forget();
+        return JoinResult.NOMATCH;
+      }
+
       partitionId = keyHash & (hashPartitions.length - 1);
 
       // If the target hash table is on disk, spill this row to disk as well to be processed later
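The "bloom-1" the HybridHashTableContainer comment refers to is a Bloom filter whose probe bits all land in a single 64-bit word, so a membership test costs one memory access. The patch reuses org.apache.hive.common.util.BloomFilter rather than implementing that layout itself; the sketch below is only an illustration of the idea, and every name in it (Bloom1Sketch, addHash, mightContain) is hypothetical.

import java.util.Arrays;

// Illustrative only: a one-memory-access ("bloom-1") filter sketch.
// Not part of the patch, which uses org.apache.hive.common.util.BloomFilter.
final class Bloom1Sketch {
  private final long[] words;  // each test touches exactly one of these words
  private final int wordMask;  // words.length is a power of two

  Bloom1Sketch(int expectedKeys) {
    // roughly 8 keys per 64-bit word, rounded up to a power of two
    int target = Math.max(8, expectedKeys / 8);
    int nWords = Integer.highestOneBit(target);
    if (nWords < target) {
      nWords <<= 1;
    }
    this.words = new long[nWords];
    this.wordMask = nWords - 1;
  }

  // Derive one word index and two bit positions from a single hash value,
  // then set both bits within that one word.
  void addHash(long hash) {
    int w = (int) (hash >>> 32) & wordMask;
    words[w] |= (1L << (hash & 63)) | (1L << ((hash >>> 8) & 63));
  }

  // False positives are possible; false negatives are not, which is what
  // makes an early NOMATCH return on a negative test safe.
  boolean mightContain(long hash) {
    int w = (int) (hash >>> 32) & wordMask;
    long bits = (1L << (hash & 63)) | (1L << ((hash >>> 8) & 63));
    return (words[w] & bits) == bits;
  }

  void clear() {
    Arrays.fill(words, 0L);
  }
}

Because internalPutRow() and setFromOutput() feed the same 32-bit murmur keyHash to the filter, a negative test in setFromOutput() proves the key was never inserted into any partition, in-memory or spilled, so the big-table row can be dropped instead of spilled.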
diff --git shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index c08e76d..9b97966 100644
--- shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -373,6 +373,14 @@ public void setupConfiguration(Configuration conf) {
   @Override
   public MiniMrShim getMiniTezCluster(Configuration conf, int numberOfTaskTrackers,
       String nameNode, int numDir) throws IOException {
+    if (true) {
+      conf.setBoolean("tez.local.mode", true);
+      conf.set("fs.defaultFS", "file:///");
+      conf.setBoolean("tez.runtime.optimize.local.fetch", true);
+      conf.set("tez.staging-dir", "/tmp");
+      conf.setBoolean("tez.ignore.lib.uris", true);
+      return null;
+    }
     return new MiniTezShim(conf, numberOfTaskTrackers, nameNode, numDir);
   }
 
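The early exit in setFromOutput() is only correct because Bloom filters can produce false positives but never false negatives: every hash passed to addLong() is guaranteed to pass testLong(). The standalone class below is a hypothetical demo of that contract against the BloomFilter API the patch uses (addLong, testLong, sizeInBytes); it is not part of the patch.

import org.apache.hive.common.util.BloomFilter;

// Hypothetical demo (not part of the patch): a negative testLong() is a
// definite miss, so answering NOMATCH without probing any partition is safe.
public class BloomGateDemo {
  public static void main(String[] args) {
    BloomFilter bloom1 = new BloomFilter(10000);  // expected entry count
    for (long k = 0; k < 10000; k += 2) {
      bloom1.addLong(k);                          // build side: even keys only
    }
    long probed = 0, filtered = 0;
    for (long k = 0; k < 10000; k++) {            // probe side: all keys
      if (bloom1.testLong(k)) {
        probed++;      // possible match (or false positive): probe the hash table
      } else {
        if (k % 2 == 0) {
          throw new AssertionError("bloom filters must not produce false negatives");
        }
        filtered++;    // definite miss: NOMATCH, and the row is never spilled
      }
    }
    System.out.println("probed=" + probed + ", filtered=" + filtered
        + ", filterBytes=" + bloom1.sizeInBytes());
  }
}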