diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f4d44ee..655e825 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -710,6 +710,11 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true, "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" + "because memory-optimized hashtable cannot be serialized."), + HIVEUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", false, "Whether to use hybrid " + + "grace hash join as the join method for mapjoin."), + HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ("hive.mapjoin.hybridgrace.memcheckfrequency", 1024, "For " + + "hybrid grace hash join, how often (how many rows apart) we check if memory is full. " + + "This number should be a power of 2."), HIVEHASHTABLEWBSIZE("hive.mapjoin.optimized.hashtable.wbsize", 10 * 1024 * 1024, "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to\n" + "store data. This is one buffer size. HT may be slightly faster if this is larger, but for small\n" + diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 5d9f1a2..288270e 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -287,6 +287,7 @@ minitez.query.files=bucket_map_join_tez1.q,\ bucket_map_join_tez2.q,\ dynamic_partition_pruning.q,\ dynamic_partition_pruning_2.q,\ + hybridhashjoin.q,\ mapjoin_decimal.q,\ lvj_mapjoin.q, \ mrr.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java index 4013b7f..6c6556b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java @@ -48,6 +48,15 @@ public class JoinUtil { + /** + * Represents the join result between two tables + */ + public static enum JoinResult { + MATCH, // A match is found + NOMATCH, // No match is found, and the current row will be dropped + SPILL // The current row has been spilled to disk, as the join is postponed + } + public static List[] getObjectInspectorsFromEvaluators( List[] exprEntries, ObjectInspector[] inputObjInspector, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index f99aaa8..a7a6da3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -31,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; @@ -39,9 +41,15 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; -import
org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer; +import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer; +import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer; +import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; @@ -51,8 +59,13 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; +import static org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition; +import static org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper; + /** * Map side Join operator implementation. */ @@ -74,6 +87,12 @@ private transient ReusableGetAdaptor[] hashMapRowGetters; private UnwrapRowContainer[] unwrapContainer; + private transient Configuration hconf; + private transient boolean useHybridGraceHashJoin; // whether Hybrid Grace Hash Join is enabled + private transient boolean hybridMapJoinLeftover; // whether there's spilled data to be processed + private transient MapJoinBytesTableContainer currentSmallTable; // reloaded hashmap from disk + private transient int tag; // big table alias + private transient int smallTable; // small table alias public MapJoinOperator() { } @@ -99,6 +118,7 @@ public void startGroup() throws HiveException { @Override protected Collection> initializeOp(Configuration hconf) throws HiveException { + this.hconf = hconf; unwrapContainer = new UnwrapRowContainer[conf.getTagLength()]; Collection> result = super.initializeOp(hconf); @@ -121,13 +141,15 @@ public void startGroup() throws HiveException { mapJoinTables = new MapJoinTableContainer[tagLen]; mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen]; hashTblInitedOnce = false; + useHybridGraceHashJoin = + HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN); generateMapMetaData(); final ExecMapperContext mapContext = getExecContext(); final MapredContext mrContext = MapredContext.get(); - if (!conf.isBucketMapJoin()) { + if (!conf.isBucketMapJoin() && !useHybridGraceHashJoin) { /* * The issue with caching in case of bucket map join is that different tasks * process different buckets and if the container is reused to join a different bucket, @@ -267,9 +289,9 @@ public void cleanUpInputFileChangedOp() throws HiveException { loadHashTable(getExecContext(), MapredContext.get()); } - protected void setMapJoinKey( + protected JoinUtil.JoinResult setMapJoinKey( ReusableGetAdaptor dest, Object row, byte alias) throws HiveException { - dest.setFromRow(row, joinKeys[alias], joinKeysObjectInspectors[alias]); + return dest.setFromRow(row, joinKeys[alias], joinKeysObjectInspectors[alias]); } protected MapJoinKey getRefKey(byte alias) { @@ -286,6 
+308,18 @@ protected MapJoinKey getRefKey(byte alias) { @Override public void process(Object row, int tag) throws HiveException { + this.tag = tag; + + // As we're calling processOp again to process the leftover triplets, we know the "row" is + // coming from the on-disk matchfile. We need to recreate hashMapRowGetter against new hashtable + if (hybridMapJoinLeftover) { + assert hashMapRowGetters != null; + if (hashMapRowGetters[smallTable] == null) { + MapJoinKey refKey = getRefKey((byte) tag); + hashMapRowGetters[smallTable] = currentSmallTable.createGetter(refKey); + } + } + try { alias = (byte) tag; if (hashMapRowGetters == null) { @@ -304,14 +338,16 @@ public void process(Object row, int tag) throws HiveException { boolean joinNeeded = false; for (byte pos = 0; pos < order.length; pos++) { if (pos != alias) { + smallTable = pos; // record small table alias + JoinUtil.JoinResult joinResult; ReusableGetAdaptor adaptor; if (firstSetKey == null) { adaptor = firstSetKey = hashMapRowGetters[pos]; - setMapJoinKey(firstSetKey, row, alias); + joinResult = setMapJoinKey(firstSetKey, row, alias); } else { // Keys for all tables are the same, so only the first has to deserialize them. adaptor = hashMapRowGetters[pos]; - adaptor.setFromOther(firstSetKey); + joinResult = adaptor.setFromOther(firstSetKey); } MapJoinRowContainer rowContainer = adaptor.getCurrentRows(); if (rowContainer != null && unwrapContainer[pos] != null) { @@ -321,8 +357,13 @@ public void process(Object row, int tag) throws HiveException { // there is no join-value or join-key has all null elements if (rowContainer == null || firstSetKey.hasAnyNulls(fieldCount, nullsafes)) { if (!noOuterJoin) { - joinNeeded = true; - storage[pos] = dummyObjVectors[pos]; + // For Hybrid Grace Hash Join, during the 1st round processing, + // we only keep the LEFT side if the row is not spilled + if (!useHybridGraceHashJoin || hybridMapJoinLeftover || + (!hybridMapJoinLeftover && joinResult != JoinUtil.JoinResult.SPILL)) { + joinNeeded = true; + storage[pos] = dummyObjVectors[pos]; + } } else { storage[pos] = emptyList; } @@ -331,6 +372,10 @@ public void process(Object row, int tag) throws HiveException { storage[pos] = rowContainer.copy(); aliasFilterTags[pos] = rowContainer.getAliasFilter(); } + // Spill the big table rows into appropriate partition + if (joinResult == JoinUtil.JoinResult.SPILL) { + spillBigTableRow(mapJoinTables[pos], row); + } } } if (joinNeeded) { @@ -354,11 +399,60 @@ public void process(Object row, int tag) throws HiveException { } } + /** + * Postpone processing the big table row temporarily by spilling it to a row container + * @param hybridHtContainer Hybrid hashtable container + * @param row big table row + */ + private void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row) { + HybridHashTableContainer ht = (HybridHashTableContainer) hybridHtContainer; + int partitionId = ht.getToSpillPartitionId(); + HashPartition hp = ht.getHashPartitions()[partitionId]; + ObjectContainer bigTable = hp.getMatchfileObjContainer(); + bigTable.add(row); + } + @Override public void closeOp(boolean abort) throws HiveException { for (MapJoinTableContainer tableContainer : mapJoinTables) { if (tableContainer != null) { tableContainer.dumpMetrics(); + + if (tableContainer instanceof HybridHashTableContainer) { + HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer; + hybridHtContainer.dumpStats(); + + HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions(); + // Clear all in 
memory partitions first + for (int i = 0; i < hashPartitions.length; i++) { + if (!hashPartitions[i].isHashMapOnDisk()) { + hybridHtContainer.setTotalInMemRowCount( + hybridHtContainer.getTotalInMemRowCount() - + hashPartitions[i].getHashMapFromMemory().getNumValues()); + hashPartitions[i].getHashMapFromMemory().clear(); + } + } + assert hybridHtContainer.getTotalInMemRowCount() == 0; + + for (int i = 0; i < hashPartitions.length; i++) { + if (hashPartitions[i].isHashMapOnDisk()) { + // Recursively process on-disk triplets (hash partition, sidefile, matchfile) + try { + hybridMapJoinLeftover = true; + hashMapRowGetters[smallTable] = null; + continueProcess(hashPartitions[i], hybridHtContainer); + } catch (IOException e) { + throw new HiveException(e); + } catch (ClassNotFoundException e) { + throw new HiveException(e); + } catch (SerDeException e) { + throw new HiveException(e); + } + } + hybridMapJoinLeftover = false; + currentSmallTable = null; + } + } } } if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) @@ -375,6 +469,84 @@ public void closeOp(boolean abort) throws HiveException { } /** + * Continue processing each pair of spilled hashtable and big table row container, + * by bringing them back to memory and calling process() again. + * @param partition hash partition to process + * @param hybridHtContainer Hybrid hashtable container + * @throws HiveException + * @throws IOException + * @throws ClassNotFoundException + * @throws SerDeException + */ + private void continueProcess(HashPartition partition, HybridHashTableContainer hybridHtContainer) + throws HiveException, IOException, ClassNotFoundException, SerDeException { + reloadHashTable(partition, hybridHtContainer); + // Iterate through the on-disk matchfile, and feed process() with the leftover rows + ObjectContainer bigTable = partition.getMatchfileObjContainer(); + while (bigTable.hasNext()) { + Object row = bigTable.next(); + process(row, tag); + } + bigTable.clear(); + } + + /** + * Reload hashtable from the hash partition. + * It can have two steps: + * 1) Deserialize a serialized hash table, and + * 2) Merge every key/value pair from small table container into the hash table + * @param partition hash partition to process + * @param hybridHtContainer Hybrid hashtable container + * @throws IOException + * @throws ClassNotFoundException + * @throws HiveException + * @throws SerDeException + */ + private void reloadHashTable(HashPartition partition, + HybridHashTableContainer hybridHtContainer) + throws IOException, ClassNotFoundException, HiveException, SerDeException { + // Deserialize the on-disk hash table + // We're sure this part is smaller than the memory limit + BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(); + int rowCount = restoredHashMap.getNumValues(); + LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition..."); + LOG.info("Hybrid Grace Hash Join: Number of rows restored from hashmap: " + rowCount); + + // Merge the sidefile into the newly created hash table + // This is where the spilling may happen again + KeyValueContainer kvContainer = partition.getSidefileKVContainer(); + rowCount += kvContainer.size(); + LOG.info("Hybrid Grace Hash Join: Number of rows restored from KeyValueContainer: " + + kvContainer.size()); + + // Based on the new row count, check that the reloaded partition will actually fit in memory.
+ // The size of deserialized partition shouldn't exceed half of memory limit + if (rowCount * hybridHtContainer.getTableRowSize() >= hybridHtContainer.getMemoryThreshold() / 2) { + throw new RuntimeException("Hybrid Grace Hash Join: Hash table cannot be reloaded since it" + + " will be greater than memory limit. Recursive spilling is currently not supported"); + } + + KeyValueHelper writeHelper = hybridHtContainer.getWriteHelper(); + while (kvContainer.hasNext()) { + ObjectPair pair = kvContainer.next(); + Writable key = pair.getFirst(); + Writable val = pair.getSecond(); + writeHelper.setKeyValue(key, val); + restoredHashMap.put(writeHelper, -1); + } + + hybridHtContainer.setTotalInMemRowCount(hybridHtContainer.getTotalInMemRowCount() + + restoredHashMap.getNumValues() + kvContainer.size()); + kvContainer.clear(); + + // Since there's only one hashmap to deal with, it's OK to create a MapJoinBytesTableContainer + currentSmallTable = new MapJoinBytesTableContainer(restoredHashMap); + currentSmallTable.setInternalValueOi(hybridHtContainer.getInternalValueOi()); + currentSmallTable.setSortableSortOrders(hybridHtContainer.getSortableSortOrders()); + } + + /** * Implements the getName function for the Node Interface. * * @return the name of the operator diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 8d3e3cc..2312ccb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -124,6 +124,7 @@ private int resizeThreshold; private int keysAssigned; + private int numValues; /** * Largest number of probe steps ever taken to find location for a key. When getting, we can @@ -154,7 +155,7 @@ private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024; public BytesBytesMultiHashMap(int initialCapacity, - float loadFactor, int wbSize, long memUsage, int defaultCapacity) { + float loadFactor, int wbSize, long memUsage) { if (loadFactor < 0 || loadFactor > 1) { throw new AssertionError("Load factor must be between (0, 1]."); } @@ -180,7 +181,7 @@ public BytesBytesMultiHashMap(int initialCapacity, @VisibleForTesting BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) { - this(initialCapacity, loadFactor, wbSize, -1, 100000); + this(initialCapacity, loadFactor, wbSize, -1); } /** The source of keys and values to put into hashtable; avoids byte copying. */ @@ -204,7 +205,7 @@ public BytesBytesMultiHashMap(int initialCapacity, * @param kv Keyvalue writer. Each method will be called at most once. */ private static final byte[] FOUR_ZEROES = new byte[] { 0, 0, 0, 0 }; - public void put(KvSource kv) throws SerDeException { + public void put(KvSource kv, int keyHashCode) throws SerDeException { if (resizeThreshold <= keysAssigned) { expandAndRehash(); } @@ -218,7 +219,7 @@ public void put(KvSource kv) throws SerDeException { kv.writeKey(writeBuffers); int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset); - int hashCode = writeBuffers.hashCode(keyOffset, keyLength); + int hashCode = (keyHashCode == -1) ? 
writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode; int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode); // LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot); @@ -243,6 +244,7 @@ public void put(KvSource kv) throws SerDeException { } refs[slot] = Ref.setListFlag(ref); } + ++numValues; } /** @@ -301,10 +303,33 @@ public void populateValue(WriteBuffers.ByteSegmentRef valueRef) { writeBuffers.populateValue(valueRef); } + /** + * Number of keys in the hashmap + * @return number of keys + */ public int size() { return keysAssigned; } + /** + * Number of values in the hashmap + * This is equal to or bigger than number of keys, since some values may share the same key + * @return number of values + */ + public int getNumValues() { + return numValues; + } + + /** + * Number of bytes used by the hashmap + * There are two main components that take most memory: writeBuffers and refs + * Others include instance fields: 100 + * @return number of bytes + */ + public long memorySize() { + return writeBuffers.size() + refs.length * 8 + 100; + } + public void seal() { writeBuffers.seal(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index 3adaab7..3852380 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -153,8 +154,9 @@ public GetAdaptor(MapJoinKey key) { } @Override - public void setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, - VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { + public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, + VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) + throws HiveException { if (currentKey == null) { currentKey = new Object[keyOutputWriters.length]; vectorKeyOIs = new ArrayList(); @@ -168,10 +170,16 @@ public void setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyO key = MapJoinKey.readFromVector(output, key, currentKey, vectorKeyOIs, !isFirstKey); isFirstKey = false; this.currentValue = mHash.get(key); + if (this.currentValue == null) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } @Override - public void setFromRow(Object row, List fields, + public JoinUtil.JoinResult setFromRow(Object row, List fields, List ois) throws HiveException { if (currentKey == null) { currentKey = new Object[fields.size()]; @@ -182,15 +190,27 @@ public void setFromRow(Object row, List fields, key = MapJoinKey.readFromRow(output, key, currentKey, ois, !isFirstKey); isFirstKey = false; this.currentValue = mHash.get(key); + if (this.currentValue == null) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } @Override - public void setFromOther(ReusableGetAdaptor other) { + public JoinUtil.JoinResult setFromOther(ReusableGetAdaptor other) { assert other instanceof GetAdaptor; GetAdaptor 
other2 = (GetAdaptor)other; this.key = other2.key; this.isFirstKey = other2.isFirstKey; this.currentValue = mHash.get(key); + if (this.currentValue == null) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } @Override @@ -223,4 +243,9 @@ public MapJoinKey getAnyKey() { public void dumpMetrics() { // Nothing to do. } + + @Override + public boolean hasSpill() { + return false; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java new file mode 100644 index 0000000..c8e6584 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -0,0 +1,742 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.persistence; + + +import com.esotericsoftware.kryo.Kryo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; +import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.WriteBuffers; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; + +/** + * Hash table container that can have many partitions -- each partition has its own hashmap, + * as well as row container for small table and big table. 
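The class comment above describes spreading the small table across partitions; concretely, putRow() later in this file picks a partition by masking the key's hash with (numPartitions - 1), which is why calcNumPartitions() rounds the partition count up to a power of two. A minimal standalone illustration of that routing (not part of the patch; class and method names are made up):

```java
// Illustrative sketch only -- not part of the patch.
public class PartitionMaskDemo {
  // Equivalent to Math.floorMod(keyHash, numPartitions) when numPartitions is a power of two,
  // but computed with a single AND instead of a division.
  static int partitionFor(int keyHash, int numPartitions) {
    return keyHash & (numPartitions - 1);
  }

  public static void main(String[] args) {
    System.out.println(partitionFor(-123456789, 16)); // always falls in [0, 16), even for negative hashes
  }
}
```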
+ * + * The purpose is to distribute rows into multiple partitions so that when the entire small table + * cannot fit into memory, we are still able to perform hash join, by processing them recursively. + * + * Partitions that can fit in memory will be processed first, and then every spilled partition will + * be restored and processed one by one. + */ +public class HybridHashTableContainer implements MapJoinTableContainer { + private static final Log LOG = LogFactory.getLog(HybridHashTableContainer.class); + + private HashPartition[] hashPartitions; // an array of partitions holding the triplets + private int totalInMemRowCount = 0; // total number of small table rows in memory + private long memoryThreshold; // the max memory limit allocated + private long tableRowSize; // row size of the small table + private boolean isSpilled; // whether there's any spilled partition + private int toSpillPartitionId; // the partition into which to spill the big table row; + // This may change after every setMapJoinKey call + private int numPartitionsSpilled; // number of spilled partitions + private boolean lastPartitionInMem; // only one (last one) partition is left in memory + private int memoryCheckFrequency; // how often (# of rows apart) to check if memory is full + + /** The OI used to deserialize values. We never deserialize keys. */ + private LazyBinaryStructObjectInspector internalValueOi; + private boolean[] sortableSortOrders; + private MapJoinBytesTableContainer.KeyValueHelper writeHelper; + + private final List EMPTY_LIST = new ArrayList(0); + + /** + * This class encapsulates the triplet together since they are closely related to each other + * The triplet: hashmap (either in memory or on disk), small table container, big table container + */ + public static class HashPartition { + BytesBytesMultiHashMap hashMap; // In memory hashMap + KeyValueContainer sidefileKVContainer; // Stores small table key/value pairs + ObjectContainer matchfileObjContainer; // Stores big table rows + Path hashMapLocalPath; // Local file system path for spilled hashMap + boolean hashMapOnDisk; // Status of hashMap. true: on disk, false: in memory + boolean hashMapSpilledOnCreation; // When there's not enough memory, the hashMap cannot be created + int threshold; // Used to create an empty BytesBytesMultiHashMap + float loadFactor; // Same as above + int wbSize; // Same as above + + /* It may happen that there's not enough memory to instantiate a hashmap for the partition. + * In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled". + */ + public HashPartition(int threshold, float loadFactor, int wbSize, long memUsage, + boolean createHashMap) { + if (createHashMap) { + hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, memUsage); + } else { + hashMapSpilledOnCreation = true; + hashMapOnDisk = true; + } + this.threshold = threshold; + this.loadFactor = loadFactor; + this.wbSize = wbSize; + } + + /* Get the in memory hashmap */ + public BytesBytesMultiHashMap getHashMapFromMemory() { + return hashMap; + } + + /* Restore the hashmap from disk by deserializing it. + * Currently Kryo is used for this purpose.
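The comment above names Kryo: spillPartition() and getHashMapFromDisk() in this class write the in-memory hashmap to a local temp file and later read it back. A self-contained sketch of that round trip (illustrative only; the real code reuses Utilities.runtimeSerializationKryo and serializes a BytesBytesMultiHashMap rather than the stand-in array used here):

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.nio.file.Files;
import java.nio.file.Path;

public class KryoSpillDemo {
  public static void main(String[] args) throws Exception {
    Kryo kryo = new Kryo();
    Path path = Files.createTempFile("partition-demo-", null);

    Output out = new Output(Files.newOutputStream(path));
    kryo.writeObject(out, new int[] {1, 2, 3});          // spill: serialize the object to the temp file
    out.close();

    Input in = new Input(Files.newInputStream(path));
    int[] restored = kryo.readObject(in, int[].class);   // restore: deserialize it back into memory
    in.close();
    Files.delete(path);                                  // the spill file is discarded once restored

    System.out.println(restored.length);                 // 3
  }
}
```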
+ */ + public BytesBytesMultiHashMap getHashMapFromDisk() + throws IOException, ClassNotFoundException { + if (hashMapSpilledOnCreation) { + return new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, -1); + } else { + InputStream inputStream = Files.newInputStream(hashMapLocalPath); + com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream); + Kryo kryo = Utilities.runtimeSerializationKryo.get(); + BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class); + input.close(); + inputStream.close(); + Files.delete(hashMapLocalPath); + return restoredHashMap; + } + } + + /* Get the small table key/value container */ + public KeyValueContainer getSidefileKVContainer() { + if (sidefileKVContainer == null) { + sidefileKVContainer = new KeyValueContainer(); + } + return sidefileKVContainer; + } + + /* Get the big table row container */ + public ObjectContainer getMatchfileObjContainer() { + if (matchfileObjContainer == null) { + matchfileObjContainer = new ObjectContainer(); + } + return matchfileObjContainer; + } + + /* Check if hashmap is on disk or in memory */ + public boolean isHashMapOnDisk() { + return hashMapOnDisk; + } + } + + public HybridHashTableContainer(Configuration hconf, long keyCount, long memUsage, long tableSize) + throws SerDeException { + this(HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD), + HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), + HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ), + tableSize, keyCount, memUsage); + } + + private HybridHashTableContainer(int threshold, float loadFactor, int wbSize, + long noConditionalTaskThreshold, int memCheckFreq, long tableSize, + long keyCount, long memUsage) throws SerDeException { + memoryThreshold = noConditionalTaskThreshold; + tableRowSize = tableSize / keyCount; + memoryCheckFrequency = memCheckFreq; + + int numPartitions = calcNumPartitions(tableSize, wbSize); // estimate # of partitions to create + hashPartitions = new HashPartition[numPartitions]; + int numPartitionsSpilledOnCreation = 0; + long memoryAllocated = 0; + for (int i = 0; i < numPartitions; i++) { + if (i == 0) { // We unconditionally create a hashmap for the first hash partition + hashPartitions[i] = new HashPartition(threshold, loadFactor, wbSize, memUsage, true); + } else { + hashPartitions[i] = new HashPartition(threshold, loadFactor, wbSize, memUsage, + memoryAllocated + wbSize < memoryThreshold); + } + if (isHashMapSpilledOnCreation(i)) { + numPartitionsSpilledOnCreation++; + numPartitionsSpilled++; + } else { + memoryAllocated += hashPartitions[i].hashMap.memorySize(); + } + } + assert numPartitionsSpilledOnCreation != numPartitions : "All partitions are directly spilled!" 
+ + " It is not supported now."; + LOG.info("Number of partitions created: " + numPartitions); + LOG.info("Number of partitions spilled directly to disk on creation: " + + numPartitionsSpilledOnCreation); + } + + + public MapJoinBytesTableContainer.KeyValueHelper getWriteHelper() { + return writeHelper; + } + + public HashPartition[] getHashPartitions() { + return hashPartitions; + } + + public long getMemoryThreshold() { + return memoryThreshold; + } + + public LazyBinaryStructObjectInspector getInternalValueOi() { + return internalValueOi; + } + + public boolean[] getSortableSortOrders() { + return sortableSortOrders; + } + + /* For a given row, put it into proper partition based on its hash value. + * When memory threshold is reached, the biggest hash table in memory will be spilled to disk. + * If the hash table of a specific partition is already on disk, all later rows will be put into + * a row container for later use. + */ + @SuppressWarnings("deprecation") + @Override + public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey, + MapJoinObjectSerDeContext valueContext, Writable currentValue) + throws SerDeException, HiveException, IOException { + SerDe keySerde = keyContext.getSerDe(), valSerde = valueContext.getSerDe(); + + if (writeHelper == null) { + LOG.info("Initializing container with " + + keySerde.getClass().getName() + " and " + valSerde.getClass().getName()); + + // We assume this hashtable is loaded only when tez is enabled + LazyBinaryStructObjectInspector valSoi = + (LazyBinaryStructObjectInspector) valSerde.getObjectInspector(); + writeHelper = new MapJoinBytesTableContainer.LazyBinaryKvWriter(keySerde, valSoi, + valueContext.hasFilterTag()); + if (internalValueOi == null) { + internalValueOi = valSoi; + } + if (sortableSortOrders == null) { + sortableSortOrders = ((BinarySortableSerDe) keySerde).getSortOrders(); + } + } + writeHelper.setKeyValue(currentKey, currentValue); + + // Next, put row into corresponding hash partition + int keyHash = writeHelper.getHashFromKey(); + int partitionId = keyHash & (hashPartitions.length - 1); + HashPartition hashPartition = hashPartitions[partitionId]; + + if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { + KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer(); + kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue); + } else { + hashPartition.hashMap.put(writeHelper, keyHash); // Pass along hashcode to avoid recalculation + totalInMemRowCount++; + + if ((totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0 && // check periodically + !lastPartitionInMem) { // If this is the only partition in memory, proceed without check + if (isMemoryFull()) { + if ((numPartitionsSpilled == hashPartitions.length - 1) ) { + LOG.warn("This LAST partition in memory won't be spilled!"); + lastPartitionInMem = true; + } else { + int biggest = biggestPartition(); + numPartitionsSpilled++; + spillPartition(biggest); + this.setSpill(true); + } + } + } + } + + return null; // there's no key to return + } + + /** + * Check if the hash table of a specified partition is on disk (or "spilled" on creation) + * @param partitionId partition number + * @return true if on disk, false if in memory + */ + public boolean isOnDisk(int partitionId) { + return hashPartitions[partitionId].hashMapOnDisk; + } + + /** + * Check if the hash table of a specified partition has been "spilled" to disk when it was created. + * In fact, in other words, check if a hashmap does exist or not. 
+ * @param partitionId hashMap ID + * @return true if it was not created at all, false if there is a hash table existing there + */ + public boolean isHashMapSpilledOnCreation(int partitionId) { + return hashPartitions[partitionId].hashMapSpilledOnCreation; + } + + /** + * Check if the memory threshold is reached + * @return true if memory is full, false if not + */ + private boolean isMemoryFull() { + long size = 0; + for (int i = 0; i < hashPartitions.length; i++) { + if (!isOnDisk(i)) { + size += hashPartitions[i].hashMap.memorySize(); + } + } + return size >= memoryThreshold; + } + + /** + * Find the partition with biggest hashtable in memory at this moment + * @return the biggest partition number + */ + private int biggestPartition() { + int res = 0; + int maxSize = 0; + + // If a partition has been spilled to disk, its size will be 0, i.e. it won't be picked + for (int i = 0; i < hashPartitions.length; i++) { + int size; + if (isOnDisk(i)) { + continue; + } else { + size = hashPartitions[i].hashMap.getNumValues(); + } + if (size > maxSize) { + maxSize = size; + res = i; + } + } + return res; + } + + /** + * Move the hashtable of a specified partition from memory into local file system + * @param partitionId the hashtable to be moved + */ + private void spillPartition(int partitionId) throws IOException { + HashPartition partition = hashPartitions[partitionId]; + int inMemRowCount = partition.hashMap.getNumValues(); + long inMemSize = partition.hashMap.memorySize(); + + Path path = Files.createTempFile("partition-" + partitionId + "-", null); + OutputStream outputStream = Files.newOutputStream(path); + + com.esotericsoftware.kryo.io.Output output = new com.esotericsoftware.kryo.io.Output(outputStream); + Kryo kryo = Utilities.runtimeSerializationKryo.get(); + kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap + output.close(); + outputStream.close(); + + partition.hashMapLocalPath = path; + partition.hashMapOnDisk = true; + + long size = 0; + for (int i = 0; i < hashPartitions.length; i++) { + if (!isOnDisk(i)) { + size += hashPartitions[i].hashMap.memorySize(); + } + } + LOG.info("Spilling hash partition " + partitionId + " (Rows: " + inMemRowCount + + ", Mem size: " + inMemSize + "): " + path); + LOG.info("Memory usage before spilling: " + (size + inMemSize)); + LOG.info("Memory usage after spilling: " + size); + + totalInMemRowCount -= inMemRowCount; + partition.hashMap.clear(); + } + + /** + * Calculate how many partitions are needed. This is an estimation. + * @param dataSize total data size for the table + * @param wbSize write buffer size + * @return number of partitions needed + */ + private int calcNumPartitions(long dataSize, int wbSize) { + if (memoryThreshold < wbSize) { + throw new RuntimeException("Available memory is less than hashtable writebuffer size!" + + " Try increasing hive.auto.convert.join.noconditionaltask.size."); + } + + int lowerLimit = 2; + int numPartitions = (int) Math.ceil((double) dataSize / wbSize); + + LOG.info("Total available memory: " + memoryThreshold); + LOG.info("Estimated small table size: " + dataSize); + LOG.info("Write buffer size: " + wbSize); + LOG.info("Initial number of partitions: " + numPartitions); + + if (numPartitions < lowerLimit) { + return lowerLimit; + } else if (dataSize > memoryThreshold) { + numPartitions = (int) (memoryThreshold / wbSize); + } + // Make sure numPartitions is power of 2, to make N & (M - 1) easy when calculating partition No. + numPartitions = (Long.bitCount(numPartitions) == 1) ?
numPartitions + : Integer.highestOneBit(numPartitions) << 1; + while (dataSize / numPartitions > memoryThreshold) { + numPartitions *= 2; + } + + LOG.info("Number of hash partitions to be created: " + numPartitions); + return numPartitions; + } + + /* Get total number of rows from all in memory partitions */ + public int getTotalInMemRowCount() { + return totalInMemRowCount; + } + + /* Set total number of rows from all in memory partitions */ + public void setTotalInMemRowCount(int totalInMemRowCount) { + this.totalInMemRowCount = totalInMemRowCount; + } + + /* Get row size of small table */ + public long getTableRowSize() { + return tableRowSize; + } + + @Override + public boolean hasSpill() { + return isSpilled; + } + + public void setSpill(boolean isSpilled) { + this.isSpilled = isSpilled; + } + + /** + * Gets the partition Id into which to spill the big table row + * @return partition Id + */ + public int getToSpillPartitionId() { + return toSpillPartitionId; + } + + /* Clean up in memory hashtables */ + @Override + public void clear() { + for (HashPartition hp : hashPartitions) { + if (hp.hashMap != null) { + hp.hashMap.clear(); + } + } + } + + @Override + public MapJoinKey getAnyKey() { + return null; // This table has no keys. + } + + @Override + public ReusableGetAdaptor createGetter(MapJoinKey keyTypeFromLoader) { + if (keyTypeFromLoader != null) { + throw new AssertionError("No key expected from loader but got " + keyTypeFromLoader); + } + return new GetAdaptor(); + } + + @Override + public void seal() { + for (HashPartition hp : hashPartitions) { + // Only seal those partitions that haven't been spilled and cleared, + // because once a hashMap is cleared, it will become unusable + if (hp.hashMap != null && hp.hashMap.size() != 0) { + hp.hashMap.seal(); + } + } + } + + /** Implementation of ReusableGetAdaptor that has Output for key serialization; row + * container is also created once and reused for every row. 
*/ + private class GetAdaptor implements ReusableGetAdaptor { + + private Object[] currentKey; + private boolean[] nulls; + private List vectorKeyOIs; + + private final ReusableRowContainer currentValue; + private final Output output; + + public GetAdaptor() { + currentValue = new ReusableRowContainer(); + output = new Output(); + } + + @Override + public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, + VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) + throws HiveException { + if (nulls == null) { + nulls = new boolean[keyOutputWriters.length]; + currentKey = new Object[keyOutputWriters.length]; + vectorKeyOIs = new ArrayList(); + for (int i = 0; i < keyOutputWriters.length; i++) { + vectorKeyOIs.add(keyOutputWriters[i].getObjectInspector()); + } + } else { + assert nulls.length == keyOutputWriters.length; + } + for (int i = 0; i < keyOutputWriters.length; i++) { + currentKey[i] = keyWrapperBatch.getWritableKeyValue(kw, i, keyOutputWriters[i]); + nulls[i] = currentKey[i] == null; + } + return currentValue.setFromOutput( + MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, sortableSortOrders)); + } + + @Override + public JoinUtil.JoinResult setFromRow(Object row, List fields, + List ois) throws HiveException { + if (nulls == null) { + nulls = new boolean[fields.size()]; + currentKey = new Object[fields.size()]; + } + for (int keyIndex = 0; keyIndex < fields.size(); ++keyIndex) { + currentKey[keyIndex] = fields.get(keyIndex).evaluate(row); + nulls[keyIndex] = currentKey[keyIndex] == null; + } + return currentValue.setFromOutput( + MapJoinKey.serializeRow(output, currentKey, ois, sortableSortOrders)); + } + + @Override + public JoinUtil.JoinResult setFromOther(ReusableGetAdaptor other) throws HiveException { + assert other instanceof GetAdaptor; + GetAdaptor other2 = (GetAdaptor)other; + nulls = other2.nulls; + currentKey = other2.currentKey; + return currentValue.setFromOutput(other2.output); + } + + @Override + public boolean hasAnyNulls(int fieldCount, boolean[] nullsafes) { + if (nulls == null || nulls.length == 0) return false; + for (int i = 0; i < nulls.length; i++) { + if (nulls[i] && (nullsafes == null || !nullsafes[i])) { + return true; + } + } + return false; + } + + @Override + public MapJoinRowContainer getCurrentRows() { + return currentValue.isEmpty() ? null : currentValue; + } + + @Override + public Object[] getCurrentKey() { + return currentKey; + } + } + + /** Row container that gets and deserializes the rows on demand from bytes provided. */ + private class ReusableRowContainer + implements MapJoinRowContainer, AbstractRowContainer.RowIterator> { + private byte aliasFilter; + private List refs; + private int currentRow; + /** + * Sometimes, when container is empty in multi-table mapjoin, we need to add a dummy row. + * This container does not normally support adding rows; this is for the dummy row. + */ + private List dummyRow = null; + + private final ByteArrayRef uselessIndirection; // LBStruct needs ByteArrayRef + private final LazyBinaryStruct valueStruct; + + private int partitionId; // Current hashMap in use + + public ReusableRowContainer() { + if (internalValueOi != null) { + valueStruct = (LazyBinaryStruct) + LazyBinaryFactory.createLazyBinaryObject(internalValueOi); + } else { + valueStruct = null; // No rows? 
+ } + uselessIndirection = new ByteArrayRef(); + clearRows(); + } + + /* Determine if there is a match between big table row and the corresponding hashtable + * Three states can be returned: + * MATCH: a match is found + * NOMATCH: no match is found from the specified partition + * SPILL: the specified partition has been spilled to disk and is not available; + * the evaluation for this big table row will be postponed. + */ + public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException { + if (refs == null) { + refs = new ArrayList(0); + } + + int keyHash = WriteBuffers.murmurHash(output.getData(), 0, output.getLength()); + partitionId = keyHash & (hashPartitions.length - 1); + + // If the target hash table is on disk, spill this row to disk as well to be processed later + if (isOnDisk(partitionId)) { + toSpillPartitionId = partitionId; + refs.clear(); + return JoinUtil.JoinResult.SPILL; + } + else { + byte aliasFilter = hashPartitions[partitionId].hashMap.getValueRefs( + output.getData(), output.getLength(), refs); + this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter; + this.dummyRow = null; + if (refs.isEmpty()) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } + } + } + + public boolean isEmpty() { + return refs.isEmpty() && (dummyRow == null); + } + + // Implementation of row container + @Override + public RowIterator> rowIter() throws HiveException { + currentRow = -1; + return this; + } + + @Override + public int rowCount() throws HiveException { + return dummyRow != null ? 1 : refs.size(); + } + + @Override + public void clearRows() { + // Doesn't clear underlying hashtable + if (refs != null) { + refs.clear(); + } + dummyRow = null; + currentRow = -1; + aliasFilter = (byte) 0xff; + } + + @Override + public byte getAliasFilter() throws HiveException { + return aliasFilter; + } + + @Override + public MapJoinRowContainer copy() throws HiveException { + return this; // Independent of hashtable and can be modified, no need to copy. + } + + // Implementation of row iterator + @Override + public List first() throws HiveException { + currentRow = 0; + return next(); + } + + + @Override + public List next() throws HiveException { + if (dummyRow != null) { + List result = dummyRow; + dummyRow = null; + return result; + } + if (currentRow < 0 || refs.size() < currentRow) throw new HiveException("No rows"); + if (refs.size() == currentRow) return null; + WriteBuffers.ByteSegmentRef ref = refs.get(currentRow++); + if (ref.getLength() == 0) { + return EMPTY_LIST; // shortcut, 0 length means no fields + } + if (ref.getBytes() == null) { + // partitionId is derived from previously calculated value in setFromOutput() + hashPartitions[partitionId].hashMap.populateValue(ref); + } + uselessIndirection.setData(ref.getBytes()); + valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength()); + return valueStruct.getFieldsAsList(); + } + + @Override + public void addRow(List t) { + if (dummyRow != null || !refs.isEmpty()) { + throw new RuntimeException("Cannot add rows when not empty"); + } + dummyRow = t; + } + + // Various unsupported methods. 
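setFromOutput() above hashes the serialized probe key with WriteBuffers.murmurHash, and the build side (LazyBinaryKvWriter.getHashFromKey(), added later in this patch) hashes the same serialized key bytes with the same function; that agreement is what guarantees a big-table row probes exactly the partition its matching small-table keys were built into. A compile-only sketch of that invariant (the helper class is hypothetical):

```java
import org.apache.hadoop.hive.serde2.WriteBuffers;

public class PartitionAgreementDemo {
  // True when two byte-identical serialized keys map to the same hash partition.
  static boolean samePartition(byte[] buildKeyBytes, byte[] probeKeyBytes, int numPartitions) {
    int mask = numPartitions - 1; // partition count is a power of two
    return (WriteBuffers.murmurHash(buildKeyBytes, 0, buildKeyBytes.length) & mask)
        == (WriteBuffers.murmurHash(probeKeyBytes, 0, probeKeyBytes.length) & mask);
  }
}
```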
+ @Override + public void addRow(Object[] value) { + throw new RuntimeException(this.getClass().getCanonicalName() + " cannot add arrays"); + } + @Override + public void write(MapJoinObjectSerDeContext valueContext, ObjectOutputStream out) { + throw new RuntimeException(this.getClass().getCanonicalName() + " cannot be serialized"); + } + } + + @Override + public void dumpMetrics() { + for (int i = 0; i < hashPartitions.length; i++) { + HashPartition hp = hashPartitions[i]; + if (hp.hashMap != null) { + hp.hashMap.debugDumpMetrics(); + } + } + } + + public void dumpStats() { + int numPartitionsInMem = 0; + int numPartitionsOnDisk = 0; + + for (HashPartition hp : hashPartitions) { + if (hp.isHashMapOnDisk()) { + numPartitionsOnDisk++; + } else { + numPartitionsInMem++; + } + } + + LOG.info("In memory partitions have been processed successfully: " + + numPartitionsInMem + " partitions in memory have been processed; " + + numPartitionsOnDisk + " partitions have been spilled to disk and will be processed next."); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java new file mode 100644 index 0000000..d3ec29a --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.persistence; + +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hive.common.ObjectPair; +import org.apache.hadoop.hive.ql.io.HiveKey; +import org.apache.hadoop.io.BytesWritable; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +/** + * An eager key/value container that puts every row directly to output stream. + * Kryo is used for the serialization/deserialization. + * When reading, we load IN_MEMORY_NUM_ROWS rows from input stream to memory batch by batch. 
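A short usage sketch of the container described above (the data is toy data, and the hash code passed to HiveKey is arbitrary since, as writeHiveKey() below notes, it is never used):

```java
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;

public class KeyValueContainerDemo {
  public static void main(String[] args) {
    KeyValueContainer sidefile = new KeyValueContainer();
    sidefile.add(new HiveKey("key".getBytes(), 0), new BytesWritable("value".getBytes()));

    while (sidefile.hasNext()) {                  // reads back in batches of IN_MEMORY_NUM_ROWS
      ObjectPair pair = sidefile.next();
      BytesWritable value = (BytesWritable) pair.getSecond();
      System.out.println(value.getLength());      // 5
    }
    sidefile.clear();                             // deletes the backing temp file
  }
}
```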
+ */ +@SuppressWarnings("unchecked") +public class KeyValueContainer { + private static final Log LOG = LogFactory.getLog(KeyValueContainer.class); + + @VisibleForTesting + static final int IN_MEMORY_NUM_ROWS = 1024; + + private ObjectPair[] readBuffer; + private boolean readBufferUsed = false; // indicates if read buffer has data + private int rowsInReadBuffer = 0; // number of rows in the temporary read buffer + private int readCursor = 0; // cursor during reading + private int rowsOnDisk = 0; // total number of pairs in output + + private File parentFile; + private File tmpFile; + + private Input input; + private Output output; + + public KeyValueContainer() { + readBuffer = new ObjectPair[IN_MEMORY_NUM_ROWS]; + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + readBuffer[i] = new ObjectPair(); + } + try { + setupOutput(); + } catch (IOException e) { + throw new RuntimeException("Failed to create temporary output file on disk", e); + } + } + + private void setupOutput() throws IOException { + if (parentFile == null) { + parentFile = File.createTempFile("key-value-container", ""); + if (parentFile.delete() && parentFile.mkdir()) { + parentFile.deleteOnExit(); + } + } + + if (tmpFile == null || input != null) { + tmpFile = File.createTempFile("KeyValueContainer", ".tmp", parentFile); + LOG.info("KeyValueContainer created temp file " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + } + + FileOutputStream fos = null; + try { + fos = new FileOutputStream(tmpFile); + output = new Output(fos); + } finally { + if (output == null && fos != null) { + fos.close(); + } + } + } + + private BytesWritable readValue(Input input) { + return new BytesWritable(input.readBytes(input.readInt())); + } + + private void writeValue(Output output, BytesWritable bytesWritable) { + int size = bytesWritable.getLength(); + output.writeInt(size); + output.writeBytes(bytesWritable.getBytes(), 0, size); + } + + private HiveKey readHiveKey(Input input) { + HiveKey hiveKey = new HiveKey(input.readBytes(input.readInt()), input.readInt()); + hiveKey.setDistKeyLength(input.readInt()); + return hiveKey; + } + + private void writeHiveKey(Output output, HiveKey hiveKey) { + int size = hiveKey.getLength(); + output.writeInt(size); + output.writeBytes(hiveKey.getBytes(), 0, size); + output.writeInt(0); // Since hashCode is not used, just put an arbitrary number + output.writeInt(hiveKey.getDistKeyLength()); + } + + public void add(HiveKey key, BytesWritable value) { + writeHiveKey(output, key); + writeValue(output, value); + rowsOnDisk++; + } + + public void clear() { + readCursor = rowsInReadBuffer = rowsOnDisk = 0; + readBufferUsed = false; + + if (parentFile != null) { + if (input != null) { + try { + input.close(); + } catch (Throwable ignored) { + } + input = null; + } + if (output != null) { + try { + output.close(); + } catch (Throwable ignored) { + } + output = null; + } + try { + FileUtil.fullyDelete(parentFile); + } catch (Throwable ignored) { + } + parentFile = null; + tmpFile = null; + } + } + + public boolean hasNext() { + return readBufferUsed || rowsOnDisk > 0; + } + + public ObjectPair next() { + Preconditions.checkState(hasNext()); + if (!readBufferUsed) { + try { + if (input == null && output != null) { + // Close output stream if open + output.close(); + output = null; + + FileInputStream fis = null; + try { + fis = new FileInputStream(tmpFile); + input = new Input(fis); + } finally { + if (input == null && fis != null) { + fis.close(); + } + } + } + if (input != null) { + // Load next batch from disk 
+ if (rowsOnDisk >= IN_MEMORY_NUM_ROWS) { + rowsInReadBuffer = IN_MEMORY_NUM_ROWS; + } else { + rowsInReadBuffer = rowsOnDisk; + } + + for (int i = 0; i < rowsInReadBuffer; i++) { + ObjectPair pair = readBuffer[i]; + pair.setFirst(readHiveKey(input)); + pair.setSecond(readValue(input)); + } + + if (input.eof()) { + input.close(); + input = null; + } + + readBufferUsed = true; + readCursor = 0; + rowsOnDisk -= rowsInReadBuffer; + } + } catch (Exception e) { + clear(); // Clean up the cache + throw new RuntimeException("Failed to load key/value pairs from disk", e); + } + } + + ObjectPair row = readBuffer[readCursor]; + if (++readCursor >= rowsInReadBuffer) { + readBufferUsed = false; + rowsInReadBuffer = 0; + readCursor = 0; + } + return row; + } + + public int size() { + return rowsInReadBuffer + rowsOnDisk; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index 52781af..b323e8e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -91,7 +92,11 @@ private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadF throws SerDeException { int newThreshold = HashMapWrapper.calculateTableSize( keyCountAdj, threshold, loadFactor, keyCount); - hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, threshold); + hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage); + } + + public MapJoinBytesTableContainer(BytesBytesMultiHashMap hashMap) { + this.hashMap = hashMap; } private LazyBinaryStructObjectInspector createInternalOi( @@ -114,8 +119,18 @@ private LazyBinaryStructObjectInspector createInternalOi( .getLazyBinaryStructObjectInspector(colNames, colOis); } - private static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { + public void setInternalValueOi(LazyBinaryStructObjectInspector internalValueOi) { + this.internalValueOi = internalValueOi; + } + + public void setSortableSortOrders(boolean[] sortableSortOrders) { + this.sortableSortOrders = sortableSortOrders; + } + + public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { void setKeyValue(Writable key, Writable val) throws SerDeException; + /** Get hash value from the key. 
*/ + int getHashFromKey() throws SerDeException; } private static class KeyValueWriter implements KeyValueHelper { @@ -176,9 +191,14 @@ public byte updateStateByte(Byte previousValue) { aliasFilter &= ((ShortWritable)valObjs[valObjs.length - 1]).get(); return aliasFilter; } + + @Override + public int getHashFromKey() throws SerDeException { + throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer"); + } } - private static class LazyBinaryKvWriter implements KeyValueHelper { + static class LazyBinaryKvWriter implements KeyValueHelper { private final LazyBinaryStruct.SingleFieldGetter filterGetter; private Writable key, value; private final SerDe keySerDe; @@ -210,6 +230,16 @@ public void writeKey(RandomAccessOutput dest) throws SerDeException { dest.write(b.getBytes(), 0, b.getLength() - (hasTag ? 1 : 0)); } + @Override + public int getHashFromKey() throws SerDeException { + if (!(key instanceof BinaryComparable)) { + throw new SerDeException("Unexpected type " + key.getClass().getCanonicalName()); + } + sanityCheckKeyForTag(); + BinaryComparable b = (BinaryComparable)key; + return WriteBuffers.murmurHash(b.getBytes(), 0, b.getLength() - (hasTag ? 1 : 0)); + } + /** * If we received data with tags from ReduceSinkOperators, no keys will match. This should * not happen, but is important enough that we want to find out and work around it if some @@ -285,7 +315,7 @@ public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentK } } writeHelper.setKeyValue(currentKey, currentValue); - hashMap.put(writeHelper); + hashMap.put(writeHelper, -1); return null; // there's no key to return } @@ -329,9 +359,9 @@ public GetAdaptor() { } @Override - public void setFromVector(VectorHashKeyWrapper kw, - VectorExpressionWriter[] keyOutputWriters, - VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { + public JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, + VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) + throws HiveException { if (nulls == null) { nulls = new boolean[keyOutputWriters.length]; currentKey = new Object[keyOutputWriters.length]; @@ -346,12 +376,12 @@ public void setFromVector(VectorHashKeyWrapper kw, currentKey[i] = keyWrapperBatch.getWritableKeyValue(kw, i, keyOutputWriters[i]); nulls[i] = currentKey[i] == null; } - currentValue.setFromOutput( + return currentValue.setFromOutput( MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, sortableSortOrders)); } @Override - public void setFromRow(Object row, List fields, + public JoinUtil.JoinResult setFromRow(Object row, List fields, List ois) throws HiveException { if (nulls == null) { nulls = new boolean[fields.size()]; @@ -361,17 +391,17 @@ public void setFromRow(Object row, List fields, currentKey[keyIndex] = fields.get(keyIndex).evaluate(row); nulls[keyIndex] = currentKey[keyIndex] == null; } - currentValue.setFromOutput( + return currentValue.setFromOutput( MapJoinKey.serializeRow(output, currentKey, ois, sortableSortOrders)); } @Override - public void setFromOther(ReusableGetAdaptor other) { + public JoinUtil.JoinResult setFromOther(ReusableGetAdaptor other) { assert other instanceof GetAdaptor; GetAdaptor other2 = (GetAdaptor)other; nulls = other2.nulls; currentKey = other2.currentKey; - currentValue.setFromOutput(other2.output); + return currentValue.setFromOutput(other2.output); } @Override @@ -422,13 +452,19 @@ public ReusableRowContainer() { clearRows(); } - public void setFromOutput(Output output) { + public 
JoinUtil.JoinResult setFromOutput(Output output) { if (refs == null) { refs = new ArrayList(); } byte aliasFilter = hashMap.getValueRefs(output.getData(), output.getLength(), refs); this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter; this.dummyRow = null; + if (refs.isEmpty()) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } public boolean isEmpty() { @@ -533,4 +569,9 @@ public static boolean isSupportedKey(ObjectInspector keyOi) { public void dumpMetrics() { hashMap.debugDumpMetrics(); } + + @Override + public boolean hasSpill() { + return false; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index ff6e5d4..9d8cbcb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hive.ql.exec.persistence; +import java.io.IOException; import java.util.List; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -40,21 +42,21 @@ * Changes current rows to which adaptor is referring to the rows corresponding to * the key represented by a VHKW object, and writers and batch used to interpret it. */ - void setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, + JoinUtil.JoinResult setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException; /** * Changes current rows to which adaptor is referring to the rows corresponding to * the key represented by a row object, and fields and ois used to interpret it. */ - void setFromRow(Object row, List fields, List ois) + JoinUtil.JoinResult setFromRow(Object row, List fields, List ois) throws HiveException; /** * Changes current rows to which adaptor is referring to the rows corresponding to * the key that another adaptor has already deserialized via setFromVector/setFromRow. */ - void setFromOther(ReusableGetAdaptor other); + JoinUtil.JoinResult setFromOther(ReusableGetAdaptor other) throws HiveException; /** * Checks whether the current key has any nulls. 
@@ -77,7 +79,7 @@ void setFromRow(Object row, List fields, List { + private static final Log LOG = LogFactory.getLog(ObjectContainer.class); + + @VisibleForTesting + static final int IN_MEMORY_NUM_ROWS = 1024; + + private ROW[] readBuffer; + private boolean readBufferUsed = false; // indicates if read buffer has data + private int rowsInReadBuffer = 0; // number of rows in the temporary read buffer + private int readCursor = 0; // cursor during reading + private int rowsOnDisk = 0; // total number of pairs in output + + private File parentFile; + private File tmpFile; + + private Input input; + private Output output; + + private Kryo kryo; + + public ObjectContainer() { + readBuffer = (ROW[]) new Object[IN_MEMORY_NUM_ROWS]; + for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) { + readBuffer[i] = (ROW) new Object(); + } + kryo = Utilities.runtimeSerializationKryo.get(); + try { + setupOutput(); + } catch (IOException e) { + throw new RuntimeException("Failed to create temporary output file on disk", e); + } + } + + private void setupOutput() throws IOException { + if (parentFile == null) { + parentFile = File.createTempFile("object-container", ""); + if (parentFile.delete() && parentFile.mkdir()) { + parentFile.deleteOnExit(); + } + } + + if (tmpFile == null || input != null) { + tmpFile = File.createTempFile("ObjectContainer", ".tmp", parentFile); + LOG.info("ObjectContainer created temp file " + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + } + + FileOutputStream fos = null; + try { + fos = new FileOutputStream(tmpFile); + output = new Output(fos); + } finally { + if (output == null && fos != null) { + fos.close(); + } + } + } + + public void add(ROW row) { + kryo.writeClassAndObject(output, row); + rowsOnDisk++; + } + + public void clear() { + readCursor = rowsInReadBuffer = rowsOnDisk = 0; + readBufferUsed = false; + + if (parentFile != null) { + if (input != null) { + try { + input.close(); + } catch (Throwable ignored) { + } + input = null; + } + if (output != null) { + try { + output.close(); + } catch (Throwable ignored) { + } + output = null; + } + try { + FileUtil.fullyDelete(parentFile); + } catch (Throwable ignored) { + } + parentFile = null; + tmpFile = null; + } + } + + public boolean hasNext() { + return readBufferUsed || rowsOnDisk > 0; + } + + public ROW next() { + Preconditions.checkState(hasNext()); + if (!readBufferUsed) { + try { + if (input == null && output != null) { + // Close output stream if open + output.close(); + output = null; + + FileInputStream fis = null; + try { + fis = new FileInputStream(tmpFile); + input = new Input(fis); + } finally { + if (input == null && fis != null) { + fis.close(); + } + } + } + if (input != null) { + // Load next batch from disk + if (rowsOnDisk >= IN_MEMORY_NUM_ROWS) { + rowsInReadBuffer = IN_MEMORY_NUM_ROWS; + } else { + rowsInReadBuffer = rowsOnDisk; + } + + for (int i = 0; i < rowsInReadBuffer; i++) { + readBuffer[i] = (ROW) kryo.readClassAndObject(input); + } + + if (input.eof()) { + input.close(); + input = null; + } + + readBufferUsed = true; + readCursor = 0; + rowsOnDisk -= rowsInReadBuffer; + } + } catch (Exception e) { + clear(); // Clean up the cache + throw new RuntimeException("Failed to load rows from disk", e); + } + } + + ROW row = readBuffer[readCursor]; + if (++readCursor >= rowsInReadBuffer) { + readBufferUsed = false; + rowsInReadBuffer = 0; + readCursor = 0; + } + return row; + } + + public int size() { + return rowsInReadBuffer + rowsOnDisk; + } +} diff --git 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index adb7a92..2e15922 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; +import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.serde2.SerDeException; @@ -76,7 +77,15 @@ public void load(MapJoinTableContainer[] mapJoinTables, boolean useOptimizedTables = HiveConf.getBoolVar( hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE); + boolean useHybridGraceHashJoin = HiveConf.getBoolVar( + hconf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN); boolean isFirstKey = true; + + // Disable hybrid grace hash join for n-way join + if (mapJoinTables.length > 2) { + useHybridGraceHashJoin = false; + } + for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; @@ -111,15 +120,17 @@ public void load(MapJoinTableContainer[] mapJoinTables, isFirstKey = false; Long keyCountObj = parentKeyCounts.get(pos); long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue(); + MapJoinTableContainer tableContainer = useOptimizedTables - ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage) + ? (useHybridGraceHashJoin ? new HybridHashTableContainer(hconf, keyCount, memUsage, + desc.getParentDataSizes().get(pos)) + : new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)) : new HashMapWrapper(hconf, keyCount); while (kvReader.next()) { tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(), valCtx, (Writable)kvReader.getCurrentValue()); } - tableContainer.seal(); mapJoinTables[pos] = tableContainer; } catch (IOException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index 2270a4d..0394370 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; @@ -220,12 +221,13 @@ public void closeOp(boolean aborted) throws HiveException { if (!aborted && 0 < outputBatch.size) { flushOutput(); } + super.closeOp(aborted); } @Override - protected void setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias) + protected JoinUtil.JoinResult setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias) throws HiveException { - dest.setFromVector(keyValues[batchIndex], keyOutputWriters, keyWrapperBatch); + return dest.setFromVector(keyValues[batchIndex], keyOutputWriters, keyWrapperBatch); } @Override diff --git 
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index 5f0c0ef..367217d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -124,12 +124,14 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, } MapJoinDesc joinConf = mapJoinOp.getConf(); long keyCount = Long.MAX_VALUE, rowCount = Long.MAX_VALUE, bucketCount = 1; + long tableSize = Long.MAX_VALUE; Statistics stats = parentRS.getStatistics(); if (stats != null) { keyCount = rowCount = stats.getNumRows(); if (keyCount <= 0) { keyCount = rowCount = Long.MAX_VALUE; } + tableSize = stats.getDataSize(); ArrayList keyCols = parentRS.getConf().getOutputKeyColumnNames(); if (keyCols != null && !keyCols.isEmpty()) { // See if we can arrive at a smaller number using distinct stats from key columns. @@ -157,6 +159,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, // We cannot obtain a better estimate without CustomPartitionVertex providing it // to us somehow; in which case using statistics would be completely unnecessary. keyCount /= bucketCount; + tableSize /= bucketCount; } } } @@ -166,6 +169,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, if (keyCount != Long.MAX_VALUE) { joinConf.getParentKeyCounts().put(pos, keyCount); } + joinConf.getParentDataSizes().put(pos, tableSize); int numBuckets = -1; EdgeType edgeType = EdgeType.BROADCAST_EDGE; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index aca4273..370575d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -1296,6 +1296,9 @@ private void fixupParentChildOperators(Operator op, switch (op.getType()) { case MAPJOIN: + // Disable Hybrid Grace Hash Join when vectorization is in effect, for now + HiveConf.setBoolVar(physicalContext.getConf(), + HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN, false); case GROUPBY: case FILTER: case SELECT: diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index bae81e2..4ccbef7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -52,6 +52,7 @@ // TODO: should these rather be arrays? 
private Map parentToInput = new HashMap(); private Map parentKeyCounts = new HashMap(); + private Map parentDataSizes = new HashMap(); // table alias (small) --> input file name (big) --> target file names (small) private Map>> aliasBucketFileNameMapping; @@ -87,6 +88,7 @@ public MapJoinDesc(MapJoinDesc clone) { this.dumpFilePrefix = clone.dumpFilePrefix; this.parentToInput = clone.parentToInput; this.parentKeyCounts = clone.parentKeyCounts; + this.parentDataSizes = clone.parentDataSizes; } public MapJoinDesc(final Map> keys, @@ -132,6 +134,10 @@ public void setParentToInput(Map parentToInput) { return parentKeyCounts; } + public Map getParentDataSizes() { + return parentDataSizes; + } + @Explain(displayName = "Estimated key counts", normalExplain = false) public String getKeyCountsExplainDesc() { StringBuilder result = null; diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java index 2291d95..7f0a2e6 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java @@ -49,10 +49,10 @@ public void testCapacityValidation() { public void testPutGetOne() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE); RandomKvSource kv = new RandomKvSource(0, 0); - map.put(kv); + map.put(kv, -1); verifyResults(map, kv.getLastKey(), kv.getLastValue()); kv = new RandomKvSource(10, 100); - map.put(kv); + map.put(kv, -1); verifyResults(map, kv.getLastKey(), kv.getLastValue()); } @@ -60,12 +60,12 @@ public void testPutGetOne() throws Exception { public void testPutGetMultiple() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE); RandomKvSource kv = new RandomKvSource(0, 100); - map.put(kv); + map.put(kv, -1); verifyResults(map, kv.getLastKey(), kv.getLastValue()); FixedKeyKvSource kv2 = new FixedKeyKvSource(kv.getLastKey(), 0, 100); kv2.values.add(kv.getLastValue()); for (int i = 0; i < 3; ++i) { - map.put(kv2); + map.put(kv2, -1); verifyResults(map, kv2.key, kv2.values.toArray(new byte[kv2.values.size()][])); } } @@ -74,11 +74,11 @@ public void testPutGetMultiple() throws Exception { public void testGetNonExistent() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE); RandomKvSource kv = new RandomKvSource(1, 100); - map.put(kv); + map.put(kv, -1); byte[] key = kv.getLastKey(); key[0] = (byte)(key[0] + 1); FixedKeyKvSource kv2 = new FixedKeyKvSource(kv.getLastKey(), 0, 100); - map.put(kv2); + map.put(kv2, -1); key[0] = (byte)(key[0] + 1); List results = new ArrayList(0); map.getValueRefs(key, key.length, results); @@ -93,7 +93,7 @@ public void testPutWithFullMap() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, 1f, WB_SIZE); UniqueKeysKvSource kv = new UniqueKeysKvSource(); for (int i = 0; i < CAPACITY; ++i) { - map.put(kv); + map.put(kv, -1); } for (int i = 0; i < kv.keys.size(); ++i) { verifyResults(map, kv.keys.get(i), kv.values.get(i)); @@ -111,7 +111,7 @@ public void testExpand() throws Exception { BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(1, 0.0000001f, WB_SIZE); UniqueKeysKvSource kv = new UniqueKeysKvSource(); for (int i = 0; i < 18; ++i) { - map.put(kv); + map.put(kv, -1); for (int j = 0; j <= i; ++j) { verifyResults(map, kv.keys.get(j), 
kv.values.get(j)); } diff --git ql/src/test/queries/clientpositive/hybridhashjoin.q ql/src/test/queries/clientpositive/hybridhashjoin.q new file mode 100644 index 0000000..0523b18 --- /dev/null +++ ql/src/test/queries/clientpositive/hybridhashjoin.q @@ -0,0 +1,212 @@ +set hive.auto.convert.join=true; +set hive.auto.convert.join.noconditionaltask.size=1300000; +set hive.mapjoin.optimized.hashtable.wbsize=880000; +set hive.mapjoin.hybridgrace.memcheckfrequency=1024; + +set hive.mapjoin.hybridgrace.hashtable=false; + +-- Base result for inner join +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +; + +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +; + +set hive.mapjoin.hybridgrace.hashtable=true; + +-- Two partitions are created. One in memory, one on disk on creation. +-- The one in memory will eventually exceed memory limit, but won't spill. +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +; + +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +; + +set hive.auto.convert.join.noconditionaltask.size=3000000; +set hive.mapjoin.optimized.hashtable.wbsize=100000; + +set hive.mapjoin.hybridgrace.hashtable=false; + +-- Base result for inner join +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +; + +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +; + +set hive.mapjoin.hybridgrace.hashtable=true; + +-- 16 partitions are created: 3 in memory, 13 on disk on creation. +-- 1 partition is spilled during first round processing, which ends up having 2 in memory, 14 on disk +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +; + +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +; + + + +set hive.mapjoin.hybridgrace.hashtable=false; + +-- Base result for outer join +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +; + +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +; + +set hive.mapjoin.hybridgrace.hashtable=true; + +-- 32 partitions are created. 3 in memory, 29 on disk on creation. 
+explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +; + +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +; + + +-- Partitioned table +create table parttbl (key string, value char(20)) partitioned by (dt char(10)); +insert overwrite table parttbl partition(dt='2000-01-01') + select * from src; +insert overwrite table parttbl partition(dt='2000-01-02') + select * from src1; + +set hive.auto.convert.join.noconditionaltask.size=30000000; +set hive.mapjoin.optimized.hashtable.wbsize=10000000; + +set hive.mapjoin.hybridgrace.hashtable=false; + +-- No spill, base result +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +; + +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +; + +set hive.mapjoin.hybridgrace.hashtable=true; + +-- No spill, 2 partitions created in memory +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +; + +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +; + + +set hive.auto.convert.join.noconditionaltask.size=20000; +set hive.mapjoin.optimized.hashtable.wbsize=10000; + +set hive.mapjoin.hybridgrace.hashtable=false; + +-- Spill case base result +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +; + +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +; + +set hive.mapjoin.hybridgrace.hashtable=true; + +-- Spill case, one partition in memory, one spilled on creation +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +; + +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +; + +drop table parttbl; diff --git ql/src/test/results/clientpositive/tez/hybridhashjoin.q.out ql/src/test/results/clientpositive/tez/hybridhashjoin.q.out new file mode 100644 index 0000000..5eb9a8c --- /dev/null +++ ql/src/test/results/clientpositive/tez/hybridhashjoin.q.out @@ -0,0 +1,1156 @@ +PREHOOK: query: -- Base result for inner join +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- Base result for inner join +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (cint < 2000000000) (type: boolean) + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 
0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + input vertices: + 1 Map 3 + Statistics: Num rows: 4505 Data size: 968719 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (cint < 2000000000) (type: boolean) + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +3152013 +PREHOOK: query: -- Two partitions are created. One in memory, one on disk on creation. +-- The one in memory will eventually exceed memory limit, but won't spill. +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- Two partitions are created. One in memory, one on disk on creation. +-- The one in memory will eventually exceed memory limit, but won't spill. 
+explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (cint < 2000000000) (type: boolean) + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + input vertices: + 1 Map 3 + Statistics: Num rows: 4505 Data size: 968719 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (cint < 2000000000) (type: boolean) + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 4096 Data size: 880654 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint + where c.cint < 2000000000) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +3152013 +PREHOOK: query: -- Base result for inner join +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- Base result for inner join +explain +select count(*) from +(select 
c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + input vertices: + 1 Map 3 + Statistics: Num rows: 6758 Data size: 1453080 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +3152013 +PREHOOK: query: -- 16 partitions are created: 3 in memory, 13 on disk on creation. 
+-- 1 partition is spilled during first round processing, which ends up having 2 in memory, 14 on disk +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- 16 partitions are created: 3 in memory, 13 on disk on creation. +-- 1 partition is spilled during first round processing, which ends up having 2 in memory, 14 on disk +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + input vertices: + 1 Map 3 + Statistics: Num rows: 6758 Data size: 1453080 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 6144 Data size: 1320982 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + inner join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: 
default@alltypesorc +#### A masked pattern was here #### +3152013 +PREHOOK: query: -- Base result for outer join +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- Base result for outer join +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Left Outer Join0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + input vertices: + 1 Map 3 + Statistics: Num rows: 13516 Data size: 2906160 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +3155128 +PREHOOK: query: -- 32 partitions are created. 3 in memory, 29 on disk on creation. +explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- 32 partitions are created. 3 in memory, 29 on disk on creation. 
+explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Left Outer Join0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + input vertices: + 1 Map 3 + Statistics: Num rows: 13516 Data size: 2906160 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +3155128 +PREHOOK: query: -- Partitioned table +create table parttbl (key string, value char(20)) partitioned by (dt char(10)) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@parttbl +POSTHOOK: query: -- Partitioned table +create table parttbl (key string, value char(20)) partitioned by (dt char(10)) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@parttbl +PREHOOK: query: insert overwrite table parttbl partition(dt='2000-01-01') + select * from src +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@parttbl@dt=2000-01-01 +POSTHOOK: query: insert 
overwrite table parttbl partition(dt='2000-01-01') + select * from src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@parttbl@dt=2000-01-01 +POSTHOOK: Lineage: parttbl PARTITION(dt=2000-01-01).key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: parttbl PARTITION(dt=2000-01-01).value EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: insert overwrite table parttbl partition(dt='2000-01-02') + select * from src1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src1 +PREHOOK: Output: default@parttbl@dt=2000-01-02 +POSTHOOK: query: insert overwrite table parttbl partition(dt='2000-01-02') + select * from src1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src1 +POSTHOOK: Output: default@parttbl@dt=2000-01-02 +POSTHOOK: Lineage: parttbl PARTITION(dt=2000-01-02).key SIMPLE [(src1)src1.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: parttbl PARTITION(dt=2000-01-02).value EXPRESSION [(src1)src1.FieldSchema(name:value, type:string, comment:default), ] +PREHOOK: query: -- No spill, base result +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- No spill, base result +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: p1 + Statistics: Num rows: 525 Data size: 12474 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 key (type: string) + 1 key (type: string) + input vertices: + 1 Map 3 + Statistics: Num rows: 289 Data size: 6872 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: p2 + Statistics: Num rows: 525 Data size: 12474 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@parttbl +PREHOOK: Input: default@parttbl@dt=2000-01-01 +PREHOOK: Input: default@parttbl@dt=2000-01-02 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parttbl +POSTHOOK: Input: default@parttbl@dt=2000-01-01 +POSTHOOK: Input: default@parttbl@dt=2000-01-02 +#### A masked pattern was here #### +1217 +PREHOOK: query: -- No spill, 2 partitions created in memory +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- No spill, 2 partitions created in memory +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: p1 + Statistics: Num rows: 525 Data size: 12474 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 key (type: string) + 1 key (type: string) + input vertices: + 1 Map 3 + Statistics: Num rows: 289 Data size: 6872 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: p2 + Statistics: Num rows: 525 Data size: 12474 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@parttbl 
+PREHOOK: Input: default@parttbl@dt=2000-01-01 +PREHOOK: Input: default@parttbl@dt=2000-01-02 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parttbl +POSTHOOK: Input: default@parttbl@dt=2000-01-01 +POSTHOOK: Input: default@parttbl@dt=2000-01-02 +#### A masked pattern was here #### +1217 +PREHOOK: query: -- Spill case base result +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- Spill case base result +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: p1 + Statistics: Num rows: 525 Data size: 12474 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 key (type: string) + 1 key (type: string) + input vertices: + 1 Map 3 + Statistics: Num rows: 289 Data size: 6872 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: p2 + Statistics: Num rows: 525 Data size: 12474 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@parttbl +PREHOOK: Input: default@parttbl@dt=2000-01-01 +PREHOOK: Input: default@parttbl@dt=2000-01-02 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parttbl +POSTHOOK: Input: 
default@parttbl@dt=2000-01-01 +POSTHOOK: Input: default@parttbl@dt=2000-01-02 +#### A masked pattern was here #### +1217 +PREHOOK: query: -- Spill case, one partition in memory, one spilled on creation +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +PREHOOK: type: QUERY +POSTHOOK: query: -- Spill case, one partition in memory, one spilled on creation +explain +select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: p1 + Statistics: Num rows: 525 Data size: 12474 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 key (type: string) + 1 key (type: string) + input vertices: + 1 Map 3 + Statistics: Num rows: 289 Data size: 6872 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: p2 + Statistics: Num rows: 525 Data size: 12474 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 263 Data size: 6248 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@parttbl +PREHOOK: Input: default@parttbl@dt=2000-01-01 +PREHOOK: Input: default@parttbl@dt=2000-01-02 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select p1.value + from parttbl p1 + inner join parttbl p2 + on p1.key = p2.key) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parttbl +POSTHOOK: Input: default@parttbl@dt=2000-01-01 +POSTHOOK: Input: default@parttbl@dt=2000-01-02 +#### A masked pattern was here #### +1217 +PREHOOK: query: drop table parttbl +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@parttbl +PREHOOK: Output: default@parttbl +POSTHOOK: query: drop 
table parttbl +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@parttbl +POSTHOOK: Output: default@parttbl diff --git serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java index 7edd09a..f9ab964 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java +++ serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.util.hash.MurmurHash; /** @@ -495,7 +494,7 @@ public void writeInt(long offset, int v) { } // Lifted from org.apache.hadoop.util.hash.MurmurHash... but supports offset. - private static int murmurHash(byte[] data, int offset, int length) { + public static int murmurHash(byte[] data, int offset, int length) { int m = 0x5bd1e995; int r = 24; @@ -544,4 +543,12 @@ private static int murmurHash(byte[] data, int offset, int length) { return h; } + + /** + * Write buffer size + * @return write buffer size + */ + public long size() { + return writeBuffers.size() * (long) wbSize; + } } \ No newline at end of file diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java index 789e5a6..c373047 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java @@ -126,7 +126,8 @@ public void initialize(Configuration job, Properties tbl) // Create the ObjectInspectors for the fields cachedObjectInspector = LazyFactory.createLazyStructInspector(serdeParams - .getColumnNames(), serdeParams.getColumnTypes(), serdeParams); + .getColumnNames(), serdeParams.getColumnTypes(), + new LazyObjectInspectorParametersImpl(serdeParams)); cachedLazyStruct = (LazyStruct) LazyFactory .createLazyObject(cachedObjectInspector);
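
Illustrative sketch (not part of the diff above): the recurring change in this patch is that probing the map-join hash table now reports a JoinUtil.JoinResult instead of returning void, as seen in the setFromRow/setFromVector/setFromOutput signature changes. MATCH and NOMATCH are the two outcomes visible in the container code above; the third state, SPILL, indicates the probe could not be completed because the relevant data was spilled to disk, so the join for that row is postponed and later replayed from on-disk row containers such as ObjectContainer. The fragment below only shows the shape of a caller reacting to those three states; the Row class and the three handler methods are hypothetical placeholders, and only the JoinUtil.JoinResult enum comes from the patch.

import org.apache.hadoop.hive.ql.exec.JoinUtil;

class JoinResultHandlingSketch {
  // Placeholder for a big-table row; the real operator works on Object rows plus ObjectInspectors.
  static final class Row { }

  void onProbe(JoinUtil.JoinResult result, Row bigTableRow) {
    switch (result) {
      case MATCH:
        emitJoinedRows(bigTableRow);     // matching small-table rows are available in memory
        break;
      case NOMATCH:
        dropRow(bigTableRow);            // inner join: nothing to emit for this key
        break;
      case SPILL:
        stashForLaterJoin(bigTableRow);  // data for this key is on disk; join is postponed
        break;
    }
  }

  private void emitJoinedRows(Row r) { /* join and forward */ }
  private void dropRow(Row r) { /* drop, or forward with nulls for outer joins */ }
  private void stashForLaterJoin(Row r) { /* buffer the row until the spilled data is reloaded */ }
}

Returning an enum rather than throwing or silently dropping lets a caller keep streaming big-table rows while deferring only the keys whose data is not currently resident, which is the behavior the hybridhashjoin.q comments about in-memory versus on-disk partitions are exercising.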