diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5db6bea..7694d8f 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -710,6 +710,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true, "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" + "because memory-optimized hashtable cannot be serialized."), + HIVEMAPJOINUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", false, "Whether to use " + + "hybrid grace hash join as the join method for mapjoin."), HIVEHASHTABLEWBSIZE("hive.mapjoin.optimized.hashtable.wbsize", 10 * 1024 * 1024, "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to\n" + "store data. This is one buffer size. HT may be slightly faster if this is larger, but for small\n" + diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 91dcc03..3f304ba 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -287,6 +287,7 @@ minitez.query.files=bucket_map_join_tez1.q,\ bucket_map_join_tez2.q,\ dynamic_partition_pruning.q,\ dynamic_partition_pruning_2.q,\ + hybridhashjoin.q,\ mapjoin_decimal.q,\ lvj_mapjoin.q, \ mrr.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java index 4013b7f..fc2b5d2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java @@ -48,6 +48,12 @@ public class JoinUtil { + public static enum JoinResult { + MATCH, // a match is found + NOMATCH, // no match is found, and the current row will be dropped + SPILL // no match is found, and the current row has been spilled to disk + } + public static List[] getObjectInspectorsFromEvaluators( List[] exprEntries, ObjectInspector[] inputObjInspector, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 30731b3..bba2db9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -18,26 +18,25 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; +import com.esotericsoftware.kryo.Kryo; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.*; import 
org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; -import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer; +import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; @@ -47,8 +46,13 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; +import static org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition; +import static org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper; + /** * Map side Join operator implementation. */ @@ -70,6 +74,13 @@ private transient ReusableGetAdaptor[] hashMapRowGetters; private UnwrapRowContainer[] unwrapContainer; + private transient Configuration hconf; + private transient boolean useHybridGraceHashJoin; // whether Hybrid Grace Hash Join is enabled + private transient boolean hybridMapJoinLeftover; // indicates there's on-disk data to be processed in another round + private transient HashPartition currentPartition; + private transient MapJoinBytesTableContainer currentSmallTable; + private transient int tag; + private transient int smallTable; public MapJoinOperator() { } @@ -95,6 +106,7 @@ public void startGroup() throws HiveException { @Override protected void initializeOp(Configuration hconf) throws HiveException { + this.hconf = hconf; unwrapContainer = new UnwrapRowContainer[conf.getTagLength()]; super.initializeOp(hconf); @@ -126,7 +138,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { * also ability to schedule tasks to re-use containers that have cached the specific bucket. */ if (isLogInfoEnabled) { - LOG.info("This is not bucket map join, so cache"); + LOG.info("This is not bucket map join, so cache"); } Pair pair = @@ -150,6 +162,9 @@ public Object call() throws HiveException { this.getExecContext().setLastInputPath(null); this.getExecContext().setCurrentInputPath(null); } + + useHybridGraceHashJoin = + HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEHYBRIDGRACEHASHJOIN); } @Override @@ -242,9 +257,9 @@ public void cleanUpInputFileChangedOp() throws HiveException { loadHashTable(); } - protected void setMapJoinKey( + protected JoinUtil.JoinResult setMapJoinKey( ReusableGetAdaptor dest, Object row, byte alias) throws HiveException { - dest.setFromRow(row, joinKeys[alias], joinKeysObjectInspectors[alias]); + return dest.setFromRow(row, joinKeys[alias], joinKeysObjectInspectors[alias]); } protected MapJoinKey getRefKey(byte alias) { @@ -261,6 +276,15 @@ protected MapJoinKey getRefKey(byte alias) { @Override public void processOp(Object row, int tag) throws HiveException { + this.tag = tag; + + // As we're calling processOp again to process the leftover triplets, we know the "row" is + // coming from the on-disk matchfile. We need to recreate hashMapRowGetter against new hashtable. 
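Reviewer sketch: the adaptor setters (setMapJoinKey, setFromRow, setFromOther) now report a three-way JoinUtil.JoinResult instead of returning void. The class and method names below (KeyProbeDemo, probeKey) are illustrative only, not Hive APIs; they just show how such a result is consumed.

import java.util.Collections;
import java.util.Map;

public class KeyProbeDemo {
  enum JoinResult { MATCH, NOMATCH, SPILL }

  static JoinResult probeKey(Map<String, String> smallTable, String key, boolean partitionOnDisk) {
    if (partitionOnDisk) {
      return JoinResult.SPILL;                       // keep the row, join it in a later pass
    }
    return smallTable.containsKey(key) ? JoinResult.MATCH : JoinResult.NOMATCH;
  }

  public static void main(String[] args) {
    Map<String, String> smallTable = Collections.singletonMap("k1", "v1");
    for (String key : new String[] {"k1", "k2"}) {
      switch (probeKey(smallTable, key, false)) {
        case MATCH:   System.out.println(key + ": emit joined row");            break;
        case NOMATCH: System.out.println(key + ": drop row (inner join case)"); break;
        case SPILL:   System.out.println(key + ": buffer row for second pass"); break;
      }
    }
  }
}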
+ if (hybridMapJoinLeftover) { + MapJoinKey refKey = getRefKey((byte)tag); + hashMapRowGetters[smallTable] = currentSmallTable.createGetter(refKey); + } + try { alias = (byte) tag; if (hashMapRowGetters == null) { @@ -279,14 +303,16 @@ public void processOp(Object row, int tag) throws HiveException { boolean joinNeeded = false; for (byte pos = 0; pos < order.length; pos++) { if (pos != alias) { + smallTable = pos; + JoinUtil.JoinResult joinResult; ReusableGetAdaptor adaptor; if (firstSetKey == null) { adaptor = firstSetKey = hashMapRowGetters[pos]; - setMapJoinKey(firstSetKey, row, alias); + joinResult = setMapJoinKey(firstSetKey, row, alias); } else { // Keys for all tables are the same, so only the first has to deserialize them. adaptor = hashMapRowGetters[pos]; - adaptor.setFromOther(firstSetKey); + joinResult = adaptor.setFromOther(firstSetKey); } MapJoinRowContainer rowContainer = adaptor.getCurrentRows(); if (rowContainer != null && unwrapContainer[pos] != null) { @@ -296,8 +322,13 @@ public void processOp(Object row, int tag) throws HiveException { // there is no join-value or join-key has all null elements if (rowContainer == null || firstSetKey.hasAnyNulls(fieldCount, nullsafes)) { if (!noOuterJoin) { - joinNeeded = true; - storage[pos] = dummyObjVectors[pos]; + // For Hybrid Grace Hash Join, during the 1st round processing, + // we only keep the LEFT side if the row is not spilled + if (!useHybridGraceHashJoin || hybridMapJoinLeftover || + (!hybridMapJoinLeftover && joinResult != JoinUtil.JoinResult.SPILL)) { + joinNeeded = true; + storage[pos] = dummyObjVectors[pos]; + } } else { storage[pos] = emptyList; } @@ -306,6 +337,20 @@ public void processOp(Object row, int tag) throws HiveException { storage[pos] = rowContainer.copy(); aliasFilterTags[pos] = rowContainer.getAliasFilter(); } + // Spill the big table rows into appropriate partition + if (joinResult == JoinUtil.JoinResult.SPILL) { + HybridHashTableContainer hybridHashTableContainer = + (HybridHashTableContainer) mapJoinTables[pos]; + int partitionId = hybridHashTableContainer.getToSpillPartitionId(); + HashPartition hp = hybridHashTableContainer.getHashPartitions()[partitionId]; + + ObjectContainer bigTable = hp.getMatchfileObjContainer(); + // Use Kryo to make a deep copy of original row + Kryo kryo = Utilities.runtimeSerializationKryo.get(); + kryo.getDefaultSerializer(row.getClass()); + Object rowCopy = kryo.copy(row); + bigTable.add(rowCopy); + } } } if (joinNeeded) { @@ -334,6 +379,42 @@ public void closeOp(boolean abort) throws HiveException { for (MapJoinTableContainer tableContainer : mapJoinTables) { if (tableContainer != null) { tableContainer.dumpMetrics(); + + if (tableContainer instanceof HybridHashTableContainer) { + HybridHashTableContainer hybridHashTableContainer = (HybridHashTableContainer) tableContainer; + hybridHashTableContainer.dumpStats(); + + HashPartition[] hashPartitions = hybridHashTableContainer.getHashPartitions(); + + // Clear all in memory partitions first + for (int i = 0; i < hashPartitions.length; i++) { + if (!hashPartitions[i].getHashMapOnDisk()) { + hybridHashTableContainer.setTotalInMemRowCount( + hybridHashTableContainer.getTotalInMemRowCount() - + hashPartitions[i].getInMemRowCount()); + hashPartitions[i].getHashMapFromMemory().clear(); + } + } + assert hybridHashTableContainer.getTotalInMemRowCount() == 0; + + for (int i = 0; i < hashPartitions.length; i++) { + if (hashPartitions[i].getHashMapOnDisk()) { + // Recursively process on-disk triplets (hash partition, sidefile, 
matchfile) + try { + hybridMapJoinLeftover = true; + continueProcessOp(hashPartitions[i], hybridHashTableContainer); + } catch (IOException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } catch (SerDeException e) { + e.printStackTrace(); + } + } + currentPartition = null; + currentSmallTable = null; + } + } } } if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) @@ -350,6 +431,64 @@ public void closeOp(boolean abort) throws HiveException { } /** + * This will call processOp again to deal with the leftover partitions/rowcontainers on disk + */ + private void continueProcessOp(HashPartition partition, HybridHashTableContainer hybridHashTableContainer) + throws HiveException, IOException, ClassNotFoundException, SerDeException { + reloadHashTable(partition, hybridHashTableContainer); + // Iterate thru the on-disk matchfile, and feed processOp with leftover rows + ObjectContainer bigTable = partition.getMatchfileObjContainer(); + while (bigTable.hasNext()) { + Object row = bigTable.next(); + processOp(row, tag); + } + bigTable.clear(); + } + + /** + * Create a new in-memory hashtable by: + * a) deserializing an on-disk hash table, and + * b) merging an on-disk row container + */ + private void reloadHashTable(HashPartition partition, HybridHashTableContainer hybridHashTableContainer) + throws IOException, ClassNotFoundException, HiveException, SerDeException { + // Deserialize the on-disk hash table + // We're sure this part is smaller than memory limit + BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(); + int keyCount = restoredHashMap.size(); + + // Merge the sidefile into the newly created hash table + // This is where the spilling may happen again + KeyValueContainer kvContainer = partition.getSidefileKVContainer(); + keyCount += kvContainer.size(); + // If based on the new key count, keyCount is smaller than a threshold, + // then just load the entire restored hashmap into memory. + // The size of deserialized partition shouldn't exceed half of memory limit + long totalOrigMemUsed = hybridHashTableContainer.getHashPartitions().length * partition.getWbSize(); + assert keyCount * hybridHashTableContainer.getTableRowSize() < totalOrigMemUsed / 2 : + "Hash table cannot be reloaded since it will be greater than memory limit and recursive" + + " spilling is currently not supported"; + + KeyValueHelper writeHelper = hybridHashTableContainer.getWriteHelper(); + while (kvContainer.hasNext()) { + ObjectPair pair = kvContainer.next(); + Writable key = pair.getFirst(); + Writable val = pair.getSecond(); + writeHelper.setKeyValue(key, val); + restoredHashMap.put(writeHelper, -1); + } + + hybridHashTableContainer.setTotalInMemRowCount(hybridHashTableContainer.getTotalInMemRowCount() + + restoredHashMap.size() + kvContainer.size()); + kvContainer.clear(); + + // Since there's only one hashmap to deal with, it's OK to create a MapJoinBytesTableContainer + currentSmallTable = new MapJoinBytesTableContainer(restoredHashMap); + currentSmallTable.setInternalValueOi(hybridHashTableContainer.getInternalValueOi()); + currentSmallTable.setSortableSortOrders(hybridHashTableContainer.getSortableSortOrders()); + } + + /** * Implements the getName function for the Node Interface. 
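Reviewer sketch of the second pass that continueProcessOp()/reloadHashTable() implement: reload the spilled hash map, fold the sidefile into it, then replay the buffered big-table rows. The container types below are simplified stand-ins, not the Hive classes.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SecondPassSketch {
  // Stand-in for one spilled hash partition and its two on-disk side structures.
  static class SpilledPartition {
    Map<String, List<String>> reloadedHashMap = new HashMap<>(); // small-table rows restored from disk
    List<String[]> sidefile = new ArrayList<>();                 // small-table rows that arrived after the spill
    List<String> matchfile = new ArrayList<>();                  // big-table rows routed to this partition
  }

  static void reprocess(SpilledPartition p) {
    // 1. Rebuild the in-memory table: restored hash map plus the merged sidefile.
    for (String[] kv : p.sidefile) {
      p.reloadedHashMap.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
    }
    // 2. Replay the buffered big-table rows against the rebuilt table.
    for (String bigKey : p.matchfile) {
      for (String smallValue : p.reloadedHashMap.getOrDefault(bigKey, Collections.emptyList())) {
        System.out.println("joined: " + bigKey + " x " + smallValue);
      }
    }
  }

  public static void main(String[] args) {
    SpilledPartition p = new SpilledPartition();
    p.reloadedHashMap.put("a", new ArrayList<>(Collections.singletonList("v1")));
    p.sidefile.add(new String[] {"b", "v2"});
    p.matchfile.add("a");
    p.matchfile.add("b");
    reprocess(p);
  }
}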
* * @return the name of the operator diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 8d3e3cc..e02c115 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -18,6 +18,12 @@ package org.apache.hadoop.hive.ql.exec.persistence; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -45,7 +51,8 @@ * Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost completely reworked * and there's very little in common left save for quadratic probing (and that with some changes). */ -public final class BytesBytesMultiHashMap { +public final class BytesBytesMultiHashMap implements Serializable { + private static final long serialVersionUID = 1L; public static final Log LOG = LogFactory.getLog(BytesBytesMultiHashMap.class); /* @@ -120,7 +127,7 @@ */ private WriteBuffers writeBuffers; - private final float loadFactor; + private float loadFactor; private int resizeThreshold; private int keysAssigned; @@ -154,7 +161,7 @@ private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024; public BytesBytesMultiHashMap(int initialCapacity, - float loadFactor, int wbSize, long memUsage, int defaultCapacity) { + float loadFactor, int wbSize, long memUsage) { if (loadFactor < 0 || loadFactor > 1) { throw new AssertionError("Load factor must be between (0, 1]."); } @@ -180,7 +187,7 @@ public BytesBytesMultiHashMap(int initialCapacity, @VisibleForTesting BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) { - this(initialCapacity, loadFactor, wbSize, -1, 100000); + this(initialCapacity, loadFactor, wbSize, -1); } /** The source of keys and values to put into hashtable; avoids byte copying. */ @@ -204,7 +211,7 @@ public BytesBytesMultiHashMap(int initialCapacity, * @param kv Keyvalue writer. Each method will be called at most once. */ private static final byte[] FOUR_ZEROES = new byte[] { 0, 0, 0, 0 }; - public void put(KvSource kv) throws SerDeException { + public void put(KvSource kv, int keyHashCode) throws SerDeException { if (resizeThreshold <= keysAssigned) { expandAndRehash(); } @@ -218,7 +225,7 @@ public void put(KvSource kv) throws SerDeException { kv.writeKey(writeBuffers); int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset); - int hashCode = writeBuffers.hashCode(keyOffset, keyLength); + int hashCode = (keyHashCode == -1) ? 
writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode; int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode); // LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot); @@ -301,10 +308,24 @@ public void populateValue(WriteBuffers.ByteSegmentRef valueRef) { writeBuffers.populateValue(valueRef); } + /** + * Number of keys in the hashmap + * @return number of keys + */ public int size() { return keysAssigned; } + /** + * Number of bytes used by the hashmap + * There are two main components that take most memory: writeBuffers and refs + * Others include instance fields: 100 + * @return number of bytes + */ + public long memorySize() { + return writeBuffers.size() + refs.length * 8 + 100; + } + public void seal() { writeBuffers.seal(); } @@ -750,4 +771,22 @@ private void debugDumpKeyProbe(long keyOffset, int keyLength, int hashCode, int } LOG.info(sb.toString()); } + + + public static void serialize(OutputStream fos, BytesBytesMultiHashMap hashmap) + throws IOException { + ObjectOutputStream oos = new ObjectOutputStream(fos); + oos.writeObject(hashmap); + oos.close(); + fos.close(); + } + + public static BytesBytesMultiHashMap deserialize(InputStream fis) + throws IOException, ClassNotFoundException { + ObjectInputStream ois = new ObjectInputStream(fis); + BytesBytesMultiHashMap hashmap = (BytesBytesMultiHashMap) ois.readObject(); + ois.close(); + fis.close(); + return hashmap; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index 3adaab7..7ca7897 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -153,7 +154,7 @@ public GetAdaptor(MapJoinKey key) { } @Override - public void setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, + public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { if (currentKey == null) { currentKey = new Object[keyOutputWriters.length]; @@ -168,11 +169,17 @@ public void setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyO key = MapJoinKey.readFromVector(output, key, currentKey, vectorKeyOIs, !isFirstKey); isFirstKey = false; this.currentValue = mHash.get(key); + if (this.currentValue == null) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } @Override - public void setFromRow(Object row, List fields, - List ois) throws HiveException { + public JoinUtil.JoinResult setFromRow(Object row, List fields, List ois) + throws HiveException { if (currentKey == null) { currentKey = new Object[fields.size()]; } @@ -182,15 +189,27 @@ public void setFromRow(Object row, List fields, key = MapJoinKey.readFromRow(output, key, currentKey, ois, !isFirstKey); isFirstKey = false; this.currentValue = mHash.get(key); + if (this.currentValue == null) { + return JoinUtil.JoinResult.NOMATCH; + } + 
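Reviewer sketch: the new keyHashCode parameter lets HybridHashTableContainer compute the key hash once and reuse it both for picking a partition and as the bucket hash inside that partition's map; -1 falls back to hashing the serialized key bytes. The code below only illustrates the idea, with hashBytes standing in for WriteBuffers.murmurHash.

public class SharedHashSketch {
  static int hashBytes(byte[] key) {                // stand-in for WriteBuffers.murmurHash
    int h = 0;
    for (byte b : key) {
      h = h * 31 + b;
    }
    return h;
  }

  public static void main(String[] args) {
    byte[] serializedKey = "some-key".getBytes();
    int numPartitions = 8;                          // always a power of two in the container
    int keyHash = hashBytes(serializedKey);         // computed exactly once per row
    int partitionId = keyHash & (numPartitions - 1);
    System.out.println("route to partition " + partitionId
        + ", then reuse hash " + keyHash + " for the hash map insert");
  }
}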
else { + return JoinUtil.JoinResult.MATCH; + } } @Override - public void setFromOther(ReusableGetAdaptor other) { + public JoinUtil.JoinResult setFromOther(ReusableGetAdaptor other) { assert other instanceof GetAdaptor; GetAdaptor other2 = (GetAdaptor)other; this.key = other2.key; this.isFirstKey = other2.isFirstKey; this.currentValue = mHash.get(key); + if (this.currentValue == null) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } @Override @@ -223,4 +242,9 @@ public MapJoinKey getAnyKey() { public void dumpMetrics() { // Nothing to do. } + + @Override + public boolean hasSpill() { + return false; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java new file mode 100644 index 0000000..0d0a247 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -0,0 +1,720 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.persistence; + + +import com.esotericsoftware.kryo.Kryo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; +import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.WriteBuffers; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; + +/** + * Table container that serializes keys and values using LazyBinarySerDe into + * BytesBytesMultiHashMap, with very low memory overhead. However, + * there may be some perf overhead when retrieving rows. + */ +public class HybridHashTableContainer implements MapJoinTableContainer { + private static final Log LOG = LogFactory.getLog(MapJoinTableContainer.class); + private final Configuration hconf; + + /** + * This class encapsulates the triplet together since they are closely related to each other + */ + public static class HashPartition { + BytesBytesMultiHashMap hashMap; // In memory hashMap + int inMemRowCount; // Number of rows that reside in memory + KeyValueContainer sidefileKVContainer; // Stores small table key/value pairs + ObjectContainer matchfileObjContainer; // Stores big table rows + Path hashMapLocalPath; // Local file system path for spilled hashMap + boolean hashMapOnDisk; // Status of hashMap. 
true: on disk, false: in memory + boolean hashMapSpilledOnCreation; // When there's not enough memory, we cannot create hashMap + int partitionThreshold; // This info is recorded and to be used in getHashMapFromDisk + float loadFactor; // Same as above + int wbSize; // Same as above + + public HashPartition(int partitionThreshold, float loadFactor, int wbSize, long memUsage) { + if (memUsage < wbSize) { + hashMapSpilledOnCreation = true; + hashMapOnDisk = true; + this.partitionThreshold = partitionThreshold; + this.loadFactor = loadFactor; + this.wbSize = wbSize; + } else { + hashMap = + new BytesBytesMultiHashMap(partitionThreshold, loadFactor, wbSize, memUsage); + } + } + + public HashPartition(BytesBytesMultiHashMap hashMap) { + this.hashMap = hashMap; + } + + public BytesBytesMultiHashMap getHashMapFromMemory() { + return hashMap; + } + + public BytesBytesMultiHashMap getHashMapFromDisk() + throws IOException, ClassNotFoundException { + if (hashMapSpilledOnCreation) { + return new BytesBytesMultiHashMap(partitionThreshold, loadFactor, wbSize, -1); + } else { + InputStream inputStream = Files.newInputStream(hashMapLocalPath); + com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream); + Kryo kryo = Utilities.runtimeSerializationKryo.get(); + BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class); + input.close(); + inputStream.close(); + Files.delete(hashMapLocalPath); + return restoredHashMap; + } + } + + public KeyValueContainer getSidefileKVContainer() { + if (sidefileKVContainer == null) { + sidefileKVContainer = new KeyValueContainer(); + } + return sidefileKVContainer; + } + + public ObjectContainer getMatchfileObjContainer() { + if (matchfileObjContainer == null) { + matchfileObjContainer = new ObjectContainer(); + } + return matchfileObjContainer; + } + + public boolean getHashMapOnDisk() { + return hashMapOnDisk; + } + + public int getInMemRowCount() { + return inMemRowCount; + } + + public int getWbSize() { + return wbSize; + } + } + + private HashPartition[] hashPartitions; // an array of partitions holding all small table rows + private int totalInMemRowCount = 0; // total number of rows in memory + private long memoryThreshold; // the max memory limit allocated + private long tableRowSize; // row size of the small table + private boolean isSpilled; // whether there's any spilled partition + private int toSpillPartitionId; // the partition into which to spill the big table row + + /** The OI used to deserialize values. We never deserialize keys. */ + private LazyBinaryStructObjectInspector internalValueOi; + private boolean[] sortableSortOrders; + private MapJoinBytesTableContainer.KeyValueHelper writeHelper; + + private final List EMPTY_LIST = new ArrayList(0); + + // How often to check if memory is full. Default is to check every 1024 rows. + // Note: this number MUST be power of 2. 
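Reviewer sketch: the power-of-two requirement on the check frequency exists so the periodic memory check in putRow() can use a bit mask instead of a modulo. Minimal illustration only.

public class CheckFrequencySketch {
  private static final int MEMORY_CHECK_FREQUENCY = 1024;   // must be a power of two

  public static void main(String[] args) {
    for (int totalInMemRowCount = 1; totalInMemRowCount <= 4096; totalInMemRowCount++) {
      // Equivalent to (totalInMemRowCount % MEMORY_CHECK_FREQUENCY == 0) but cheaper,
      // and only correct because MEMORY_CHECK_FREQUENCY is a power of two.
      if ((totalInMemRowCount & (MEMORY_CHECK_FREQUENCY - 1)) == 0) {
        System.out.println("check memory usage at row " + totalInMemRowCount);
      }
    }
  }
}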
+ private final static int MEMORY_CHECK_FREQUENCY = 1024; + + public HybridHashTableContainer(Configuration hconf, MapJoinObjectSerDeContext valCtx, + long keyCount, long memUsage, long tableSize) + throws SerDeException { + this(hconf, + HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD), + HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), + HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD), + tableSize, valCtx, keyCount, memUsage); + } + + private HybridHashTableContainer(Configuration hconf, float keyCountAdj, int threshold, + float loadFactor, int wbSize, long noConditionalTaskThreshold, + long tableSize, MapJoinObjectSerDeContext valCtx, + long keyCount, long memUsage) + throws SerDeException { + this.hconf = hconf; + this.memoryThreshold = noConditionalTaskThreshold; + this.tableRowSize = tableSize / keyCount; + + int numPartitions = calcNumPartitions(tableSize, wbSize); + hashPartitions = new HashPartition[numPartitions]; + + int keyCountPerPart = (int) (keyCount / numPartitions); + if (keyCountPerPart == 0) { + keyCountPerPart = (int) keyCount; + } + + int partitionThreshold = HashMapWrapper.calculateTableSize( + keyCountAdj, threshold, loadFactor, keyCountPerPart); + + long memoryUsed = 0; + int numPartitionsSpilledOnCreation = 0; + for (int i = 0; i < numPartitions; i++) { + hashPartitions[i] = + new HashPartition(partitionThreshold, loadFactor, wbSize, memoryThreshold - memoryUsed); + if (!isHashMapSpilledOnCreation(i)) { + memoryUsed += hashPartitions[i].hashMap.memorySize(); + } else { + numPartitionsSpilledOnCreation++; + } + } + + LOG.info("[Hybrid Grace Hash Join] Number of partitions spilled directly to disk on creation: " + + numPartitionsSpilledOnCreation); + } + + + public MapJoinBytesTableContainer.KeyValueHelper getWriteHelper() { + return writeHelper; + } + + public HashPartition[] getHashPartitions() { + return hashPartitions; + } + + public LazyBinaryStructObjectInspector getInternalValueOi() { + return internalValueOi; + } + + public boolean[] getSortableSortOrders() { + return sortableSortOrders; + } + + @SuppressWarnings("deprecation") + @Override + public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey, + MapJoinObjectSerDeContext valueContext, Writable currentValue) + throws SerDeException, HiveException, IOException { + SerDe keySerde = keyContext.getSerDe(), valSerde = valueContext.getSerDe(); + + if (writeHelper == null) { + LOG.info("Initializing container with " + + keySerde.getClass().getName() + " and " + valSerde.getClass().getName()); + + // We assume this hashtable is loaded only when tez is enabled + LazyBinaryStructObjectInspector valSoi = + (LazyBinaryStructObjectInspector) valSerde.getObjectInspector(); + writeHelper = new MapJoinBytesTableContainer.LazyBinaryKvWriter(keySerde, valSoi, valueContext.hasFilterTag()); + if (internalValueOi == null) { + internalValueOi = valSoi; + } + if (sortableSortOrders == null) { + sortableSortOrders = ((BinarySortableSerDe) keySerde).getSortOrders(); + } + } + writeHelper.setKeyValue(currentKey, currentValue); + + // Next, put row into corresponding hash partition + int keyHash = writeHelper.getHashFromKey(); + int partitionId = keyHash & (hashPartitions.length - 1); + HashPartition hashPartition = hashPartitions[partitionId]; + + if (isOnDisk(partitionId) || 
isHashMapSpilledOnCreation(partitionId)) { + KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer(); + kvContainer.add((HiveKey) WritableUtils.clone(currentKey, hconf), + (BytesWritable) WritableUtils.clone(currentValue, hconf)); + } else { + int sizeBefore = hashPartition.hashMap.size(); + hashPartition.hashMap.put(writeHelper, keyHash); // Pass along hashcode to avoid recalculation + int sizeAfter = hashPartition.hashMap.size(); + if (sizeBefore != sizeAfter) { + hashPartition.inMemRowCount += 1; + totalInMemRowCount += 1; + } + + if (totalInMemRowCount != 0 && (totalInMemRowCount & (MEMORY_CHECK_FREQUENCY - 1)) == 0) { + if (isMemoryFull()) { + spillPartition(biggestPartition()); + this.setSpill(true); + } + } + } + + return null; // there's no key to return + } + + /** + * Check if a specific hashmap is on disk + * @param partitionId hashmap ID + * @return true if on disk, false if in memory + */ + private boolean isOnDisk(int partitionId) { + return hashPartitions[partitionId].hashMapOnDisk; + } + + /** + * Check if a specific hashmap has been "spilled" to disk when it was created. + * In fact, in other words, check if a hashmap does exist or not. + * @param partitionId hashMap ID + * @return true if it was not created at all, false if there is a hashMap existing there + */ + private boolean isHashMapSpilledOnCreation(int partitionId) { + return hashPartitions[partitionId].hashMapSpilledOnCreation; + } + + /** + * Check if the memory threshold has been reached by hash tables + * @return true if memory is full, false if not + */ + private boolean isMemoryFull() { + long size = 0; + for (int i = 0; i < hashPartitions.length; i++) { + if (!isOnDisk(i) && !isHashMapSpilledOnCreation(i)) { + size += hashPartitions[i].hashMap.memorySize(); + } + } + LOG.info("[Hybrid Grace Hash Join] Memory used at this moment: " + size); + return size >= memoryThreshold; + } + + /** + * Returns the biggest hashmap in memory at this moment + * @return the biggest hashmap ID + */ + private int biggestPartition() { + int res = 0; + int maxSize = 0; + + // If a partition has been spilled to disk, its size will be 0, i.e. 
it won't be picked + for (int i = 0; i < hashPartitions.length; i++) { + int size; + if (isOnDisk(i) || isHashMapSpilledOnCreation(i)) { + continue; + } else { + size = hashPartitions[i].hashMap.size(); + } + if (size > maxSize) { + maxSize = size; + res = i; + } + } + return res; + } + + /** + * Move a hashtable from memory into local file system + * @param partitionId the hashtable to be moved + */ + private void spillPartition(int partitionId) throws IOException { + HashPartition partition = hashPartitions[partitionId]; + + Path path = Files.createTempFile("partition-" + partitionId + "-", null); + OutputStream outputStream = Files.newOutputStream(path); + + com.esotericsoftware.kryo.io.Output output = new com.esotericsoftware.kryo.io.Output(outputStream); + Kryo kryo = Utilities.runtimeSerializationKryo.get(); + kryo.writeObject(output, hashPartitions[partitionId].hashMap); // use Kryo to serialize hashmap + output.close(); + outputStream.close(); + + partition.hashMapLocalPath = path; + partition.hashMapOnDisk = true; + + long size = 0; + for (HashPartition hashPartition : hashPartitions) { + size += hashPartition.hashMap.memorySize(); + } + LOG.info("[Hybrid Grace Hash Join] Memory usage before spilling: " + size); + LOG.info("Hash partition " + partitionId + " (Rows: " + partition.inMemRowCount + ", Mem size: " + + partition.hashMap.memorySize() + ") is spilled to disk: " + path); + LOG.info("[Hybrid Grace Hash Join] Memory usage after spilling: " + + (size - partition.hashMap.memorySize())); + + totalInMemRowCount -= partition.inMemRowCount; + partition.inMemRowCount = 0; + partition.hashMap.clear(); + } + + /** + * Calculate how many partitions are needed. This is an estimation. + * @param dataSize total data size for the table + * @param wbSize write buffer size + * @return number of partitions needed + */ + private int calcNumPartitions(long dataSize, int wbSize) { + assert memoryThreshold >= wbSize : "Available memory is less than hashtable writebuffer size!" + + " Try increasing hive.auto.convert.join.noconditionaltask.size."; + + int lowerLimit = 2; + int numPartitions = (int) (dataSize / wbSize); + + if (numPartitions <= lowerLimit) { + return lowerLimit; + } else if (dataSize > memoryThreshold) { + numPartitions = (int) (memoryThreshold / wbSize); + } + // Make sure numPartitions is power of 2, to make N & (M - 1) easy when calculating partition No. + numPartitions = (Long.bitCount(numPartitions) == 1) ? 
numPartitions : Integer.highestOneBit(numPartitions) << 1; + while (dataSize / numPartitions > memoryThreshold) { + numPartitions *= 2; + } + + LOG.info("[Hybrid Grace Hash Join] Total available memory: " + memoryThreshold); + LOG.info("[Hybrid Grace Hash Join] Estimated data size: " + dataSize); + LOG.info("[Hybrid Grace Hash Join] Number of hash partitions to be created: " + numPartitions); + LOG.info("[Hybrid Grace Hash Join] Hash partition size: " + wbSize); + + return numPartitions; + } + + public int getTotalInMemRowCount() { + return totalInMemRowCount; + } + + public void setTotalInMemRowCount(int totalInMemRowCount) { + this.totalInMemRowCount = totalInMemRowCount; + } + + public long getTableRowSize() { + return tableRowSize; + } + + @Override + public boolean hasSpill() { + return isSpilled; + } + + public void setSpill(boolean isSpilled) { + this.isSpilled = isSpilled; + } + + /** + * Gets the partition Id into which to spill the big table row + * @return partition Id + */ + public int getToSpillPartitionId() { + return toSpillPartitionId; + } + + private void setToSpillPartitionId(int partitionId) { + this.toSpillPartitionId = partitionId; + } + + @Override + public void clear() { + for (HashPartition hp : hashPartitions) { + if (hp.hashMap != null) { + hp.hashMap.clear(); + } + } + } + + @Override + public MapJoinKey getAnyKey() { + return null; // This table has no keys. + } + + @Override + public ReusableGetAdaptor createGetter(MapJoinKey keyTypeFromLoader) { + if (keyTypeFromLoader != null) { + throw new AssertionError("No key expected from loader but got " + keyTypeFromLoader); + } + return new GetAdaptor(); + } + + @Override + public void seal() { + for (HashPartition hp : hashPartitions) { + // Only seal those partitions that haven't been spilled and cleared, + // because once a hashMap is cleared, it will become unusable + if (hp.hashMap != null && hp.hashMap.size() != 0) { + hp.hashMap.seal(); + } + } + } + + /** Implementation of ReusableGetAdaptor that has Output for key serialization; row + * container is also created once and reused for every row. 
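Reviewer sketch: a condensed, stand-alone version of the calcNumPartitions() estimate above, start from dataSize / wbSize, clamp to at least two, bound by available memory, then round up to a power of two so a partition can be picked with hash & (numPartitions - 1). It mirrors the patch's logic rather than calling into Hive; the sample sizes in main() are made up.

public class PartitionCountSketch {
  static int estimateNumPartitions(long dataSize, int wbSize, long memoryThreshold) {
    int numPartitions = (int) (dataSize / wbSize);
    if (numPartitions <= 2) {
      return 2;                                            // lower bound
    }
    if (dataSize > memoryThreshold) {
      numPartitions = (int) (memoryThreshold / wbSize);    // cannot hold more than memory allows
    }
    // Round up to a power of two so partition selection stays a bit mask.
    if (Integer.bitCount(numPartitions) != 1) {
      numPartitions = Integer.highestOneBit(numPartitions) << 1;
    }
    // Grow further if a single partition would still not fit in memory when reloaded.
    while (dataSize / numPartitions > memoryThreshold) {
      numPartitions *= 2;
    }
    return numPartitions;
  }

  public static void main(String[] args) {
    long dataSize = 3L * 1024 * 1024 * 1024;     // 3 GB small-table estimate
    int wbSize = 10 * 1024 * 1024;               // 10 MB write buffer per partition
    long memoryThreshold = 1L * 1024 * 1024 * 1024;
    System.out.println(estimateNumPartitions(dataSize, wbSize, memoryThreshold));
  }
}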
*/ + private class GetAdaptor implements ReusableGetAdaptor { + + private Object[] currentKey; + private boolean[] nulls; + private List vectorKeyOIs; + + private final ReusableRowContainer currentValue; + private final Output output; + + public GetAdaptor() { + currentValue = new ReusableRowContainer(); + output = new Output(); + } + + @Override + public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, + VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) + throws HiveException { + if (nulls == null) { + nulls = new boolean[keyOutputWriters.length]; + currentKey = new Object[keyOutputWriters.length]; + vectorKeyOIs = new ArrayList(); + for (int i = 0; i < keyOutputWriters.length; i++) { + vectorKeyOIs.add(keyOutputWriters[i].getObjectInspector()); + } + } else { + assert nulls.length == keyOutputWriters.length; + } + for (int i = 0; i < keyOutputWriters.length; i++) { + currentKey[i] = keyWrapperBatch.getWritableKeyValue(kw, i, keyOutputWriters[i]); + nulls[i] = currentKey[i] == null; + } + return currentValue.setFromOutput(MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, sortableSortOrders)); + } + + @Override + public JoinUtil.JoinResult setFromRow(Object row, List fields, + List ois) throws HiveException { + if (nulls == null) { + nulls = new boolean[fields.size()]; + currentKey = new Object[fields.size()]; + } + for (int keyIndex = 0; keyIndex < fields.size(); ++keyIndex) { + currentKey[keyIndex] = fields.get(keyIndex).evaluate(row); + nulls[keyIndex] = currentKey[keyIndex] == null; + } + return currentValue.setFromOutput(MapJoinKey.serializeRow(output, currentKey, ois, sortableSortOrders)); + } + + @Override + public JoinUtil.JoinResult setFromOther(ReusableGetAdaptor other) throws HiveException { + assert other instanceof GetAdaptor; + GetAdaptor other2 = (GetAdaptor)other; + nulls = other2.nulls; + currentKey = other2.currentKey; + return currentValue.setFromOutput(other2.output); + } + + @Override + public boolean hasAnyNulls(int fieldCount, boolean[] nullsafes) { + if (nulls == null || nulls.length == 0) return false; + for (int i = 0; i < nulls.length; i++) { + if (nulls[i] && (nullsafes == null || !nullsafes[i])) { + return true; + } + } + return false; + } + + @Override + public MapJoinRowContainer getCurrentRows() { + return currentValue.isEmpty() ? null : currentValue; + } + + @Override + public Object[] getCurrentKey() { + return currentKey; + } + } + + /** Row container that gets and deserializes the rows on demand from bytes provided. */ + private class ReusableRowContainer + implements MapJoinRowContainer, AbstractRowContainer.RowIterator> { + private byte aliasFilter; + private List refs; + private int currentRow; + /** + * Sometimes, when container is empty in multi-table mapjoin, we need to add a dummy row. + * This container does not normally support adding rows; this is for the dummy row. + */ + private List dummyRow = null; + + private final ByteArrayRef uselessIndirection; // LBStruct needs ByteArrayRef + private final LazyBinaryStruct valueStruct; + + private int partitionId; // Current hashMap in use + + public ReusableRowContainer() { + if (internalValueOi != null) { + valueStruct = (LazyBinaryStruct) + LazyBinaryFactory.createLazyBinaryObject(internalValueOi); + } else { + valueStruct = null; // No rows? 
+ } + uselessIndirection = new ByteArrayRef(); + clearRows(); + } + + public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException { + if (refs == null) { + refs = new ArrayList(0); + } + + int keyHash = WriteBuffers.murmurHash(output.getData(), 0, output.getLength()); + partitionId = keyHash & (hashPartitions.length - 1); + + // If the target hash table is on disk, spill this row to disk as well to be processed later + if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { + toSpillPartitionId = partitionId; + refs.clear(); + return JoinUtil.JoinResult.SPILL; + } + else { + byte aliasFilter = hashPartitions[partitionId].hashMap.getValueRefs(output.getData(), output.getLength(), refs); + this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter; + this.dummyRow = null; + if (refs.isEmpty()) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } + } + } + + public boolean isEmpty() { + return refs.isEmpty() && (dummyRow == null); + } + + // Implementation of row container + @Override + public RowIterator> rowIter() throws HiveException { + currentRow = -1; + return this; + } + + @Override + public int rowCount() throws HiveException { + return dummyRow != null ? 1 : refs.size(); + } + + @Override + public void clearRows() { + // Doesn't clear underlying hashtable + if (refs != null) { + refs.clear(); + } + dummyRow = null; + currentRow = -1; + aliasFilter = (byte) 0xff; + } + + @Override + public byte getAliasFilter() throws HiveException { + return aliasFilter; + } + + @Override + public MapJoinRowContainer copy() throws HiveException { + return this; // Independent of hashtable and can be modified, no need to copy. + } + + // Implementation of row iterator + @Override + public List first() throws HiveException { + currentRow = 0; + return next(); + } + + + @Override + public List next() throws HiveException { + if (dummyRow != null) { + List result = dummyRow; + dummyRow = null; + return result; + } + if (currentRow < 0 || refs.size() < currentRow) throw new HiveException("No rows"); + if (refs.size() == currentRow) return null; + WriteBuffers.ByteSegmentRef ref = refs.get(currentRow++); + if (ref.getLength() == 0) { + return EMPTY_LIST; // shortcut, 0 length means no fields + } + if (ref.getBytes() == null) { + // partitionId is derived from previously calculated value in setFromOutput() + hashPartitions[partitionId].hashMap.populateValue(ref); + } + uselessIndirection.setData(ref.getBytes()); + valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength()); + return valueStruct.getFieldsAsList(); + } + + @Override + public void addRow(List t) { + if (dummyRow != null || !refs.isEmpty()) { + throw new RuntimeException("Cannot add rows when not empty"); + } + dummyRow = t; + } + + // Various unsupported methods. 
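Reviewer sketch of the probe-side routing in ReusableRowContainer.setFromOutput(): the serialized probe key is hashed with the same function and mask used on the build side, so a big-table row always lands on the partition that holds (or held) its potential matches; when that partition is on disk, the probe records the partition id and returns SPILL. The types and hashBytes function below are illustrative stand-ins.

import java.util.HashMap;
import java.util.Map;

public class ProbeRoutingSketch {
  enum JoinResult { MATCH, NOMATCH, SPILL }

  static class Partition {
    boolean onDisk;
    Map<String, String> hashMap = new HashMap<>();
  }

  static int hashBytes(byte[] key) {                // stand-in for WriteBuffers.murmurHash
    int h = 0;
    for (byte b : key) {
      h = h * 31 + b;
    }
    return h;
  }

  static JoinResult probe(Partition[] partitions, String serializedKey) {
    int partitionId = hashBytes(serializedKey.getBytes()) & (partitions.length - 1);
    Partition p = partitions[partitionId];
    if (p.onDisk) {
      return JoinResult.SPILL;                      // caller buffers the row for the second pass
    }
    return p.hashMap.containsKey(serializedKey) ? JoinResult.MATCH : JoinResult.NOMATCH;
  }

  public static void main(String[] args) {
    Partition[] partitions = new Partition[4];      // power-of-two count, as in the container
    for (int i = 0; i < partitions.length; i++) {
      partitions[i] = new Partition();
    }
    partitions[hashBytes("k1".getBytes()) & 3].hashMap.put("k1", "v1");
    System.out.println(probe(partitions, "k1"));    // MATCH
    partitions[hashBytes("k2".getBytes()) & 3].onDisk = true;
    System.out.println(probe(partitions, "k2"));    // SPILL
  }
}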
+ @Override + public void addRow(Object[] value) { + throw new RuntimeException(this.getClass().getCanonicalName() + " cannot add arrays"); + } + @Override + public void write(MapJoinObjectSerDeContext valueContext, ObjectOutputStream out) { + throw new RuntimeException(this.getClass().getCanonicalName() + " cannot be serialized"); + } + } + + @Override + public void dumpMetrics() { + for (int i = 0; i < hashPartitions.length; i++) { + HashPartition hp = hashPartitions[i]; + if (hp.hashMap != null) { + hp.hashMap.debugDumpMetrics(); + } + } + } + + public void dumpStats() { + int numPartitionsInMem = 0; + int numPartitionsOnDisk = 0; + + for (HashPartition hp : hashPartitions) { + if (hp.getHashMapOnDisk()) { + numPartitionsOnDisk++; + } else { + numPartitionsInMem++; + } + } + + LOG.info("[Hybrid Grace Hash Join] Summary: " + numPartitionsInMem + + " partitions in memory have been processed; " + numPartitionsOnDisk + + " partitions have been spilled to disk and will be processed next."); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java new file mode 100644 index 0000000..acbf804 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.persistence; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.io.BytesWritable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +/** + * A simple key/value container with the ability to spill to disk when the buffer is full. + * Kryo is used for the serialization/deserialization. 
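Reviewer sketch: when isMemoryFull() reports the threshold is crossed, spillPartition() serializes the biggest in-memory partition's hash map with Kryo into a local temp file, and getHashMapFromDisk() later reads it back and deletes the file. Below is a stand-alone round trip of that pattern using a plain HashMap, assuming a Kryo version that (like the one bundled with Hive here) does not require class registration; Hive itself reuses a thread-local Kryo instance.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;

public class KryoSpillSketch {
  public static void main(String[] args) throws IOException {
    HashMap<String, Integer> partitionMap = new HashMap<>();
    partitionMap.put("k1", 1);

    Kryo kryo = new Kryo();
    Path path = Files.createTempFile("partition-0-", null);

    try (Output output = new Output(Files.newOutputStream(path))) {
      kryo.writeObject(output, partitionMap);       // spill: serialize the in-memory map
    }

    try (Input input = new Input(Files.newInputStream(path))) {
      HashMap<?, ?> restored = kryo.readObject(input, HashMap.class);
      System.out.println("restored " + restored.size() + " entries");
    }
    Files.delete(path);                             // the spill file is deleted after reload
  }
}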
+ */ +@SuppressWarnings("unchecked") +public class KeyValueContainer { + private static final Log LOG = LogFactory.getLog(KeyValueContainer.class); + + @VisibleForTesting + static final int IN_MEMORY_NUM_ROWS = 1024; + + private ObjectPair[] writeBuffer; + private ObjectPair[] readBuffer; + + private File parentFile; + private File tmpFile; + + private int readCursor = 0; + private int writeCursor = 0; + + // Indicate if the read buffer has data, for example, + // when in reading, data on disk could be pull in + private boolean readBufferUsed = false; + private int rowsInReadBuffer = 0; + + private Input input; + private Output output; + + private int size; + + public KeyValueContainer() { + writeBuffer = new ObjectPair[IN_MEMORY_NUM_ROWS]; + readBuffer = new ObjectPair[IN_MEMORY_NUM_ROWS]; + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + writeBuffer[i] = new ObjectPair(); + readBuffer[i] = new ObjectPair(); + } + } + + private void switchBufferAndResetCursor() { + ObjectPair[] tmp = readBuffer; + rowsInReadBuffer = writeCursor; + readBuffer = writeBuffer; + readBufferUsed = true; + readCursor = 0; + writeBuffer = tmp; + writeCursor = 0; + } + + private void setupOutput() throws IOException { + if (parentFile == null) { + while (true) { + parentFile = File.createTempFile("key-value-container", ""); + if (parentFile.delete() && parentFile.mkdir()) { + parentFile.deleteOnExit(); + break; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Retry creating tmp key-value-container directory..."); + } + } + } + + if (tmpFile == null || input != null) { + tmpFile = File.createTempFile("KeyValueContainer", ".tmp", parentFile); + LOG.info("KeyValueContainer created temp file " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + } + + FileOutputStream fos = null; + try { + fos = new FileOutputStream(tmpFile); + output = new Output(fos); + } finally { + if (output == null && fos != null) { + fos.close(); + } + } + } + + private BytesWritable readValue(Input input) { + return new BytesWritable(input.readBytes(input.readInt())); + } + + private void writeValue(Output output, BytesWritable bytesWritable) { + int size = bytesWritable.getLength(); + output.writeInt(size); + output.writeBytes(bytesWritable.getBytes(), 0, size); + } + + private HiveKey readHiveKey(Input input) { + HiveKey hiveKey = new HiveKey( + input.readBytes(input.readInt()), input.readInt()); + hiveKey.setDistKeyLength(input.readInt()); + return hiveKey; + } + + private void writeHiveKey(Output output, HiveKey hiveKey) { + int size = hiveKey.getLength(); + output.writeInt(size); + output.writeBytes(hiveKey.getBytes(), 0, size); + output.writeInt(0); // Since hashCode is not used, just put an arbitrary number + output.writeInt(hiveKey.getDistKeyLength()); + } + + public void add(HiveKey key, BytesWritable value) { + if (writeCursor >= IN_MEMORY_NUM_ROWS) { // Write buffer is full + if (!readBufferUsed) { // Read buffer isn't used, switch buffer + switchBufferAndResetCursor(); + } else { + // Need to spill from write buffer to disk + try { + if (output == null) { + setupOutput(); + } + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + ObjectPair pair = writeBuffer[i]; + writeHiveKey(output, pair.getFirst()); + writeValue(output, pair.getSecond()); + pair.setFirst(null); + pair.setSecond(null); + } + writeCursor = 0; + } catch (Exception e) { + clear(); // Clean up the cache + throw new RuntimeException("Failed to spill key/value pairs to disk", e); + } + } + } + ObjectPair pair = writeBuffer[writeCursor++]; + pair.setFirst(key); + 
pair.setSecond(value); + size++; + } + + public void clear() { + writeCursor = readCursor = rowsInReadBuffer = 0; + readBufferUsed = false; + + if (parentFile != null) { + if (input != null) { + try { + input.close(); + } catch (Throwable ignored) { + } + input = null; + } + if (output != null) { + try { + output.close(); + } catch (Throwable ignored) { + } + output = null; + } + try { + FileUtil.fullyDelete(parentFile); + } catch (Throwable ignored) { + } + parentFile = null; + tmpFile = null; + } + size = 0; + } + + public boolean hasNext() { + return readBufferUsed || writeCursor > 0; + } + + public ObjectPair next() { + Preconditions.checkState(hasNext()); + if (!readBufferUsed) { + try { + if (input == null && output != null) { + // Close output stream if open + output.close(); + output = null; + + FileInputStream fis = null; + try { + fis = new FileInputStream(tmpFile); + input = new Input(fis); + } finally { + if (input == null && fis != null) { + fis.close(); + } + } + } + if (input != null) { + // Load next batch from disk + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + ObjectPair pair = readBuffer[i]; + pair.setFirst(readHiveKey(input)); + pair.setSecond(readValue(input)); + } + if (input.eof()) { + input.close(); + input = null; + } + rowsInReadBuffer = IN_MEMORY_NUM_ROWS; + readBufferUsed = true; + readCursor = 0; + } else if (writeCursor == 1) { + ObjectPair pair = writeBuffer[0]; + ObjectPair row = new ObjectPair( + pair.getFirst(), pair.getSecond()); + pair.setFirst(null); + pair.setSecond(null); + writeCursor = 0; + return row; + } else { + // No record on disk, more data in write buffer + switchBufferAndResetCursor(); + } + } catch (Exception e) { + clear(); // Clean up the cache + throw new RuntimeException("Failed to load key/value pairs from disk", e); + } + } + ObjectPair pair = readBuffer[readCursor]; + ObjectPair row = new ObjectPair<>( + pair.getFirst(), pair.getSecond()); + pair.setFirst(null); + pair.setSecond(null); + if (++readCursor >= rowsInReadBuffer) { + readBufferUsed = false; + rowsInReadBuffer = 0; + readCursor = 0; + } + return row; + } + + public int size() { + return size; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index 28f6c63..e4a90f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -91,7 +92,11 @@ private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadF throws SerDeException { int newThreshold = HashMapWrapper.calculateTableSize( keyCountAdj, threshold, loadFactor, keyCount); - hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, threshold); + hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage); + } + + public MapJoinBytesTableContainer(BytesBytesMultiHashMap hashMap) { + this.hashMap = hashMap; } private LazyBinaryStructObjectInspector createInternalOi( 
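Reviewer sketch: usage of the KeyValueContainer added above, with the API exactly as defined in this patch. The hash code passed to HiveKey is arbitrary because the container never uses it; spilling to a temp file happens inside add() once both in-memory buffers are occupied, and next() pulls spilled batches back transparently.

import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;

public class KeyValueContainerUsage {
  public static void main(String[] args) {
    KeyValueContainer sidefile = new KeyValueContainer();
    for (int i = 0; i < 5000; i++) {                       // more pairs than one in-memory buffer holds
      byte[] key = ("key-" + i).getBytes();
      sidefile.add(new HiveKey(key, 0), new BytesWritable(("value-" + i).getBytes()));
    }
    System.out.println("buffered pairs: " + sidefile.size());

    while (sidefile.hasNext()) {
      ObjectPair<HiveKey, BytesWritable> pair = sidefile.next();
      // ... feed pair.getFirst() / pair.getSecond() into the restored hash table ...
    }
    sidefile.clear();                                       // removes any temp file
  }
}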
@@ -114,8 +119,18 @@ private LazyBinaryStructObjectInspector createInternalOi( .getLazyBinaryStructObjectInspector(colNames, colOis); } - private static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { + public void setInternalValueOi(LazyBinaryStructObjectInspector internalValueOi) { + this.internalValueOi = internalValueOi; + } + + public void setSortableSortOrders(boolean[] sortableSortOrders) { + this.sortableSortOrders = sortableSortOrders; + } + + public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { void setKeyValue(Writable key, Writable val) throws SerDeException; + /** Get hash value from the key. */ + int getHashFromKey() throws SerDeException; } private static class KeyValueWriter implements KeyValueHelper { @@ -176,9 +191,14 @@ public byte updateStateByte(Byte previousValue) { aliasFilter &= ((ShortWritable)valObjs[valObjs.length - 1]).get(); return aliasFilter; } + + @Override + public int getHashFromKey() throws SerDeException { + throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer"); + } } - private static class LazyBinaryKvWriter implements KeyValueHelper { + static class LazyBinaryKvWriter implements KeyValueHelper { private final LazyBinaryStruct.SingleFieldGetter filterGetter; private Writable key, value; private final SerDe keySerDe; @@ -210,6 +230,16 @@ public void writeKey(RandomAccessOutput dest) throws SerDeException { dest.write(b.getBytes(), 0, b.getLength() - (hasTag ? 1 : 0)); } + @Override + public int getHashFromKey() throws SerDeException { + if (!(key instanceof BinaryComparable)) { + throw new SerDeException("Unexpected type " + key.getClass().getCanonicalName()); + } + sanityCheckKeyForTag(); + BinaryComparable b = (BinaryComparable)key; + return WriteBuffers.murmurHash(b.getBytes(), 0, b.getLength() - (hasTag ? 1 : 0)); + } + /** * If we received data with tags from ReduceSinkOperators, no keys will match. 
This should * not happen, but is important enough that we want to find out and work around it if some @@ -285,7 +315,7 @@ public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentK } } writeHelper.setKeyValue(currentKey, currentValue); - hashMap.put(writeHelper); + hashMap.put(writeHelper, -1); return null; // there's no key to return } @@ -329,9 +359,9 @@ public GetAdaptor() { } @Override - public void setFromVector(VectorHashKeyWrapper kw, - VectorExpressionWriter[] keyOutputWriters, - VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { + public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, + VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) + throws HiveException { if (nulls == null) { nulls = new boolean[keyOutputWriters.length]; currentKey = new Object[keyOutputWriters.length]; @@ -346,12 +376,12 @@ public void setFromVector(VectorHashKeyWrapper kw, currentKey[i] = keyWrapperBatch.getWritableKeyValue(kw, i, keyOutputWriters[i]); nulls[i] = currentKey[i] == null; } - currentValue.setFromOutput( + return currentValue.setFromOutput( MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, sortableSortOrders)); } @Override - public void setFromRow(Object row, List fields, + public JoinUtil.JoinResult setFromRow(Object row, List fields, List ois) throws HiveException { if (nulls == null) { nulls = new boolean[fields.size()]; @@ -361,17 +391,17 @@ public void setFromRow(Object row, List fields, currentKey[keyIndex] = fields.get(keyIndex).evaluate(row); nulls[keyIndex] = currentKey[keyIndex] == null; } - currentValue.setFromOutput( + return currentValue.setFromOutput( MapJoinKey.serializeRow(output, currentKey, ois, sortableSortOrders)); } @Override - public void setFromOther(ReusableGetAdaptor other) { + public JoinUtil.JoinResult setFromOther(ReusableGetAdaptor other) { assert other instanceof GetAdaptor; GetAdaptor other2 = (GetAdaptor)other; nulls = other2.nulls; currentKey = other2.currentKey; - currentValue.setFromOutput(other2.output); + return currentValue.setFromOutput(other2.output); } @Override @@ -422,13 +452,19 @@ public ReusableRowContainer() { clearRows(); } - public void setFromOutput(Output output) { + public JoinUtil.JoinResult setFromOutput(Output output) { if (refs == null) { refs = new ArrayList(0); } byte aliasFilter = hashMap.getValueRefs(output.getData(), output.getLength(), refs); this.aliasFilter = refs.isEmpty() ? 
(byte) 0xff : aliasFilter; this.dummyRow = null; + if (refs.isEmpty()) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } public boolean isEmpty() { @@ -530,4 +566,9 @@ public static boolean isSupportedKey(ObjectInspector keyOi) { public void dumpMetrics() { hashMap.debugDumpMetrics(); } + + @Override + public boolean hasSpill() { + return false; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index ff6e5d4..8d39df1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hive.ql.exec.persistence; +import java.io.IOException; import java.util.List; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -40,21 +42,21 @@ * Changes current rows to which adaptor is referring to the rows corresponding to * the key represented by a VHKW object, and writers and batch used to interpret it. */ - void setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, + JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException; /** * Changes current rows to which adaptor is referring to the rows corresponding to * the key represented by a row object, and fields and ois used to interpret it. */ - void setFromRow(Object row, List fields, List ois) + JoinUtil.JoinResult setFromRow(Object row, List fields, List ois) throws HiveException; /** * Changes current rows to which adaptor is referring to the rows corresponding to * the key that another adaptor has already deserialized via setFromVector/setFromRow. */ - void setFromOther(ReusableGetAdaptor other); + JoinUtil.JoinResult setFromOther(ReusableGetAdaptor other) throws HiveException; /** * Checks whether the current key has any nulls. 
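For reference, a minimal sketch of how a probe-side caller might branch on the JoinUtil.JoinResult now returned by the ReusableGetAdaptor methods in the hunks above. This is illustrative only and not part of the patch; the Probe callback interface here is hypothetical and merely stands in for the operator's own handling.

import org.apache.hadoop.hive.ql.exec.JoinUtil;

public class JoinResultDispatchSketch {
  /** Hypothetical callback, not Hive code. */
  interface Probe {
    void onMatch();    // matching small-table rows can be fetched from the adaptor
    void onNoMatch();  // no rows for this key in the in-memory hash table
    void onSpill();    // key belongs to a partition that has been spilled to disk
  }

  static void dispatch(JoinUtil.JoinResult result, Probe probe) {
    switch (result) {
      case MATCH:
        probe.onMatch();
        break;
      case NOMATCH:
        probe.onNoMatch();
        break;
      case SPILL:
        probe.onSpill();
        break;
    }
  }
}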
@@ -77,7 +79,7 @@ void setFromRow(Object row, List fields, List { + private static final Log LOG = LogFactory.getLog(ObjectContainer.class); + + @VisibleForTesting + static final int IN_MEMORY_NUM_ROWS = 1024; + + private ROW[] writeBuffer; + private ROW[] readBuffer; + + private File parentFile; + private File tmpFile; + + private int readCursor = 0; + private int writeCursor = 0; + + // Indicate if the read buffer has data, for example, + // when in reading, data on disk could be pull in + private boolean readBufferUsed = false; + private int rowsInReadBuffer = 0; + + private Input input; + private Output output; + + private Kryo kryo; + private int size; + + public ObjectContainer() { + writeBuffer = (ROW[]) new Object[IN_MEMORY_NUM_ROWS]; + readBuffer = (ROW[]) new Object[IN_MEMORY_NUM_ROWS]; + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + writeBuffer[i] = (ROW) new Object(); + readBuffer[i] = (ROW) new Object(); + } + kryo = Utilities.runtimeSerializationKryo.get(); + } + + private void switchBufferAndResetCursor() { + ROW[] tmp = readBuffer; + rowsInReadBuffer = writeCursor; + readBuffer = writeBuffer; + readBufferUsed = true; + readCursor = 0; + writeBuffer = tmp; + writeCursor = 0; + } + + private void setupOutput() throws IOException { + if (parentFile == null) { + while (true) { + parentFile = File.createTempFile("object-container", ""); + if (parentFile.delete() && parentFile.mkdir()) { + parentFile.deleteOnExit(); + break; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Retry creating tmp object-container directory..."); + } + } + } + + if (tmpFile == null || input != null) { + tmpFile = File.createTempFile("ObjectContainer", ".tmp", parentFile); + LOG.info("ObjectContainer created temp file " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + } + + FileOutputStream fos = null; + try { + fos = new FileOutputStream(tmpFile); + output = new Output(fos); + } finally { + if (output == null && fos != null) { + fos.close(); + } + } + } + + public void add(ROW row) { + if (writeCursor >= IN_MEMORY_NUM_ROWS) { // Write buffer is full + if (!readBufferUsed) { // Read buffer isn't used, switch buffer + switchBufferAndResetCursor(); + } else { + // Need to spill from write buffer to disk + try { + if (output == null) { + setupOutput(); + } + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + kryo.writeClassAndObject(output, writeBuffer[i]); + } + writeCursor = 0; + } catch (Exception e) { + clear(); // Clean up the cache + throw new RuntimeException("Failed to spill rows to disk", e); + } + } + } + writeBuffer[writeCursor++] = row; + size++; + } + + public void clear() { + writeCursor = readCursor = rowsInReadBuffer = 0; + readBufferUsed = false; + + if (parentFile != null) { + if (input != null) { + try { + input.close(); + } catch (Throwable ignored) { + } + input = null; + } + if (output != null) { + try { + output.close(); + } catch (Throwable ignored) { + } + output = null; + } + try { + FileUtil.fullyDelete(parentFile); + } catch (Throwable ignored) { + } + parentFile = null; + tmpFile = null; + } + size = 0; + } + + public boolean hasNext() { + return readBufferUsed || writeCursor > 0; + } + + public ROW next() { + Preconditions.checkState(hasNext()); + if (!readBufferUsed) { + try { + if (input == null && output != null) { + // Close output stream if open + output.close(); + output = null; + + FileInputStream fis = null; + try { + fis = new FileInputStream(tmpFile); + input = new Input(fis); + } finally { + if (input == null && fis != null) { + fis.close(); + } + } + } + if 
(input != null) { + // Load next batch from disk + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + readBuffer[i] = (ROW) kryo.readClassAndObject(input); + } + if (input.eof()) { + input.close(); + input = null; + } + rowsInReadBuffer = IN_MEMORY_NUM_ROWS; + readBufferUsed = true; + readCursor = 0; + } else if (writeCursor == 1) { + ROW row = kryo.copy(writeBuffer[0]); + writeBuffer[0] = null; + writeCursor = 0; + return row; + } else { + // No record on disk, more data in write buffer + switchBufferAndResetCursor(); + } + } catch (Exception e) { + clear(); // Clean up the cache + throw new RuntimeException("Failed to load rows from disk", e); + } + } + ROW row = kryo.copy(readBuffer[readCursor]); + readBuffer[readCursor] = null; + + if (++readCursor >= rowsInReadBuffer) { + readBufferUsed = false; + rowsInReadBuffer = 0; + readCursor = 0; + } + return row; + } + + public int size() { + return size; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 7402ba3..d44d1a7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -26,14 +26,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; -import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; -import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; +import org.apache.hadoop.hive.ql.exec.persistence.*; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.serde2.SerDeException; @@ -76,13 +70,21 @@ public void load( boolean useOptimizedTables = HiveConf.getBoolVar( hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE); + boolean useHybridGraceHashJoin = HiveConf.getBoolVar( + hconf, HiveConf.ConfVars.HIVEMAPJOINUSEHYBRIDGRACEHASHJOIN); boolean isFirstKey = true; TezCacheAccess tezCacheAccess = TezCacheAccess.createInstance(hconf); + // Force not to use hybrid grace hash join for n-way join + if (mapJoinTables.length > 2) { + useHybridGraceHashJoin = false; + } + for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; } + boolean hasSpill = false; String inputName = parentToInput.get(pos); LogicalInput input = tezContext.getInput(inputName); @@ -104,15 +106,19 @@ public void load( isFirstKey = false; Long keyCountObj = parentKeyCounts.get(pos); long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue(); + if (desc.getParentDataSizes().size() == 0) { + useHybridGraceHashJoin = false; + } MapJoinTableContainer tableContainer = useOptimizedTables - ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage) + ? (useHybridGraceHashJoin ? 
new HybridHashTableContainer(hconf, valCtx, keyCount, memUsage, + desc.getParentDataSizes().get(pos)) + : new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)) : new HashMapWrapper(hconf, keyCount); while (kvReader.next()) { tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(), valCtx, (Writable)kvReader.getCurrentValue()); } - tableContainer.seal(); mapJoinTables[pos] = tableContainer; } catch (IOException e) { @@ -126,7 +132,8 @@ public void load( LOG.info("Is this a bucket map join: " + desc.isBucketMapJoin()); // cache is disabled for bucket map join because of the same reason // given in loadHashTable in MapJoinOperator. - if (!desc.isBucketMapJoin()) { + // Also, only cache the input if there's no spilled partition in Hybrid Grace Hash Join case + if (!desc.isBucketMapJoin() && !mapJoinTables[pos].hasSpill()) { tezCacheAccess.registerCachedInput(inputName); LOG.info("Setting Input: " + inputName + " as cached"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index 2c8aee1..92f7bd9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; @@ -211,12 +212,13 @@ public void closeOp(boolean aborted) throws HiveException { if (!aborted && 0 < outputBatch.size) { flushOutput(); } + super.closeOp(aborted); } @Override - protected void setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias) + protected JoinUtil.JoinResult setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias) throws HiveException { - dest.setFromVector(keyValues[batchIndex], keyOutputWriters, keyWrapperBatch); + return dest.setFromVector(keyValues[batchIndex], keyOutputWriters, keyWrapperBatch); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index b184cf4..d3e270d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -122,12 +122,17 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, } MapJoinDesc joinConf = mapJoinOp.getConf(); long keyCount = Long.MAX_VALUE, rowCount = Long.MAX_VALUE, bucketCount = 1; + long tableSize = Long.MAX_VALUE; Statistics stats = parentRS.getStatistics(); if (stats != null) { keyCount = rowCount = stats.getNumRows(); if (keyCount <= 0) { keyCount = rowCount = Long.MAX_VALUE; } + tableSize = stats.getDataSize(); + if (tableSize <= 0) { + tableSize = Long.MAX_VALUE; + } ArrayList keyCols = parentRS.getConf().getOutputKeyColumnNames(); if (keyCols != null && !keyCols.isEmpty()) { // See if we can arrive at a smaller number using distinct stats from key columns. 
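A short usage sketch for the spill-capable ObjectContainer shown earlier in this patch. Its file header and package declaration are elided in this excerpt, so the import is omitted here; the behavior described in the comments follows the code in the hunk above.

// Usage sketch only (not part of the patch); assumes the ObjectContainer class
// added above is on the classpath. Import omitted because its package is not
// shown in this excerpt.
public class ObjectContainerUsageSketch {
  public static void main(String[] args) {
    ObjectContainer<String> rows = new ObjectContainer<String>();
    for (int i = 0; i < 5000; i++) {  // more than IN_MEMORY_NUM_ROWS (1024), so full batches spill to a Kryo temp file
      rows.add("row-" + i);
    }
    while (rows.hasNext()) {
      String row = rows.next();       // drains the in-memory buffers, then reloads spilled batches from disk
      // ... process row ...
    }
    rows.clear();                     // closes streams and deletes the temp spill directory
  }
}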
@@ -155,6 +160,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, // We cannot obtain a better estimate without CustomPartitionVertex providing it // to us somehow; in which case using statistics would be completely unnecessary. keyCount /= bucketCount; + tableSize /= bucketCount; } } } @@ -164,6 +170,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, if (keyCount != Long.MAX_VALUE) { joinConf.getParentKeyCounts().put(pos, keyCount); } + if (tableSize != Long.MAX_VALUE) { + joinConf.getParentDataSizes().put(pos, tableSize); + } int numBuckets = -1; EdgeType edgeType = EdgeType.BROADCAST_EDGE; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index aca4273..e987f6a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -1296,6 +1296,9 @@ private void fixupParentChildOperators(Operator op, switch (op.getType()) { case MAPJOIN: + // Disable Hybrid Grace Hash Join when vectorization is in effect, for now + HiveConf.setBoolVar(physicalContext.getConf(), + HiveConf.ConfVars.HIVEMAPJOINUSEHYBRIDGRACEHASHJOIN, false); case GROUPBY: case FILTER: case SELECT: diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index 9fdd417..ceb95a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -52,6 +52,7 @@ // TODO: should these rather be arrays? private Map parentToInput = new HashMap(); private Map parentKeyCounts = new HashMap(); + private Map parentDataSizes = new HashMap(); // for tez. used to remember which type of a Bucket Map Join this is. 
private boolean customBucketMapJoin; @@ -90,6 +91,7 @@ public MapJoinDesc(MapJoinDesc clone) { this.dumpFilePrefix = clone.dumpFilePrefix; this.parentToInput = clone.parentToInput; this.parentKeyCounts = clone.parentKeyCounts; + this.parentDataSizes = clone.parentDataSizes; this.customBucketMapJoin = clone.customBucketMapJoin; } @@ -136,6 +138,14 @@ public void setParentToInput(Map parentToInput) { return parentKeyCounts; } + public Map getParentDataSizes() { + return parentDataSizes; + } + + public void setParentDataSizes(Map parentDataSizes) { + this.parentDataSizes = parentDataSizes; + } + @Explain(displayName = "Estimated key counts", normalExplain = false) public String getKeyCountsExplainDesc() { StringBuilder result = null; diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java index b3582b2..04c7fc4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java @@ -49,10 +49,10 @@ public void testCapacityValidation() { public void testPutGetOne() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE); RandomKvSource kv = new RandomKvSource(0, 0); - map.put(kv); + map.put(kv, -1); verifyResults(map, kv.getLastKey(), kv.getLastValue()); kv = new RandomKvSource(10, 100); - map.put(kv); + map.put(kv, -1); verifyResults(map, kv.getLastKey(), kv.getLastValue()); } @@ -60,12 +60,12 @@ public void testPutGetOne() throws Exception { public void testPutGetMultiple() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE); RandomKvSource kv = new RandomKvSource(0, 100); - map.put(kv); + map.put(kv, -1); verifyResults(map, kv.getLastKey(), kv.getLastValue()); FixedKeyKvSource kv2 = new FixedKeyKvSource(kv.getLastKey(), 0, 100); kv2.values.add(kv.getLastValue()); for (int i = 0; i < 3; ++i) { - map.put(kv2); + map.put(kv2, -1); verifyResults(map, kv2.key, kv2.values.toArray(new byte[kv2.values.size()][])); } } @@ -74,11 +74,11 @@ public void testPutGetMultiple() throws Exception { public void testGetNonExistent() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE); RandomKvSource kv = new RandomKvSource(1, 100); - map.put(kv); + map.put(kv, -1); byte[] key = kv.getLastKey(); key[0] = (byte)(key[0] + 1); FixedKeyKvSource kv2 = new FixedKeyKvSource(kv.getLastKey(), 0, 100); - map.put(kv2); + map.put(kv2, -1); key[0] = (byte)(key[0] + 1); List results = new ArrayList(0); map.getValueRefs(key, key.length, results); @@ -93,7 +93,7 @@ public void testPutWithFullMap() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, 1f, WB_SIZE); UniqueKeysKvSource kv = new UniqueKeysKvSource(); for (int i = 0; i < CAPACITY; ++i) { - map.put(kv); + map.put(kv, -1); } for (int i = 0; i < kv.keys.size(); ++i) { verifyResults(map, kv.keys.get(i), kv.values.get(i)); @@ -111,7 +111,7 @@ public void testExpand() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(1, 0.0000001f, WB_SIZE); UniqueKeysKvSource kv = new UniqueKeysKvSource(); for (int i = 0; i < 18; ++i) { - map.put(kv); + map.put(kv, -1); for (int j = 0; j <= i; ++j) { verifyResults(map, kv.keys.get(j), kv.values.get(j)); } diff --git ql/src/test/queries/clientpositive/hybridhashjoin.q 
ql/src/test/queries/clientpositive/hybridhashjoin.q new file mode 100644 index 0000000..6742ad4 --- /dev/null +++ ql/src/test/queries/clientpositive/hybridhashjoin.q @@ -0,0 +1,72 @@ +set hive.auto.convert.join=true; +set hive.mapjoin.hybridgrace.hashtable=true; +set hive.auto.convert.join.noconditionaltask.size=20000000; +set hive.mapjoin.optimized.hashtable.wbsize=10485760; + +DROP TABLE IF EXISTS dimension; +CREATE TABLE dimension (d1 int, d2 string); + +DROP TABLE IF EXISTS fact; +CREATE TABLE fact (f1 int, f2 int); + +DROP TABLE IF EXISTS fact_part; +CREATE TABLE fact_part (f1 int, f2 int) PARTITIONED BY (range int); + +INSERT INTO TABLE dimension VALUES (NULL, NULL), + (1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e"), + (6,"f"), (7,"g"), (8,"h"), (9,"i"), (10,"j"), + (11,"k"), (12,"l"), (13,"m"), (14,"n"), (15,"o"), + (16,"p"), (17,"q"), (18,"r"), (19,"s"), (20,"t"); + +INSERT INTO TABLE fact VALUES (NULL, NULL), + (1,2), (2,2), (3,2), (4,2), (5,2), (6,2), (7,2), (8,2), (9,2), (10,2), + (11,2), (12,2), (13,2), (14,2), (15,2), (16,2), (17,2), (18,2), (19,2), (20,2), + (21,2), (22,2), (23,2), (24,2), (25,2), (26,2), (27,2), (28,2), (29,2), (30,2), + (31,2), (32,2), (33,2), (34,2), (35,2), (36,2), (37,2), (38,2), (39,2), (40,2), + (41,2), (42,2), (43,2), (44,2), (45,2), (46,2), (47,2), (48,2), (49,2), (50,2), + (51,2), (52,2), (53,2), (54,2), (55,2), (56,2), (57,2), (58,2), (59,2), (60,2), + (61,2), (62,2), (63,2), (64,2), (65,2), (66,2), (67,2), (68,2), (69,2), (70,2), + (71,2), (72,2), (73,2), (74,2), (75,2), (76,2), (77,2), (78,2), (79,2), (80,2), + (81,2), (82,2), (83,2), (84,2), (85,2), (86,2), (87,2), (88,2), (89,2), (90,2), + (91,2), (92,2), (93,2), (94,2), (95,2), (96,2), (97,2), (98,2), (99,2), (100,2); + +INSERT OVERWRITE TABLE fact_part partition(range=20) +SELECT * FROM fact WHERE fact.f1 <= 20; + +INSERT OVERWRITE TABLE fact_part partition(range=40) +SELECT * FROM fact WHERE fact.f1 > 20 AND fact.f1 <= 40; + +INSERT OVERWRITE TABLE fact_part partition(range=60) +SELECT * FROM fact WHERE fact.f1 > 40 AND fact.f1 <= 60; + +INSERT OVERWRITE TABLE fact_part partition(range=80) +SELECT * FROM fact WHERE fact.f1 > 60 AND fact.f1 <= 80; + +INSERT OVERWRITE TABLE fact_part partition(range=100) +SELECT * FROM fact WHERE fact.f1 > 80 AND fact.f1 <= 100; + + +EXPLAIN SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1; +SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1; + +EXPLAIN SELECT * FROM dimension d, fact_part fp WHERE d.d1 = fp.f1; +SELECT * FROM dimension d, fact_part fp WHERE d.d1 = fp.f1; + +DROP TABLE dimension; +DROP TABLE fact; +DROP TABLE fact_part; + +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +; + +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +; diff --git ql/src/test/results/clientpositive/tez/hybridhashjoin.q.out ql/src/test/results/clientpositive/tez/hybridhashjoin.q.out new file mode 100644 index 0000000..36207ca --- /dev/null +++ ql/src/test/results/clientpositive/tez/hybridhashjoin.q.out @@ -0,0 +1,479 @@ +PREHOOK: query: DROP TABLE IF EXISTS dimension +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS dimension +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE dimension (d1 int, d2 string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dimension +POSTHOOK: query: CREATE TABLE dimension (d1 int, d2 string) +POSTHOOK: type: CREATETABLE 
+POSTHOOK: Output: database:default +POSTHOOK: Output: default@dimension +PREHOOK: query: DROP TABLE IF EXISTS fact +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS fact +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE fact (f1 int, f2 int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@fact +POSTHOOK: query: CREATE TABLE fact (f1 int, f2 int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@fact +PREHOOK: query: DROP TABLE IF EXISTS fact_part +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS fact_part +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE fact_part (f1 int, f2 int) PARTITIONED BY (range int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@fact_part +POSTHOOK: query: CREATE TABLE fact_part (f1 int, f2 int) PARTITIONED BY (range int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@fact_part +PREHOOK: query: INSERT INTO TABLE dimension VALUES (NULL, NULL), + (1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e"), + (6,"f"), (7,"g"), (8,"h"), (9,"i"), (10,"j"), + (11,"k"), (12,"l"), (13,"m"), (14,"n"), (15,"o"), + (16,"p"), (17,"q"), (18,"r"), (19,"s"), (20,"t") +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@dimension +POSTHOOK: query: INSERT INTO TABLE dimension VALUES (NULL, NULL), + (1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e"), + (6,"f"), (7,"g"), (8,"h"), (9,"i"), (10,"j"), + (11,"k"), (12,"l"), (13,"m"), (14,"n"), (15,"o"), + (16,"p"), (17,"q"), (18,"r"), (19,"s"), (20,"t") +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@dimension +POSTHOOK: Lineage: dimension.d1 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: dimension.d2 SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: INSERT INTO TABLE fact VALUES (NULL, NULL), + (1,2), (2,2), (3,2), (4,2), (5,2), (6,2), (7,2), (8,2), (9,2), (10,2), + (11,2), (12,2), (13,2), (14,2), (15,2), (16,2), (17,2), (18,2), (19,2), (20,2), + (21,2), (22,2), (23,2), (24,2), (25,2), (26,2), (27,2), (28,2), (29,2), (30,2), + (31,2), (32,2), (33,2), (34,2), (35,2), (36,2), (37,2), (38,2), (39,2), (40,2), + (41,2), (42,2), (43,2), (44,2), (45,2), (46,2), (47,2), (48,2), (49,2), (50,2), + (51,2), (52,2), (53,2), (54,2), (55,2), (56,2), (57,2), (58,2), (59,2), (60,2), + (61,2), (62,2), (63,2), (64,2), (65,2), (66,2), (67,2), (68,2), (69,2), (70,2), + (71,2), (72,2), (73,2), (74,2), (75,2), (76,2), (77,2), (78,2), (79,2), (80,2), + (81,2), (82,2), (83,2), (84,2), (85,2), (86,2), (87,2), (88,2), (89,2), (90,2), + (91,2), (92,2), (93,2), (94,2), (95,2), (96,2), (97,2), (98,2), (99,2), (100,2) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__2 +PREHOOK: Output: default@fact +POSTHOOK: query: INSERT INTO TABLE fact VALUES (NULL, NULL), + (1,2), (2,2), (3,2), (4,2), (5,2), (6,2), (7,2), (8,2), (9,2), (10,2), + (11,2), (12,2), (13,2), (14,2), (15,2), (16,2), (17,2), (18,2), (19,2), (20,2), + (21,2), (22,2), (23,2), (24,2), (25,2), (26,2), (27,2), (28,2), (29,2), (30,2), + (31,2), (32,2), (33,2), (34,2), (35,2), (36,2), (37,2), (38,2), (39,2), (40,2), + (41,2), (42,2), (43,2), (44,2), (45,2), (46,2), (47,2), (48,2), (49,2), (50,2), + (51,2), (52,2), (53,2), (54,2), (55,2), (56,2), 
(57,2), (58,2), (59,2), (60,2), + (61,2), (62,2), (63,2), (64,2), (65,2), (66,2), (67,2), (68,2), (69,2), (70,2), + (71,2), (72,2), (73,2), (74,2), (75,2), (76,2), (77,2), (78,2), (79,2), (80,2), + (81,2), (82,2), (83,2), (84,2), (85,2), (86,2), (87,2), (88,2), (89,2), (90,2), + (91,2), (92,2), (93,2), (94,2), (95,2), (96,2), (97,2), (98,2), (99,2), (100,2) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__2 +POSTHOOK: Output: default@fact +POSTHOOK: Lineage: fact.f1 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: fact.f2 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: INSERT OVERWRITE TABLE fact_part partition(range=20) +SELECT * FROM fact WHERE fact.f1 <= 20 +PREHOOK: type: QUERY +PREHOOK: Input: default@fact +PREHOOK: Output: default@fact_part@range=20 +POSTHOOK: query: INSERT OVERWRITE TABLE fact_part partition(range=20) +SELECT * FROM fact WHERE fact.f1 <= 20 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@fact +POSTHOOK: Output: default@fact_part@range=20 +POSTHOOK: Lineage: fact_part PARTITION(range=20).f1 SIMPLE [(fact)fact.FieldSchema(name:f1, type:int, comment:null), ] +POSTHOOK: Lineage: fact_part PARTITION(range=20).f2 SIMPLE [(fact)fact.FieldSchema(name:f2, type:int, comment:null), ] +PREHOOK: query: INSERT OVERWRITE TABLE fact_part partition(range=40) +SELECT * FROM fact WHERE fact.f1 > 20 AND fact.f1 <= 40 +PREHOOK: type: QUERY +PREHOOK: Input: default@fact +PREHOOK: Output: default@fact_part@range=40 +POSTHOOK: query: INSERT OVERWRITE TABLE fact_part partition(range=40) +SELECT * FROM fact WHERE fact.f1 > 20 AND fact.f1 <= 40 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@fact +POSTHOOK: Output: default@fact_part@range=40 +POSTHOOK: Lineage: fact_part PARTITION(range=40).f1 SIMPLE [(fact)fact.FieldSchema(name:f1, type:int, comment:null), ] +POSTHOOK: Lineage: fact_part PARTITION(range=40).f2 SIMPLE [(fact)fact.FieldSchema(name:f2, type:int, comment:null), ] +PREHOOK: query: INSERT OVERWRITE TABLE fact_part partition(range=60) +SELECT * FROM fact WHERE fact.f1 > 40 AND fact.f1 <= 60 +PREHOOK: type: QUERY +PREHOOK: Input: default@fact +PREHOOK: Output: default@fact_part@range=60 +POSTHOOK: query: INSERT OVERWRITE TABLE fact_part partition(range=60) +SELECT * FROM fact WHERE fact.f1 > 40 AND fact.f1 <= 60 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@fact +POSTHOOK: Output: default@fact_part@range=60 +POSTHOOK: Lineage: fact_part PARTITION(range=60).f1 SIMPLE [(fact)fact.FieldSchema(name:f1, type:int, comment:null), ] +POSTHOOK: Lineage: fact_part PARTITION(range=60).f2 SIMPLE [(fact)fact.FieldSchema(name:f2, type:int, comment:null), ] +PREHOOK: query: INSERT OVERWRITE TABLE fact_part partition(range=80) +SELECT * FROM fact WHERE fact.f1 > 60 AND fact.f1 <= 80 +PREHOOK: type: QUERY +PREHOOK: Input: default@fact +PREHOOK: Output: default@fact_part@range=80 +POSTHOOK: query: INSERT OVERWRITE TABLE fact_part partition(range=80) +SELECT * FROM fact WHERE fact.f1 > 60 AND fact.f1 <= 80 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@fact +POSTHOOK: Output: default@fact_part@range=80 +POSTHOOK: Lineage: fact_part PARTITION(range=80).f1 SIMPLE [(fact)fact.FieldSchema(name:f1, type:int, comment:null), ] +POSTHOOK: Lineage: fact_part PARTITION(range=80).f2 SIMPLE [(fact)fact.FieldSchema(name:f2, type:int, comment:null), ] +PREHOOK: query: INSERT OVERWRITE TABLE fact_part 
partition(range=100) +SELECT * FROM fact WHERE fact.f1 > 80 AND fact.f1 <= 100 +PREHOOK: type: QUERY +PREHOOK: Input: default@fact +PREHOOK: Output: default@fact_part@range=100 +POSTHOOK: query: INSERT OVERWRITE TABLE fact_part partition(range=100) +SELECT * FROM fact WHERE fact.f1 > 80 AND fact.f1 <= 100 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@fact +POSTHOOK: Output: default@fact_part@range=100 +POSTHOOK: Lineage: fact_part PARTITION(range=100).f1 SIMPLE [(fact)fact.FieldSchema(name:f1, type:int, comment:null), ] +POSTHOOK: Lineage: fact_part PARTITION(range=100).f2 SIMPLE [(fact)fact.FieldSchema(name:f2, type:int, comment:null), ] +PREHOOK: query: EXPLAIN SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 2 <- Map 1 (BROADCAST_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: d + Statistics: Num rows: 21 Data size: 76 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: d1 is not null (type: boolean) + Statistics: Num rows: 11 Data size: 39 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: d1 (type: int) + sort order: + + Map-reduce partition columns: d1 (type: int) + Statistics: Num rows: 11 Data size: 39 Basic stats: COMPLETE Column stats: NONE + value expressions: d2 (type: string) + Map 2 + Map Operator Tree: + TableScan + alias: f + Statistics: Num rows: 101 Data size: 397 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: f1 is not null (type: boolean) + Statistics: Num rows: 51 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 d1 (type: int) + 1 f1 (type: int) + outputColumnNames: _col0, _col1, _col5, _col6 + input vertices: + 0 Map 1 + Statistics: Num rows: 56 Data size: 220 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = _col5) (type: boolean) + Statistics: Num rows: 28 Data size: 110 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 28 Data size: 110 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 28 Data size: 110 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dimension +PREHOOK: Input: default@fact +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dimension +POSTHOOK: Input: default@fact +#### A masked pattern was here #### +4 d 4 2 +5 e 5 2 +9 i 9 2 +10 j 10 2 +12 l 12 2 +14 n 14 2 +16 p 16 2 +1 a 1 2 +2 b 2 2 +3 c 3 2 +6 f 6 2 +7 g 7 2 +8 h 8 2 +11 k 11 2 +13 m 13 2 +15 o 15 2 +17 q 17 2 +18 r 18 2 +19 s 19 2 +20 t 20 2 +PREHOOK: query: EXPLAIN SELECT * FROM 
dimension d, fact_part fp WHERE d.d1 = fp.f1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT * FROM dimension d, fact_part fp WHERE d.d1 = fp.f1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 2 <- Map 1 (BROADCAST_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: d + Statistics: Num rows: 21 Data size: 76 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: d1 is not null (type: boolean) + Statistics: Num rows: 11 Data size: 39 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: d1 (type: int) + sort order: + + Map-reduce partition columns: d1 (type: int) + Statistics: Num rows: 11 Data size: 39 Basic stats: COMPLETE Column stats: NONE + value expressions: d2 (type: string) + Map 2 + Map Operator Tree: + TableScan + alias: fp + Statistics: Num rows: 100 Data size: 392 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: f1 is not null (type: boolean) + Statistics: Num rows: 50 Data size: 196 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 d1 (type: int) + 1 f1 (type: int) + outputColumnNames: _col0, _col1, _col5, _col6, _col7 + input vertices: + 0 Map 1 + Statistics: Num rows: 55 Data size: 215 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = _col5) (type: boolean) + Statistics: Num rows: 27 Data size: 105 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: int), _col7 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 27 Data size: 105 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 27 Data size: 105 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT * FROM dimension d, fact_part fp WHERE d.d1 = fp.f1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dimension +PREHOOK: Input: default@fact_part +PREHOOK: Input: default@fact_part@range=100 +PREHOOK: Input: default@fact_part@range=20 +PREHOOK: Input: default@fact_part@range=40 +PREHOOK: Input: default@fact_part@range=60 +PREHOOK: Input: default@fact_part@range=80 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM dimension d, fact_part fp WHERE d.d1 = fp.f1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dimension +POSTHOOK: Input: default@fact_part +POSTHOOK: Input: default@fact_part@range=100 +POSTHOOK: Input: default@fact_part@range=20 +POSTHOOK: Input: default@fact_part@range=40 +POSTHOOK: Input: default@fact_part@range=60 +POSTHOOK: Input: default@fact_part@range=80 +#### A masked pattern was here #### +4 d 4 2 20 +5 e 5 2 20 +9 i 9 2 20 +10 j 10 2 20 +12 l 12 2 20 +14 n 14 2 20 +16 p 16 2 20 +1 a 1 2 20 +2 b 2 2 20 +3 c 3 2 20 +6 f 6 2 20 +7 g 7 2 20 +8 h 8 2 20 +11 k 11 2 20 +13 m 13 2 20 +15 o 15 2 20 +17 q 17 2 20 +18 r 18 2 20 +19 s 19 2 20 +20 t 20 2 20 +PREHOOK: query: DROP TABLE dimension +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@dimension +PREHOOK: Output: default@dimension +POSTHOOK: 
query: DROP TABLE dimension +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@dimension +POSTHOOK: Output: default@dimension +PREHOOK: query: DROP TABLE fact +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@fact +PREHOOK: Output: default@fact +POSTHOOK: query: DROP TABLE fact +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@fact +POSTHOOK: Output: default@fact +PREHOOK: query: DROP TABLE fact_part +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@fact_part +PREHOOK: Output: default@fact_part +POSTHOOK: query: DROP TABLE fact_part +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@fact_part +POSTHOOK: Output: default@fact_part +PREHOOK: query: explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Left Outer Join0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + input vertices: + 1 Map 3 + Statistics: Num rows: 13516 Data size: 2906160 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select c.ctinyint + from 
alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +3155128 diff --git serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java index bed4d0a..a00104a 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java +++ serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java @@ -18,20 +18,21 @@ package org.apache.hadoop.hive.serde2; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.util.hash.MurmurHash; /** * The structure storing arbitrary amount of data as a set of fixed-size byte buffers. * Maintains read and write pointers for convenient single-threaded writing/reading. */ -public final class WriteBuffers implements RandomAccessOutput { +public final class WriteBuffers implements RandomAccessOutput, Serializable { + private static final long serialVersionUID = 1L; private final ArrayList writeBuffers = new ArrayList(1); /** Buffer size in writeBuffers */ private final int wbSize; @@ -399,12 +400,14 @@ private boolean isAllInOneWriteBuffer(int length) { return currentWriteOffset + length <= wbSize; } + /** Truncates the buffers at currentWriteOffset. */ public void seal() { if (currentWriteOffset < (wbSize * 0.8)) { // arbitrary byte[] smallerBuffer = new byte[currentWriteOffset]; System.arraycopy(currentWriteBuffer, 0, smallerBuffer, 0, currentWriteOffset); writeBuffers.set(currentWriteBufferIndex, smallerBuffer); } + // Remove the buffers after current one. if (currentWriteBufferIndex + 1 < writeBuffers.size()) { writeBuffers.subList(currentWriteBufferIndex + 1, writeBuffers.size()).clear(); } @@ -479,7 +482,7 @@ public void writeInt(long offset, int v) { } // Lifted from org.apache.hadoop.util.hash.MurmurHash... but supports offset. - private static int murmurHash(byte[] data, int offset, int length) { + public static int murmurHash(byte[] data, int offset, int length) { int m = 0x5bd1e995; int r = 24; @@ -528,4 +531,8 @@ private static int murmurHash(byte[] data, int offset, int length) { return h; } + + public long size() { + return writeBuffers.size() * (long) wbSize; + } } \ No newline at end of file diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java index 789e5a6..c373047 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java @@ -126,7 +126,8 @@ public void initialize(Configuration job, Properties tbl) // Create the ObjectInspectors for the fields cachedObjectInspector = LazyFactory.createLazyStructInspector(serdeParams - .getColumnNames(), serdeParams.getColumnTypes(), serdeParams); + .getColumnNames(), serdeParams.getColumnTypes(), + new LazyObjectInspectorParametersImpl(serdeParams)); cachedLazyStruct = (LazyStruct) LazyFactory .createLazyObject(cachedObjectInspector);
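WriteBuffers.murmurHash is made public above so the same hash the in-memory hash table uses (compare LazyBinaryKvWriter.getHashFromKey earlier in this patch) can be computed by other components. One plausible use, sketched here as an assumption rather than code taken from the patch, is mapping a serialized key to a spill-partition index; the real partitioning logic of HybridHashTableContainer is not included in this excerpt.

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hive.serde2.WriteBuffers;

public class KeyPartitionSketch {
  /** Illustrative only: maps a serialized key to one of numPartitions buckets. */
  static int partitionOf(byte[] serializedKey, int numPartitions) {
    int hash = WriteBuffers.murmurHash(serializedKey, 0, serializedKey.length);
    return (hash & Integer.MAX_VALUE) % numPartitions;  // mask the sign bit so the index is non-negative
  }

  public static void main(String[] args) {
    byte[] key = "example-join-key".getBytes(StandardCharsets.UTF_8);
    System.out.println("partition = " + partitionOf(key, 16));
  }
}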