diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a809f17..da14a7b 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -707,6 +707,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true, "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" + "because memory-optimized hashtable cannot be serialized."), + HIVEMAPJOINUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", true, "Whether to use " + + "hybrid grace hash join as the join method for mapjoin."), HIVEHASHTABLEWBSIZE("hive.mapjoin.optimized.hashtable.wbsize", 10 * 1024 * 1024, "Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to\n" + "store data. This is one buffer size. HT may be slightly faster if this is larger, but for small\n" + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java index 4013b7f..fc2b5d2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java @@ -48,6 +48,12 @@ public class JoinUtil { + public static enum JoinResult { + MATCH, // a match is found + NOMATCH, // no match is found, and the current row will be dropped + SPILL // no match is found, and the current row has been spilled to disk + } + public static List[] getObjectInspectorsFromEvaluators( List[] exprEntries, ObjectInspector[] inputObjInspector, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index c6bfd03..4b7fb2a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -25,26 +26,23 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.*; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; -import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.*; import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.util.ReflectionUtils; +import static org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition; +import static org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper; + /** * Map side Join operator implementation. */ @@ -67,6 +65,13 @@ private transient ReusableGetAdaptor[] hashMapRowGetters; private UnwrapRowContainer[] unwrapContainer; + private transient Configuration hconf; + private transient boolean useHybridGraceHashJoin; // whether Hybrid Grace Hash Join is enabled + private transient boolean hybridMapJoinLeftover; // indicates there's on-disk data to be processed in another round + private transient HashPartition currentPartition; + private transient MapJoinBytesTableContainer currentSmallTable; + private transient int tag; + private transient int smallTable; public MapJoinOperator() { } @@ -92,6 +97,7 @@ public void startGroup() throws HiveException { @Override protected void initializeOp(Configuration hconf) throws HiveException { + this.hconf = hconf; unwrapContainer = new UnwrapRowContainer[conf.getTagLength()]; super.initializeOp(hconf); @@ -122,6 +128,9 @@ protected void initializeOp(Configuration hconf) throws HiveException { mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen]; hashTblInitedOnce = false; } + + useHybridGraceHashJoin = + HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEHYBRIDGRACEHASHJOIN); } @Override @@ -228,9 +237,9 @@ public void cleanUpInputFileChangedOp() throws HiveException { } } - protected void setMapJoinKey( + protected JoinUtil.JoinResult setMapJoinKey( ReusableGetAdaptor dest, Object row, byte alias) throws HiveException { - dest.setFromRow(row, joinKeys[alias], joinKeysObjectInspectors[alias]); + return dest.setFromRow(row, joinKeys[alias], joinKeysObjectInspectors[alias]); } protected MapJoinKey getRefKey(byte alias) { @@ -247,6 +256,15 @@ protected MapJoinKey getRefKey(byte alias) { @Override public void processOp(Object row, int tag) throws HiveException { + this.tag = tag; + + // As we're calling processOp again to process the leftover triplets, we know the "row" is + // coming from the on-disk matchfile. We need to recreate hashMapRowGetter against new hashtable. + if (hybridMapJoinLeftover) { + MapJoinKey refKey = getRefKey((byte)tag); + hashMapRowGetters[smallTable] = currentSmallTable.createGetter(refKey); + } + try { if (firstRow) { generateMapMetaData(); @@ -271,14 +289,16 @@ public void processOp(Object row, int tag) throws HiveException { boolean joinNeeded = false; for (byte pos = 0; pos < order.length; pos++) { if (pos != alias) { + smallTable = pos; + JoinUtil.JoinResult joinResult; ReusableGetAdaptor adaptor; if (firstSetKey == null) { adaptor = firstSetKey = hashMapRowGetters[pos]; - setMapJoinKey(firstSetKey, row, alias); + joinResult = setMapJoinKey(firstSetKey, row, alias); } else { // Keys for all tables are the same, so only the first has to deserialize them. 
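[Editor's note: the processOp changes in this hunk branch on the JoinUtil.JoinResult value returned by the key adaptors instead of a void setter. A minimal standalone sketch of that control flow, with a toy probe function and a plain list standing in for the on-disk matchfile; all names in the sketch are illustrative, not the patch's API.]

import java.util.ArrayList;
import java.util.List;

public class JoinResultSketch {
  enum JoinResult { MATCH, NOMATCH, SPILL }   // mirrors the enum added to JoinUtil

  // Stand-in probe: even keys match, keys divisible by 5 are "spilled".
  static JoinResult probe(int key) {
    if (key % 5 == 0) return JoinResult.SPILL;
    return (key % 2 == 0) ? JoinResult.MATCH : JoinResult.NOMATCH;
  }

  public static void main(String[] args) {
    List<Integer> deferred = new ArrayList<>();   // plays the role of the matchfile
    for (int key = 1; key <= 10; key++) {
      switch (probe(key)) {
        case MATCH:   System.out.println(key + ": join now"); break;
        case NOMATCH: System.out.println(key + ": drop (inner join)"); break;
        case SPILL:   deferred.add(key); break;  // re-processed after the in-memory pass
      }
    }
    System.out.println("Deferred to a later pass: " + deferred);
  }
}

[Rows tagged SPILL are not joined in the first pass; closeOp later re-feeds them through processOp once the matching partition has been reloaded, which is what continueProcessOp below does.]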
adaptor = hashMapRowGetters[pos]; - adaptor.setFromOther(firstSetKey); + joinResult = adaptor.setFromOther(row, firstSetKey); } MapJoinRowContainer rowContainer = adaptor.getCurrentRows(); if (rowContainer != null && unwrapContainer[pos] != null) { @@ -287,9 +307,14 @@ public void processOp(Object row, int tag) throws HiveException { } // there is no join-value or join-key has all null elements if (rowContainer == null || firstSetKey.hasAnyNulls(fieldCount, nullsafes)) { - if (!noOuterJoin) { - joinNeeded = true; - storage[pos] = dummyObjVectors[pos]; + if(!noOuterJoin) { + // For Hybrid Grace Hash Join, during the 1st round processing, + // we only keep the LEFT side if the row is not spilled + if (!useHybridGraceHashJoin || hybridMapJoinLeftover || + (!hybridMapJoinLeftover && joinResult != JoinUtil.JoinResult.SPILL)) { + joinNeeded = true; + storage[pos] = dummyObjVectors[pos]; + } } else { storage[pos] = emptyList; } @@ -326,6 +351,42 @@ public void closeOp(boolean abort) throws HiveException { for (MapJoinTableContainer tableContainer : mapJoinTables) { if (tableContainer != null) { tableContainer.dumpMetrics(); + + if (tableContainer instanceof HybridHashTableContainer) { + HybridHashTableContainer hybridHashTableContainer = (HybridHashTableContainer) tableContainer; + hybridHashTableContainer.dumpStats(); + + HashPartition[] hashPartitions = hybridHashTableContainer.getHashPartitions(); + + // Clear all in memory partitions first + for (int i = 0; i < hashPartitions.length; i++) { + if (!hashPartitions[i].getHashMapOnDisk()) { + hybridHashTableContainer.setTotalInMemRowCount( + hybridHashTableContainer.getTotalInMemRowCount() - + hashPartitions[i].getInMemRowCount()); + hashPartitions[i].getHashMapFromMemory().clear(); + } + } + assert hybridHashTableContainer.getTotalInMemRowCount() == 0; + + for (int i = 0; i < hashPartitions.length; i++) { + if (hashPartitions[i].getHashMapOnDisk()) { + // Recursively process on-disk triplets (hash partition, sidefile, matchfile) + try { + hybridMapJoinLeftover = true; + continueProcessOp(hashPartitions[i], hybridHashTableContainer); + } catch (IOException e) { + e.printStackTrace(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } catch (SerDeException e) { + e.printStackTrace(); + } + } + currentPartition = null; + currentSmallTable = null; + } + } } } if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) @@ -341,6 +402,68 @@ public void closeOp(boolean abort) throws HiveException { } /** + * This will call processOp again to deal with the leftover partitions/rowcontainers on disk + */ + private void continueProcessOp(HashPartition partition, HybridHashTableContainer hybridHashTableContainer) + throws HiveException, IOException, ClassNotFoundException, SerDeException { + reloadHashTable(partition, hybridHashTableContainer); + // Iterate thru the on-disk matchfile, and feed a row to processOp every time + RowContainer bigTable = partition.getMatchfileRowContainer(hconf); + RowContainer.RowIterator> iter = bigTable.rowIter(); + for (List list = iter.first(); list != null; list = iter.next()) { + Object row = list.get(0); + // Perform the join between hashtable and bigtable by reusing processOp logic + processOp(row, tag); + } + } + + /** + * Create a new in-memory hashtable by: + * a) deserializing an on-disk hash table, and + * b) merging an on-disk row container + * + * TODO recursive spilling support + * During reloading spilling may be necessary again, since it's possible that + * the sum of 
on-disk partition and sidefile is still greater than memory limit. + * If that happens, we need to do one more level of spilling. + * This spilling pattern can happen recursively, or be disabled once we have N levels of spilling. + */ + private void reloadHashTable(HashPartition partition, HybridHashTableContainer hybridHashTableContainer) + throws IOException, ClassNotFoundException, HiveException, SerDeException { + // Deserialize the on-disk hash table + // We're sure this part is smaller than memory limit + BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(partition); + int keyCount = restoredHashMap.size(); + + // Merge the sidefile into the newly created hash table + // This is where the spilling may happen again + RowContainer sidefile = partition.getSidefileRowContainer(hconf); + keyCount += sidefile.rowCount(); + + // If based on the new key count, keyCount is smaller than a threshold, + // then just load the entire restored hashmap into memory. + // The size of deserialized partition shouldn't exceed half of memory limit + // TODO this info can be more accurate when memory mgmt is available + long totalOrigMemUsed = hybridHashTableContainer.getHashPartitions().length * partition.getWbSize(); + assert keyCount * hybridHashTableContainer.getTableRowSize() < totalOrigMemUsed / 2 : + "Hash table cannot be reloaded since it will be greater than memory limit and recursive" + + " spilling is currently not supported"; + RowContainer.RowIterator> iter = sidefile.rowIter(); + for (List list = iter.first(); list != null; list = iter.next()) { + KeyValueHelper kvHelper = list.get(0); + restoredHashMap.put(kvHelper, null); + } + + hybridHashTableContainer.setTotalInMemRowCount(hybridHashTableContainer.getTotalInMemRowCount() + + restoredHashMap.size() + sidefile.rowCount()); + + // Since there's only one hashmap to deal with, it's OK to create a MapJoinBytesTableContainer + currentSmallTable = new MapJoinBytesTableContainer(restoredHashMap); + currentSmallTable.setInternalValueOi(hybridHashTableContainer.getInternalValueOi()); + currentSmallTable.setSortableSortOrders(hybridHashTableContainer.getSortableSortOrders()); + } + + /** * Implements the getName function for the Node Interface. * * @return the name of the operator diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 8d3e3cc..90d6fa6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -18,6 +18,12 @@ package org.apache.hadoop.hive.ql.exec.persistence; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -45,7 +51,8 @@ * Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost completely reworked * and there's very little in common left save for quadratic probing (and that with some changes). 
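[Editor's note: the BytesBytesMultiHashMap changes below make the map Serializable and add static serialize/deserialize helpers so a whole partition can be spilled to a local file and restored later. A minimal sketch of that ObjectOutputStream round trip, using a plain HashMap and a temp file as stand-ins for the hash table and the spill path.]

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;

public class SpillRoundTripSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws Exception {
    HashMap<String, Integer> table = new HashMap<>();   // placeholder for the spilled hash map
    table.put("k1", 1);

    Path spillFile = Files.createTempFile("partition-0-", null);
    try (ObjectOutputStream oos = new ObjectOutputStream(Files.newOutputStream(spillFile))) {
      oos.writeObject(table);                           // spill to local disk
    }

    HashMap<String, Integer> restored;
    try (ObjectInputStream ois = new ObjectInputStream(Files.newInputStream(spillFile))) {
      restored = (HashMap<String, Integer>) ois.readObject();   // reload for the second pass
    }
    Files.delete(spillFile);                            // the patch also deletes the file after reload
    System.out.println(restored);
  }
}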
*/ -public final class BytesBytesMultiHashMap { +public final class BytesBytesMultiHashMap implements Serializable { + private static final long serialVersionUID = 1L; public static final Log LOG = LogFactory.getLog(BytesBytesMultiHashMap.class); /* @@ -120,7 +127,7 @@ */ private WriteBuffers writeBuffers; - private final float loadFactor; + private float loadFactor; private int resizeThreshold; private int keysAssigned; @@ -154,7 +161,7 @@ private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024; public BytesBytesMultiHashMap(int initialCapacity, - float loadFactor, int wbSize, long memUsage, int defaultCapacity) { + float loadFactor, int wbSize, long memUsage) { if (loadFactor < 0 || loadFactor > 1) { throw new AssertionError("Load factor must be between (0, 1]."); } @@ -180,7 +187,7 @@ public BytesBytesMultiHashMap(int initialCapacity, @VisibleForTesting BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) { - this(initialCapacity, loadFactor, wbSize, -1, 100000); + this(initialCapacity, loadFactor, wbSize, -1); } /** The source of keys and values to put into hashtable; avoids byte copying. */ @@ -204,7 +211,7 @@ public BytesBytesMultiHashMap(int initialCapacity, * @param kv Keyvalue writer. Each method will be called at most once. */ private static final byte[] FOUR_ZEROES = new byte[] { 0, 0, 0, 0 }; - public void put(KvSource kv) throws SerDeException { + public void put(KvSource kv, Integer keyHashCode) throws SerDeException { if (resizeThreshold <= keysAssigned) { expandAndRehash(); } @@ -218,7 +225,7 @@ public void put(KvSource kv) throws SerDeException { kv.writeKey(writeBuffers); int keyLength = (int)(writeBuffers.getWritePoint() - keyOffset); - int hashCode = writeBuffers.hashCode(keyOffset, keyLength); + int hashCode = (keyHashCode == null) ? 
writeBuffers.hashCode(keyOffset, keyLength) : keyHashCode; int slot = findKeySlotToWrite(keyOffset, keyLength, hashCode); // LOG.info("Write hash code is " + Integer.toBinaryString(hashCode) + " - " + slot); @@ -301,10 +308,24 @@ public void populateValue(WriteBuffers.ByteSegmentRef valueRef) { writeBuffers.populateValue(valueRef); } + /** + * Number of keys in the hashmap + * @return number of keys + */ public int size() { return keysAssigned; } + /** + * Number of bytes used by the hashmap + * There are two main components that take most memory: writeBuffers and refs + * Others include instance fields: 100 + * @return number of bytes + */ + public long memorySize() { + return writeBuffers.size() + refs.length * 8 + 100; + } + public void seal() { writeBuffers.seal(); } @@ -750,4 +771,22 @@ private void debugDumpKeyProbe(long keyOffset, int keyLength, int hashCode, int } LOG.info(sb.toString()); } + + + public static void serialize(OutputStream fos, BytesBytesMultiHashMap hashmap) + throws IOException { + ObjectOutputStream oos = new ObjectOutputStream(fos); + oos.writeObject(hashmap); + oos.close(); + fos.close(); + } + + public static BytesBytesMultiHashMap deserialize(InputStream fis) + throws IOException, ClassNotFoundException { + ObjectInputStream ois = new ObjectInputStream(fis); + BytesBytesMultiHashMap hashmap = (BytesBytesMultiHashMap) ois.readObject(); + ois.close(); + fis.close(); + return hashmap; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index 3adaab7..97200e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -153,7 +154,7 @@ public GetAdaptor(MapJoinKey key) { } @Override - public void setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, + public JoinUtil.JoinResult setFromVector(Object row, VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { if (currentKey == null) { currentKey = new Object[keyOutputWriters.length]; @@ -168,11 +169,17 @@ public void setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyO key = MapJoinKey.readFromVector(output, key, currentKey, vectorKeyOIs, !isFirstKey); isFirstKey = false; this.currentValue = mHash.get(key); + if (this.currentValue == null) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } @Override - public void setFromRow(Object row, List fields, - List ois) throws HiveException { + public JoinUtil.JoinResult setFromRow(Object row, List fields, List ois) + throws HiveException { if (currentKey == null) { currentKey = new Object[fields.size()]; } @@ -182,15 +189,27 @@ public void setFromRow(Object row, List fields, key = MapJoinKey.readFromRow(output, key, currentKey, ois, !isFirstKey); isFirstKey = false; this.currentValue = mHash.get(key); + if (this.currentValue == null) { + return 
JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } @Override - public void setFromOther(ReusableGetAdaptor other) { + public JoinUtil.JoinResult setFromOther(Object row, ReusableGetAdaptor other) { assert other instanceof GetAdaptor; GetAdaptor other2 = (GetAdaptor)other; this.key = other2.key; this.isFirstKey = other2.isFirstKey; this.currentValue = mHash.get(key); + if (this.currentValue == null) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } @Override @@ -223,4 +242,9 @@ public MapJoinKey getAnyKey() { public void dumpMetrics() { // Nothing to do. } + + @Override + public boolean hasSpill() { + return false; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java new file mode 100644 index 0000000..bb3616d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -0,0 +1,690 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.persistence; + + +import com.esotericsoftware.kryo.Kryo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; +import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.*; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazybinary.*; +import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.SequenceFileInputFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; + +/** + * Table container that serializes keys and values using LazyBinarySerDe into + * BytesBytesMultiHashMap, with very low memory overhead. However, + * there may be some perf overhead when retrieving rows. + */ +public class HybridHashTableContainer implements MapJoinTableContainer { + private static final Log LOG = LogFactory.getLog(MapJoinTableContainer.class); + private final Configuration hconf; + + /** + * This class encapsulates the triplet together since they are closely related to each other + */ + public static class HashPartition { + BytesBytesMultiHashMap hashMap; // In memory hashMap + int inMemRowCount; // Number of rows that reside in memory + RowContainer sidefileRowContainer; // Stores small table rows + RowContainer matchfileRowContainer; // Stores big table rows + Path hashMapLocalPath; // Local file system path for spilled hashMap + boolean hashMapOnDisk; // Status of hashMap. 
true: on disk, false: in memory + int wbSize; + + public HashPartition(int partitionThreshold, float loadFactor, int wbSize, long memUsage) { + hashMap = + new BytesBytesMultiHashMap(partitionThreshold, loadFactor, wbSize, memUsage); + this.wbSize = wbSize; + } + + public HashPartition(BytesBytesMultiHashMap hashMap) { + this.hashMap = hashMap; + } + + public BytesBytesMultiHashMap getHashMapFromMemory() { + return hashMap; + } + + public BytesBytesMultiHashMap getHashMapFromDisk(HashPartition partition) + throws IOException, ClassNotFoundException { + InputStream inputStream = Files.newInputStream(hashMapLocalPath); + BytesBytesMultiHashMap restoredHashMap = BytesBytesMultiHashMap.deserialize(inputStream); + inputStream.close(); + Files.delete(hashMapLocalPath); + return restoredHashMap; + } + + public RowContainer getSidefileRowContainer(Configuration jc) throws HiveException { + if (sidefileRowContainer == null) { + sidefileRowContainer = new RowContainer(jc, null); + // Logic below is borrowed from JoinUtil.initSpillTables + TableDesc tblDesc = new TableDesc( + SequenceFileInputFormat.class, HiveSequenceFileOutputFormat.class, + Utilities.makeProperties( + org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, "" + + Utilities.ctrlaCode, + serdeConstants.SERIALIZATION_LIB, LazyBinarySerDe.class.getName())); + sidefileRowContainer.setTableDesc(tblDesc); + } + return sidefileRowContainer; + } + + public RowContainer getMatchfileRowContainer(Configuration jc) throws HiveException { + if (matchfileRowContainer == null) { + matchfileRowContainer = new RowContainer(jc, null); + // Below logic is borrowed from JoinUtil.initSpillTables + TableDesc tblDesc = new TableDesc( + SequenceFileInputFormat.class, HiveSequenceFileOutputFormat.class, + Utilities.makeProperties( + org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, "" + + Utilities.ctrlaCode, + serdeConstants.SERIALIZATION_LIB, LazyBinarySerDe.class.getName())); + matchfileRowContainer.setTableDesc(tblDesc); + } + return matchfileRowContainer; + } + + public boolean getHashMapOnDisk() { + return hashMapOnDisk; + } + + public int getInMemRowCount() { + return inMemRowCount; + } + + public int getWbSize() { + return wbSize; + } + } + + private HashPartition[] hashPartitions; + // Total row count of all partitions that are in memory + private int totalInMemRowCount = 0; + private long memoryThreshold; + private long tableSize; + private long tableRowSize; + private boolean isSpilled; + + /** The OI used to deserialize values. We never deserialize keys. */ + private LazyBinaryStructObjectInspector internalValueOi; + private boolean[] sortableSortOrders; + + private final List EMPTY_LIST = new ArrayList(0); + + // How often to check if memory is full. Default is to check every 1024 rows. + // Note: this number MUST be power of 2. 
+ private final static int MEMORY_CHECK_FREQUENCY = 1024; + + public HybridHashTableContainer(Configuration hconf, MapJoinObjectSerDeContext valCtx, + long keyCount, long memUsage, long tableSize) + throws SerDeException { + this(hconf, + HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD), + HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), + HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), + HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD), + tableSize, valCtx, keyCount, memUsage); + } + + private HybridHashTableContainer(Configuration hconf, float keyCountAdj, int threshold, + float loadFactor, int wbSize, long noConditionalTaskThreshold, + long tableSize, MapJoinObjectSerDeContext valCtx, + long keyCount, long memUsage) + throws SerDeException { + this.hconf = hconf; + this.memoryThreshold = noConditionalTaskThreshold; + this.tableSize = tableSize; + this.tableRowSize = tableSize / keyCount; + + int numPartitions = calcNumPartitions(tableSize, wbSize, keyCount, memUsage); + hashPartitions = new HashPartition[numPartitions]; + + int keyCountPerPart = (int) (keyCount / numPartitions); + if (keyCountPerPart == 0) { + keyCountPerPart = (int) keyCount; + } + + int partitionThreshold = HashMapWrapper.calculateTableSize( + keyCountAdj, threshold, loadFactor, keyCountPerPart); + + for (int i = 0; i < numPartitions; i++) { + hashPartitions[i] = new HashPartition(partitionThreshold, loadFactor, wbSize, memUsage); + } + } + + public HashPartition[] getHashPartitions() { + return hashPartitions; + } + + public LazyBinaryStructObjectInspector getInternalValueOi() { + return internalValueOi; + } + + public boolean[] getSortableSortOrders() { + return sortableSortOrders; + } + + @SuppressWarnings("deprecation") + @Override + public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey, + MapJoinObjectSerDeContext valueContext, Writable currentValue) + throws SerDeException, HiveException, IOException { + SerDe keySerde = keyContext.getSerDe(), valSerde = valueContext.getSerDe(); + MapJoinBytesTableContainer.KeyValueHelper writeHelper; // Make sure writeHelper is a different object every time + + LOG.info("Initializing container with " + + keySerde.getClass().getName() + " and " + valSerde.getClass().getName()); + + // We assume this hashtable is loaded only when tez is enabled + LazyBinaryStructObjectInspector valSoi = + (LazyBinaryStructObjectInspector) valSerde.getObjectInspector(); + writeHelper = new MapJoinBytesTableContainer.LazyBinaryKvWriter(keySerde, valSoi, valueContext.hasFilterTag()); + if (internalValueOi == null) { + internalValueOi = valSoi; + } + if (sortableSortOrders == null) { + sortableSortOrders = ((BinarySortableSerDe) keySerde).getSortOrders(); + } + + writeHelper.setKeyValue(currentKey, currentValue); + putRowIntoPartition(writeHelper, currentKey, currentValue); + return null; // there's no key to return + } + + /** + * Put the row into corresponding hashtable (partition) based on the hash value of the key + * @param writeHelper the keyValueHelper that contains key and value + * @throws SerDeException + */ + private void putRowIntoPartition(MapJoinBytesTableContainer.KeyValueHelper writeHelper, + Writable key, Writable value) + throws SerDeException, HiveException, IOException { + int keyHash = writeHelper.getHashFromKey(); + int partitionId = keyHash & (hashPartitions.length - 1); 
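[Editor's note: the partition id just below is the key hash masked with (hashPartitions.length - 1); this behaves like a modulo only because calcNumPartitions keeps the partition count a power of two, and the build side (getHashFromKey) and probe side must hash the same serialized key bytes so a key and its matching big-table rows land in the same partition. MEMORY_CHECK_FREQUENCY relies on the same mask trick. A minimal sketch with a stand-in hash function:]

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionMaskSketch {
  private static final int MEMORY_CHECK_FREQUENCY = 1024;   // must be a power of 2

  static int hash(byte[] keyBytes) {                        // stand-in for WriteBuffers.murmurHash
    return Arrays.hashCode(keyBytes);
  }

  static int partitionOf(byte[] keyBytes, int numPartitions) {
    // numPartitions is a power of two, so the AND yields a value in [0, numPartitions)
    return hash(keyBytes) & (numPartitions - 1);
  }

  static boolean shouldCheckMemory(int rowCount) {
    // same trick: equivalent to rowCount % MEMORY_CHECK_FREQUENCY == 0
    return rowCount != 0 && (rowCount & (MEMORY_CHECK_FREQUENCY - 1)) == 0;
  }

  public static void main(String[] args) {
    byte[] key = "customer-42".getBytes(StandardCharsets.UTF_8);
    // Build and probe sides hash the same serialized key bytes, so they agree on the partition:
    System.out.println("partition = " + partitionOf(key, 8));
    System.out.println("check memory at row 1024? " + shouldCheckMemory(1024));
    System.out.println("check memory at row 1025? " + shouldCheckMemory(1025));
  }
}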
+ HashPartition hashPartition = hashPartitions[partitionId]; + + if (isOnDisk(partitionId)) { + RowContainer rc = hashPartition.getSidefileRowContainer(hconf); + List listHelper = + new ArrayList(); + // Make a deep copy of key and value + writeHelper.setKeyValue(WritableUtils.clone(key, hconf), + WritableUtils.clone(value, hconf)); + listHelper.add(writeHelper); + rc.addRow(listHelper); + } else { + int sizeBefore = hashPartition.hashMap.size(); + hashPartition.hashMap.put(writeHelper, keyHash); // Pass along hashcode to avoid recalculation + int sizeAfter = hashPartition.hashMap.size(); + if (sizeBefore != sizeAfter) { + hashPartition.inMemRowCount += 1; + totalInMemRowCount += 1; + } + + if (totalInMemRowCount != 0 && (totalInMemRowCount & (MEMORY_CHECK_FREQUENCY - 1)) == 0) { + if (isMemoryFull()) { + spillPartition(biggestPartition()); + this.setSpill(true); + } + } + } + } + + /** + * Check if a specific hashmap is on disk + * @param partitionId hashmap ID + * @return true if on disk, false if in memory + */ + private boolean isOnDisk(int partitionId) { + return hashPartitions[partitionId].hashMapOnDisk; + } + + /** TODO For n-way join, this method need modification + * Check if the memory threshold has been reached by hash tables + * @return true if memory is full, false if not + */ + private boolean isMemoryFull() { + long size = 0; + for (HashPartition hashPartition : hashPartitions) { + size += hashPartition.hashMap.memorySize(); + } + return size >= memoryThreshold; + } + + /** + * Returns the biggest hashmap in memory at this moment + * @return the biggest hashmap ID + */ + private int biggestPartition() { + int res = 0; + int maxSize = 0; + + // If a partition has been spilled to disk, its size will be 0, i.e. it won't be picked + for (int i = 0; i < hashPartitions.length; i++) { + int size = hashPartitions[i].hashMap.size(); + if (size > maxSize) { + maxSize = size; + res = i; + } + } + return res; + } + + /** + * Move a hashtable from memory into local file system + * @param partitionId the hashtable to be moved + */ + private void spillPartition(int partitionId) throws IOException { + HashPartition partition = hashPartitions[partitionId]; + OutputStream outputStream = null; + Path path = null; + + path = Files.createTempFile("partition-" + partitionId + "-", null); + outputStream = Files.newOutputStream(path); + BytesBytesMultiHashMap.serialize(outputStream, partition.hashMap); + outputStream.close(); + + partition.hashMapLocalPath = path; + partition.hashMapOnDisk = true; + + long size = 0; + for (HashPartition hashPartition : hashPartitions) { + size += hashPartition.hashMap.memorySize(); + } + LOG.info("Hybrid Map Join memory usage before spilling: " + size); + LOG.info("Hash partition " + partitionId + " (Rows: " + partition.inMemRowCount + ", Mem size: " + + partition.hashMap.memorySize() + ") is spilled to disk: " + path); + LOG.info("Hybrid Map Join memory usage after spilling: " + (size - partition.hashMap.memorySize())); + + totalInMemRowCount -= partition.inMemRowCount; + partition.inMemRowCount = 0; + partition.hashMap.clear(); + } + + /** + * Calculate how many partitions are needed. This is an estimation. 
+ * @param dataSize total data size for the table + * @param keyCount estimated key count + * @param memUsage total memory available TODO right now memUsage is always 0 + * @return number of partitions needed + */ + private int calcNumPartitions(long dataSize, int wbSize, long keyCount, long memUsage) { + int lowerLimit = 2; + int upperLimit = 16; + int number = (int) (memUsage / wbSize); + if (memUsage <= 0 || number <= lowerLimit) { + return lowerLimit; + } + + number = (int) (dataSize / wbSize); + // Make sure numPartitions is power of 2, to make N & (M - 1) easy when calculating partition No. + number = (Long.bitCount(number) == 1) ? number : Integer.highestOneBit(number) << 1; + + while (dataSize / number > memUsage) { + number *= 2; + } + + if (number > upperLimit) { + return upperLimit; + } + return number; + } + + public int getTotalInMemRowCount() { + return totalInMemRowCount; + } + + public void setTotalInMemRowCount(int totalInMemRowCount) { + this.totalInMemRowCount = totalInMemRowCount; + } + + public long getTableRowSize() { + return tableRowSize; + } + + @Override + public boolean hasSpill() { + return isSpilled; + } + + public void setSpill(boolean isSpilled) { + this.isSpilled = isSpilled; + } + + @Override + public void clear() { + for (HashPartition hp : hashPartitions) { + hp.hashMap.clear(); + } + } + + @Override + public MapJoinKey getAnyKey() { + return null; // This table has no keys. + } + + @Override + public ReusableGetAdaptor createGetter(MapJoinKey keyTypeFromLoader) { + if (keyTypeFromLoader != null) { + throw new AssertionError("No key expected from loader but got " + keyTypeFromLoader); + } + return new GetAdaptor(); + } + + @Override + public void seal() { + for (HashPartition hp : hashPartitions) { + // Only seal those partitions that haven't been spilled and cleared, + // because once a hashMap is cleared, it will become unusable + if (hp.hashMap.size() != 0) { + hp.hashMap.seal(); + } + } + } + + /** Implementation of ReusableGetAdaptor that has Output for key serialization; row + * container is also created once and reused for every row. 
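[Editor's note: a standalone sketch of the calcNumPartitions sizing heuristic shown above: clamp to the [2, 16] range, round up to a power of two, and grow while a partition would still exceed the memory budget. Method and variable names here are local to the sketch and it simplifies the original's control flow slightly.]

public class PartitionCountSketch {
  static int calcNumPartitions(long dataSize, int wbSize, long memUsage) {
    int lowerLimit = 2, upperLimit = 16;
    if (memUsage <= 0 || memUsage / wbSize <= lowerLimit) {
      return lowerLimit;
    }
    int number = (int) (dataSize / wbSize);
    // round up to the next power of two so partitionId = hash & (number - 1) stays valid
    number = (Integer.bitCount(number) == 1) ? number : Integer.highestOneBit(number) << 1;
    while (number > 0 && dataSize / number > memUsage) {
      number *= 2;
    }
    return Math.min(Math.max(number, lowerLimit), upperLimit);
  }

  public static void main(String[] args) {
    long tenMb = 10L * 1024 * 1024;
    // 200MB of small-table data, 10MB write buffers, 100MB memory budget -> capped at 16
    System.out.println(calcNumPartitions(200L * 1024 * 1024, (int) tenMb, 100L * 1024 * 1024));
  }
}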
*/ + private class GetAdaptor implements ReusableGetAdaptor { + + private Object[] currentKey; + private boolean[] nulls; + private List vectorKeyOIs; + + private final ReusableRowContainer currentValue; + private final Output output; + + public GetAdaptor() { + currentValue = new ReusableRowContainer(); + output = new Output(); + } + + @Override + public JoinUtil.JoinResult setFromVector(Object row, VectorHashKeyWrapper kw, + VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) + throws HiveException { + if (nulls == null) { + nulls = new boolean[keyOutputWriters.length]; + currentKey = new Object[keyOutputWriters.length]; + vectorKeyOIs = new ArrayList(); + for (int i = 0; i < keyOutputWriters.length; i++) { + vectorKeyOIs.add(keyOutputWriters[i].getObjectInspector()); + } + } else { + assert nulls.length == keyOutputWriters.length; + } + for (int i = 0; i < keyOutputWriters.length; i++) { + currentKey[i] = keyWrapperBatch.getWritableKeyValue(kw, i, keyOutputWriters[i]); + nulls[i] = currentKey[i] == null; + } + return currentValue.setFromOutput(row, + MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, sortableSortOrders)); + } + + @Override + public JoinUtil.JoinResult setFromRow(Object row, List fields, + List ois) throws HiveException { + if (nulls == null) { + nulls = new boolean[fields.size()]; + currentKey = new Object[fields.size()]; + } + for (int keyIndex = 0; keyIndex < fields.size(); ++keyIndex) { + currentKey[keyIndex] = fields.get(keyIndex).evaluate(row); + nulls[keyIndex] = currentKey[keyIndex] == null; + } + return currentValue.setFromOutput(row, + MapJoinKey.serializeRow(output, currentKey, ois, sortableSortOrders)); + } + + @Override + public JoinUtil.JoinResult setFromOther(Object row, ReusableGetAdaptor other) throws HiveException { + assert other instanceof GetAdaptor; + GetAdaptor other2 = (GetAdaptor)other; + nulls = other2.nulls; + currentKey = other2.currentKey; + return currentValue.setFromOutput(row, other2.output); + } + + @Override + public boolean hasAnyNulls(int fieldCount, boolean[] nullsafes) { + if (nulls == null || nulls.length == 0) return false; + for (int i = 0; i < nulls.length; i++) { + if (nulls[i] && (nullsafes == null || !nullsafes[i])) { + return true; + } + } + return false; + } + + @Override + public MapJoinRowContainer getCurrentRows() { + return currentValue.isEmpty() ? null : currentValue; + } + + @Override + public Object[] getCurrentKey() { + return currentKey; + } + } + + /** Row container that gets and deserializes the rows on demand from bytes provided. */ + private class ReusableRowContainer + implements MapJoinRowContainer, AbstractRowContainer.RowIterator> { + private byte aliasFilter; + private List refs; + private int currentRow; + /** + * Sometimes, when container is empty in multi-table mapjoin, we need to add a dummy row. + * This container does not normally support adding rows; this is for the dummy row. + */ + private List dummyRow = null; + + private final ByteArrayRef uselessIndirection; // LBStruct needs ByteArrayRef + private final LazyBinaryStruct valueStruct; + + private int partitionId; // Current hashMap in use + + public ReusableRowContainer() { + if (internalValueOi != null) { + valueStruct = (LazyBinaryStruct) + LazyBinaryFactory.createLazyBinaryObject(internalValueOi); + } else { + valueStruct = null; // No rows? 
+ } + uselessIndirection = new ByteArrayRef(); + clearRows(); + } + + public JoinUtil.JoinResult setFromOutput(Object row, Output output) throws HiveException { + if (refs == null) { + refs = new ArrayList(0); + } + + int keyHash = WriteBuffers.murmurHash(output.getData(), 0, output.getLength()); + partitionId = keyHash & (hashPartitions.length - 1); + + // If the target hash table is on disk, spill this row to disk as well to be processed later + if (isOnDisk(partitionId)) { + RowContainer rc = hashPartitions[partitionId].getMatchfileRowContainer(hconf); + + // Another approach is to use Kryo's clone function + Kryo kryo = Utilities.runtimeSerializationKryo.get(); + Object rowCopy = kryo.copy(row); + + List listHelper = new ArrayList(); + listHelper.add(rowCopy); + rc.addRow(listHelper); + + refs.clear(); + return JoinUtil.JoinResult.SPILL; + } + else { + byte aliasFilter = hashPartitions[partitionId].hashMap.getValueRefs(output.getData(), output.getLength(), refs); + this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter; + this.dummyRow = null; + if (refs.isEmpty()) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } + } + // TODO return a 3rd state "DEFER" for vectorization + } + + public boolean isEmpty() { + return refs.isEmpty() && (dummyRow == null); + } + + // Implementation of row container + @Override + public RowIterator> rowIter() throws HiveException { + currentRow = -1; + return this; + } + + @Override + public int rowCount() throws HiveException { + return dummyRow != null ? 1 : refs.size(); + } + + @Override + public void clearRows() { + // Doesn't clear underlying hashtable + if (refs != null) { + refs.clear(); + } + dummyRow = null; + currentRow = -1; + aliasFilter = (byte) 0xff; + } + + @Override + public byte getAliasFilter() throws HiveException { + return aliasFilter; + } + + @Override + public MapJoinRowContainer copy() throws HiveException { + return this; // Independent of hashtable and can be modified, no need to copy. + } + + // Implementation of row iterator + @Override + public List first() throws HiveException { + currentRow = 0; + return next(); + } + + + @Override + public List next() throws HiveException { + if (dummyRow != null) { + List result = dummyRow; + dummyRow = null; + return result; + } + if (currentRow < 0 || refs.size() < currentRow) throw new HiveException("No rows"); + if (refs.size() == currentRow) return null; + WriteBuffers.ByteSegmentRef ref = refs.get(currentRow++); + if (ref.getLength() == 0) { + return EMPTY_LIST; // shortcut, 0 length means no fields + } + if (ref.getBytes() == null) { + // partitionId is derived from previously calculated value in setFromOutput() + hashPartitions[partitionId].hashMap.populateValue(ref); + } + uselessIndirection.setData(ref.getBytes()); + valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength()); + return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that? + } + + @Override + public void addRow(List t) { + if (dummyRow != null || !refs.isEmpty()) { + throw new RuntimeException("Cannot add rows when not empty"); + } + dummyRow = t; + } + + // Various unsupported methods. 
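[Editor's note: setFromOutput above deep-copies the big-table row with Kryo before buffering it in the matchfile, since the caller may reuse the row object for the next input. A minimal sketch of that copy; a fresh Kryo instance here stands in for Hive's thread-local Utilities.runtimeSerializationKryo.]

import com.esotericsoftware.kryo.Kryo;
import java.util.ArrayList;
import java.util.List;

public class KryoCopySketch {
  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    kryo.setRegistrationRequired(false);          // keep the sketch self-contained across Kryo versions

    List<Object> row = new ArrayList<>();
    row.add("reused");
    row.add(42);

    List<Object> rowCopy = kryo.copy(row);        // deep copy, safe to buffer for a later pass
    row.set(0, "mutated by the next input row");  // mutating the original leaves the copy intact
    System.out.println(rowCopy);                  // prints [reused, 42]
  }
}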
+ @Override + public void addRow(Object[] value) { + throw new RuntimeException(this.getClass().getCanonicalName() + " cannot add arrays"); + } + @Override + public void write(MapJoinObjectSerDeContext valueContext, ObjectOutputStream out) { + throw new RuntimeException(this.getClass().getCanonicalName() + " cannot be serialized"); + } + } + + @Override + public void dumpMetrics() { + for (int i = 0; i < hashPartitions.length; i++) { + HashPartition hp = hashPartitions[i]; + hp.hashMap.debugDumpMetrics(); + } + } + + public void dumpStats() { + int numPartitionsInMem = 0; + int numPartitionsOnDisk = 0; + + for (HashPartition hp : hashPartitions) { + if (hp.getHashMapOnDisk()) { + numPartitionsOnDisk++; + } else { + numPartitionsInMem++; + } + } + + LOG.info("Hybrid Map Join summary: " + numPartitionsInMem + + " partitions in memory have been processed; " + numPartitionsOnDisk + + " partitions have been spilled to disk and will be processed next."); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index 28f6c63..b798ea0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -91,7 +92,11 @@ private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadF throws SerDeException { int newThreshold = HashMapWrapper.calculateTableSize( keyCountAdj, threshold, loadFactor, keyCount); - hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, threshold); + hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage); + } + + public MapJoinBytesTableContainer(BytesBytesMultiHashMap hashMap) { + this.hashMap = hashMap; } private LazyBinaryStructObjectInspector createInternalOi( @@ -114,8 +119,18 @@ private LazyBinaryStructObjectInspector createInternalOi( .getLazyBinaryStructObjectInspector(colNames, colOis); } - private static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { + public void setInternalValueOi(LazyBinaryStructObjectInspector internalValueOi) { + this.internalValueOi = internalValueOi; + } + + public void setSortableSortOrders(boolean[] sortableSortOrders) { + this.sortableSortOrders = sortableSortOrders; + } + + public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { void setKeyValue(Writable key, Writable val) throws SerDeException; + /** Get hash value from the key. 
*/ + int getHashFromKey() throws SerDeException; } private static class KeyValueWriter implements KeyValueHelper { @@ -176,9 +191,14 @@ public byte updateStateByte(Byte previousValue) { aliasFilter &= ((ShortWritable)valObjs[valObjs.length - 1]).get(); return aliasFilter; } + + @Override + public int getHashFromKey() throws SerDeException { + throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer"); + } } - private static class LazyBinaryKvWriter implements KeyValueHelper { + static class LazyBinaryKvWriter implements KeyValueHelper { private final LazyBinaryStruct.SingleFieldGetter filterGetter; private Writable key, value; private final SerDe keySerDe; @@ -210,6 +230,16 @@ public void writeKey(RandomAccessOutput dest) throws SerDeException { dest.write(b.getBytes(), 0, b.getLength() - (hasTag ? 1 : 0)); } + @Override + public int getHashFromKey() throws SerDeException { + if (!(key instanceof BinaryComparable)) { + throw new SerDeException("Unexpected type " + key.getClass().getCanonicalName()); + } + sanityCheckKeyForTag(); + BinaryComparable b = (BinaryComparable)key; + return WriteBuffers.murmurHash(b.getBytes(), 0, b.getLength() - (hasTag ? 1 : 0)); + } + /** * If we received data with tags from ReduceSinkOperators, no keys will match. This should * not happen, but is important enough that we want to find out and work around it if some @@ -285,7 +315,7 @@ public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentK } } writeHelper.setKeyValue(currentKey, currentValue); - hashMap.put(writeHelper); + hashMap.put(writeHelper, null); return null; // there's no key to return } @@ -329,9 +359,9 @@ public GetAdaptor() { } @Override - public void setFromVector(VectorHashKeyWrapper kw, - VectorExpressionWriter[] keyOutputWriters, - VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException { + public JoinUtil.JoinResult setFromVector(Object row, VectorHashKeyWrapper kw, + VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) + throws HiveException { if (nulls == null) { nulls = new boolean[keyOutputWriters.length]; currentKey = new Object[keyOutputWriters.length]; @@ -346,12 +376,12 @@ public void setFromVector(VectorHashKeyWrapper kw, currentKey[i] = keyWrapperBatch.getWritableKeyValue(kw, i, keyOutputWriters[i]); nulls[i] = currentKey[i] == null; } - currentValue.setFromOutput( + return currentValue.setFromOutput( MapJoinKey.serializeRow(output, currentKey, vectorKeyOIs, sortableSortOrders)); } @Override - public void setFromRow(Object row, List fields, + public JoinUtil.JoinResult setFromRow(Object row, List fields, List ois) throws HiveException { if (nulls == null) { nulls = new boolean[fields.size()]; @@ -361,17 +391,17 @@ public void setFromRow(Object row, List fields, currentKey[keyIndex] = fields.get(keyIndex).evaluate(row); nulls[keyIndex] = currentKey[keyIndex] == null; } - currentValue.setFromOutput( + return currentValue.setFromOutput( MapJoinKey.serializeRow(output, currentKey, ois, sortableSortOrders)); } @Override - public void setFromOther(ReusableGetAdaptor other) { + public JoinUtil.JoinResult setFromOther(Object row, ReusableGetAdaptor other) { assert other instanceof GetAdaptor; GetAdaptor other2 = (GetAdaptor)other; nulls = other2.nulls; currentKey = other2.currentKey; - currentValue.setFromOutput(other2.output); + return currentValue.setFromOutput(other2.output); } @Override @@ -422,13 +452,19 @@ public ReusableRowContainer() { clearRows(); } - public void setFromOutput(Output 
output) { + public JoinUtil.JoinResult setFromOutput(Output output) { if (refs == null) { refs = new ArrayList(0); } byte aliasFilter = hashMap.getValueRefs(output.getData(), output.getLength(), refs); this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter; this.dummyRow = null; + if (refs.isEmpty()) { + return JoinUtil.JoinResult.NOMATCH; + } + else { + return JoinUtil.JoinResult.MATCH; + } } public boolean isEmpty() { @@ -530,4 +566,9 @@ public static boolean isSupportedKey(ObjectInspector keyOi) { public void dumpMetrics() { hashMap.debugDumpMetrics(); } + + @Override + public boolean hasSpill() { + return false; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index ff6e5d4..d358956 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hive.ql.exec.persistence; +import java.io.IOException; import java.util.List; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -40,21 +42,21 @@ * Changes current rows to which adaptor is referring to the rows corresponding to * the key represented by a VHKW object, and writers and batch used to interpret it. */ - void setFromVector(VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, + JoinUtil.JoinResult setFromVector(Object row, VectorHashKeyWrapper kw, VectorExpressionWriter[] keyOutputWriters, VectorHashKeyWrapperBatch keyWrapperBatch) throws HiveException; /** * Changes current rows to which adaptor is referring to the rows corresponding to * the key represented by a row object, and fields and ois used to interpret it. */ - void setFromRow(Object row, List fields, List ois) + JoinUtil.JoinResult setFromRow(Object row, List fields, List ois) throws HiveException; /** * Changes current rows to which adaptor is referring to the rows corresponding to * the key that another adaptor has already deserialized via setFromVector/setFromRow. */ - void setFromOther(ReusableGetAdaptor other); + JoinUtil.JoinResult setFromOther(Object row, ReusableGetAdaptor other) throws HiveException; /** * Checks whether the current key has any nulls. @@ -77,7 +79,7 @@ void setFromRow(Object row, List fields, List 2) { + useHybridGraceHashJoin = false; + } + for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; } + boolean hasSpill = false; String inputName = parentToInput.get(pos); LogicalInput input = tezContext.getInput(inputName); @@ -104,15 +106,19 @@ public void load( isFirstKey = false; Long keyCountObj = parentKeyCounts.get(pos); long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue(); + if (desc.getParentDataSizes().size() == 0) { + useHybridGraceHashJoin = false; + } MapJoinTableContainer tableContainer = useOptimizedTables - ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage) + ? (useHybridGraceHashJoin ? 
new HybridHashTableContainer(hconf, valCtx, keyCount, memUsage, + desc.getParentDataSizes().get(pos)) + : new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)) : new HashMapWrapper(hconf, keyCount); while (kvReader.next()) { tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(), valCtx, (Writable)kvReader.getCurrentValue()); } - tableContainer.seal(); mapJoinTables[pos] = tableContainer; } catch (IOException e) { @@ -126,7 +132,8 @@ public void load( LOG.info("Is this a bucket map join: " + desc.isBucketMapJoin()); // cache is disabled for bucket map join because of the same reason // given in loadHashTable in MapJoinOperator. - if (!desc.isBucketMapJoin()) { + // Also, only cache the input if there's no spilled partition in Hybrid Grace Hash Join case + if (!desc.isBucketMapJoin() && !mapJoinTables[pos].hasSpill()) { tezCacheAccess.registerCachedInput(inputName); LOG.info("Setting Input: " + inputName + " as cached"); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index 2c8aee1..4c4c272 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; @@ -211,12 +212,13 @@ public void closeOp(boolean aborted) throws HiveException { if (!aborted && 0 < outputBatch.size) { flushOutput(); } + super.closeOp(aborted); } @Override - protected void setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias) + protected JoinUtil.JoinResult setMapJoinKey(ReusableGetAdaptor dest, Object row, byte alias) throws HiveException { - dest.setFromVector(keyValues[batchIndex], keyOutputWriters, keyWrapperBatch); + return dest.setFromVector(row, keyValues[batchIndex], keyOutputWriters, keyWrapperBatch); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index b184cf4..d3e270d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -122,12 +122,17 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, } MapJoinDesc joinConf = mapJoinOp.getConf(); long keyCount = Long.MAX_VALUE, rowCount = Long.MAX_VALUE, bucketCount = 1; + long tableSize = Long.MAX_VALUE; Statistics stats = parentRS.getStatistics(); if (stats != null) { keyCount = rowCount = stats.getNumRows(); if (keyCount <= 0) { keyCount = rowCount = Long.MAX_VALUE; } + tableSize = stats.getDataSize(); + if (tableSize <= 0) { + tableSize = Long.MAX_VALUE; + } ArrayList keyCols = parentRS.getConf().getOutputKeyColumnNames(); if (keyCols != null && !keyCols.isEmpty()) { // See if we can arrive at a smaller number using distinct stats from key columns. 
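[Editor's note: ReduceSinkMapJoinProc below records a per-parent data size estimate only when the statistics are usable, scaling it down for bucket map joins; HashTableLoader above falls back to the regular MapJoinBytesTableContainer when that map stays empty. A small sketch of the bookkeeping; the positions and sizes are made up.]

import java.util.HashMap;
import java.util.Map;

public class ParentDataSizeSketch {
  public static void main(String[] args) {
    Map<Integer, Long> parentDataSizes = new HashMap<>();   // mirrors MapJoinDesc.parentDataSizes

    long statsDataSize = 0L;     // pretend the ReduceSink statistics reported no usable size
    int bucketCount = 4;
    int smallTablePos = 0;

    long tableSize = statsDataSize > 0 ? statsDataSize : Long.MAX_VALUE;  // unusable stats -> MAX_VALUE
    if (bucketCount > 1 && tableSize != Long.MAX_VALUE) {
      tableSize /= bucketCount;  // each bucket task only loads its share of the small table
    }
    if (tableSize != Long.MAX_VALUE) {
      parentDataSizes.put(smallTablePos, tableSize);        // only record a usable estimate
    }
    System.out.println(parentDataSizes);                    // {} here, since stats were missing
  }
}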
@@ -155,6 +160,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, // We cannot obtain a better estimate without CustomPartitionVertex providing it // to us somehow; in which case using statistics would be completely unnecessary. keyCount /= bucketCount; + tableSize /= bucketCount; } } } @@ -164,6 +170,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, if (keyCount != Long.MAX_VALUE) { joinConf.getParentKeyCounts().put(pos, keyCount); } + if (tableSize != Long.MAX_VALUE) { + joinConf.getParentDataSizes().put(pos, tableSize); + } int numBuckets = -1; EdgeType edgeType = EdgeType.BROADCAST_EDGE; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index aca4273..e987f6a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -1296,6 +1296,9 @@ private void fixupParentChildOperators(Operator op, switch (op.getType()) { case MAPJOIN: + // Disable Hybrid Grace Hash Join when vectorization is in effect, for now + HiveConf.setBoolVar(physicalContext.getConf(), + HiveConf.ConfVars.HIVEMAPJOINUSEHYBRIDGRACEHASHJOIN, false); case GROUPBY: case FILTER: case SELECT: diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index 9fdd417..ceb95a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -52,6 +52,7 @@ // TODO: should these rather be arrays? private Map parentToInput = new HashMap(); private Map parentKeyCounts = new HashMap(); + private Map parentDataSizes = new HashMap(); // for tez. used to remember which type of a Bucket Map Join this is. 
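[Editor's note: the feature is gated by the new hive.mapjoin.hybridgrace.hashtable flag, which MapJoinOperator reads through HiveConf.getBoolVar and the Vectorizer change above force-disables for vectorized map joins. A sketch of the same toggle using the raw Configuration key; the default of true matches the HiveConf entry at the top of this patch.]

import org.apache.hadoop.conf.Configuration;

public class HybridGraceFlagSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    boolean useHybrid = conf.getBoolean("hive.mapjoin.hybridgrace.hashtable", true);
    System.out.println("hybrid grace enabled: " + useHybrid);

    // The Vectorizer turns the feature off when the map join is vectorized, for now:
    conf.setBoolean("hive.mapjoin.hybridgrace.hashtable", false);
    System.out.println("after vectorization check: "
        + conf.getBoolean("hive.mapjoin.hybridgrace.hashtable", true));
  }
}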
 private boolean customBucketMapJoin;
@@ -90,6 +91,7 @@ public MapJoinDesc(MapJoinDesc clone) {
     this.dumpFilePrefix = clone.dumpFilePrefix;
     this.parentToInput = clone.parentToInput;
     this.parentKeyCounts = clone.parentKeyCounts;
+    this.parentDataSizes = clone.parentDataSizes;
     this.customBucketMapJoin = clone.customBucketMapJoin;
   }
@@ -136,6 +138,14 @@ public void setParentToInput(Map parentToInput) {
     return parentKeyCounts;
   }
+  public Map getParentDataSizes() {
+    return parentDataSizes;
+  }
+
+  public void setParentDataSizes(Map parentDataSizes) {
+    this.parentDataSizes = parentDataSizes;
+  }
+
   @Explain(displayName = "Estimated key counts", normalExplain = false)
   public String getKeyCountsExplainDesc() {
     StringBuilder result = null;
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java
index b3582b2..b2668ba 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestBytesBytesMultiHashMap.java
@@ -49,10 +49,10 @@ public void testCapacityValidation() {
   public void testPutGetOne() throws Exception {
     BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE);
     RandomKvSource kv = new RandomKvSource(0, 0);
-    map.put(kv);
+    map.put(kv, null);
     verifyResults(map, kv.getLastKey(), kv.getLastValue());
     kv = new RandomKvSource(10, 100);
-    map.put(kv);
+    map.put(kv, null);
     verifyResults(map, kv.getLastKey(), kv.getLastValue());
   }
@@ -60,12 +60,12 @@ public void testPutGetOne() throws Exception {
   public void testPutGetMultiple() throws Exception {
     BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE);
     RandomKvSource kv = new RandomKvSource(0, 100);
-    map.put(kv);
+    map.put(kv, null);
     verifyResults(map, kv.getLastKey(), kv.getLastValue());
     FixedKeyKvSource kv2 = new FixedKeyKvSource(kv.getLastKey(), 0, 100);
     kv2.values.add(kv.getLastValue());
     for (int i = 0; i < 3; ++i) {
-      map.put(kv2);
+      map.put(kv2, null);
       verifyResults(map, kv2.key, kv2.values.toArray(new byte[kv2.values.size()][]));
     }
   }
@@ -74,11 +74,11 @@ public void testGetNonExistent() throws Exception {
     BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, LOAD_FACTOR, WB_SIZE);
     RandomKvSource kv = new RandomKvSource(1, 100);
-    map.put(kv);
+    map.put(kv, null);
     byte[] key = kv.getLastKey();
     key[0] = (byte)(key[0] + 1);
     FixedKeyKvSource kv2 = new FixedKeyKvSource(kv.getLastKey(), 0, 100);
-    map.put(kv2);
+    map.put(kv2, null);
     key[0] = (byte)(key[0] + 1);
     List results = new ArrayList(0);
     map.getValueRefs(key, key.length, results);
@@ -93,7 +93,7 @@ public void testPutWithFullMap() throws Exception {
     BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(CAPACITY, 1f, WB_SIZE);
     UniqueKeysKvSource kv = new UniqueKeysKvSource();
     for (int i = 0; i < CAPACITY; ++i) {
-      map.put(kv);
+      map.put(kv, null);
     }
     for (int i = 0; i < kv.keys.size(); ++i) {
       verifyResults(map, kv.keys.get(i), kv.values.get(i));
@@ -111,7 +111,7 @@ public void testExpand() throws Exception {
     BytesBytesMultiHashMap map = new BytesBytesMultiHashMap(1, 0.0000001f, WB_SIZE);
     UniqueKeysKvSource kv = new UniqueKeysKvSource();
     for (int i = 0; i < 18; ++i) {
-      map.put(kv);
+      map.put(kv, null);
     for (int j = 0; j <= i; ++j) {
       verifyResults(map, kv.keys.get(j), kv.values.get(j));
     }
diff --git ql/src/test/queries/clientpositive/hybridhashjoin.q ql/src/test/queries/clientpositive/hybridhashjoin.q
new file mode 100644
index 0000000..f1adbf7
--- /dev/null
+++ ql/src/test/queries/clientpositive/hybridhashjoin.q
@@ -0,0 +1,51 @@
+set hive.auto.convert.join=true;
+set hive.mapjoin.hybridgrace.hashtable=true;
+set hive.auto.convert.join.noconditionaltask.size=10000000;
+set hive.mapjoin.optimized.hashtable.wbsize=10485760;
+
+DROP TABLE IF EXISTS dimension;
+CREATE TABLE dimension (d1 int, d2 string);
+
+DROP TABLE IF EXISTS fact;
+CREATE TABLE fact (f1 int, f2 int);
+
+-- Insert 20 unique rows into small table
+INSERT INTO TABLE dimension VALUES (NULL, NULL),
+ (1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e"),
+ (6,"f"), (7,"g"), (8,"h"), (9,"i"), (10,"j"),
+ (11,"k"), (12,"l"), (13,"m"), (14,"n"), (15,"o"),
+ (16,"p"), (17,"q"), (18,"r"), (19,"s"), (20,"t");
+
+-- Insert 100 unique rows into big table
+INSERT INTO TABLE fact VALUES (NULL, NULL),
+ (1,2), (2,2), (3,2), (4,2), (5,2), (6,2), (7,2), (8,2), (9,2), (10,2),
+ (11,2), (12,2), (13,2), (14,2), (15,2), (16,2), (17,2), (18,2), (19,2), (20,2),
+ (21,2), (22,2), (23,2), (24,2), (25,2), (26,2), (27,2), (28,2), (29,2), (30,2),
+ (31,2), (32,2), (33,2), (34,2), (35,2), (36,2), (37,2), (38,2), (39,2), (40,2),
+ (41,2), (42,2), (43,2), (44,2), (45,2), (46,2), (47,2), (48,2), (49,2), (50,2),
+ (51,2), (52,2), (53,2), (54,2), (55,2), (56,2), (57,2), (58,2), (59,2), (60,2),
+ (61,2), (62,2), (63,2), (64,2), (65,2), (66,2), (67,2), (68,2), (69,2), (70,2),
+ (71,2), (72,2), (73,2), (74,2), (75,2), (76,2), (77,2), (78,2), (79,2), (80,2),
+ (81,2), (82,2), (83,2), (84,2), (85,2), (86,2), (87,2), (88,2), (89,2), (90,2),
+ (91,2), (92,2), (93,2), (94,2), (95,2), (96,2), (97,2), (98,2), (99,2), (100,2);
+
+EXPLAIN SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1;
+SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1;
+
+DROP TABLE dimension;
+DROP TABLE fact;
+
+explain
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ left outer join alltypesorc cd
+ on cd.cint = c.cint) t1
+;
+
+select count(*) from
+(select c.ctinyint
+ from alltypesorc c
+ left outer join alltypesorc cd
+ on cd.cint = c.cint) t1
+;
diff --git ql/src/test/results/clientpositive/tez/hybridhashjoin.q.out ql/src/test/results/clientpositive/tez/hybridhashjoin.q.out
new file mode 100644
index 0000000..9dbab78
--- /dev/null
+++ ql/src/test/results/clientpositive/tez/hybridhashjoin.q.out
@@ -0,0 +1,295 @@
+PREHOOK: query: DROP TABLE IF EXISTS dimension
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS dimension
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE dimension (d1 int, d2 string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@dimension
+POSTHOOK: query: CREATE TABLE dimension (d1 int, d2 string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@dimension
+PREHOOK: query: DROP TABLE IF EXISTS fact
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS fact
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE fact (f1 int, f2 int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@fact
+POSTHOOK: query: CREATE TABLE fact (f1 int, f2 int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@fact
+PREHOOK: query: -- Insert 20 unique rows into small table
+INSERT INTO TABLE dimension VALUES (NULL, NULL),
+ (1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e"),
+ (6,"f"), (7,"g"),
(8,"h"), (9,"i"), (10,"j"), + (11,"k"), (12,"l"), (13,"m"), (14,"n"), (15,"o"), + (16,"p"), (17,"q"), (18,"r"), (19,"s"), (20,"t") +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@dimension +POSTHOOK: query: -- Insert 20 unique rows into small table +INSERT INTO TABLE dimension VALUES (NULL, NULL), + (1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e"), + (6,"f"), (7,"g"), (8,"h"), (9,"i"), (10,"j"), + (11,"k"), (12,"l"), (13,"m"), (14,"n"), (15,"o"), + (16,"p"), (17,"q"), (18,"r"), (19,"s"), (20,"t") +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@dimension +POSTHOOK: Lineage: dimension.d1 EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: dimension.d2 SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: -- Insert 100 unique rows into big table +INSERT INTO TABLE fact VALUES (NULL, NULL), + (1,2), (2,2), (3,2), (4,2), (5,2), (6,2), (7,2), (8,2), (9,2), (10,2), + (11,2), (12,2), (13,2), (14,2), (15,2), (16,2), (17,2), (18,2), (19,2), (20,2), + (21,2), (22,2), (23,2), (24,2), (25,2), (26,2), (27,2), (28,2), (29,2), (30,2), + (31,2), (32,2), (33,2), (34,2), (35,2), (36,2), (37,2), (38,2), (39,2), (40,2), + (41,2), (42,2), (43,2), (44,2), (45,2), (46,2), (47,2), (48,2), (49,2), (50,2), + (51,2), (52,2), (53,2), (54,2), (55,2), (56,2), (57,2), (58,2), (59,2), (60,2), + (61,2), (62,2), (63,2), (64,2), (65,2), (66,2), (67,2), (68,2), (69,2), (70,2), + (71,2), (72,2), (73,2), (74,2), (75,2), (76,2), (77,2), (78,2), (79,2), (80,2), + (81,2), (82,2), (83,2), (84,2), (85,2), (86,2), (87,2), (88,2), (89,2), (90,2), + (91,2), (92,2), (93,2), (94,2), (95,2), (96,2), (97,2), (98,2), (99,2), (100,2) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__2 +PREHOOK: Output: default@fact +POSTHOOK: query: -- Insert 100 unique rows into big table +INSERT INTO TABLE fact VALUES (NULL, NULL), + (1,2), (2,2), (3,2), (4,2), (5,2), (6,2), (7,2), (8,2), (9,2), (10,2), + (11,2), (12,2), (13,2), (14,2), (15,2), (16,2), (17,2), (18,2), (19,2), (20,2), + (21,2), (22,2), (23,2), (24,2), (25,2), (26,2), (27,2), (28,2), (29,2), (30,2), + (31,2), (32,2), (33,2), (34,2), (35,2), (36,2), (37,2), (38,2), (39,2), (40,2), + (41,2), (42,2), (43,2), (44,2), (45,2), (46,2), (47,2), (48,2), (49,2), (50,2), + (51,2), (52,2), (53,2), (54,2), (55,2), (56,2), (57,2), (58,2), (59,2), (60,2), + (61,2), (62,2), (63,2), (64,2), (65,2), (66,2), (67,2), (68,2), (69,2), (70,2), + (71,2), (72,2), (73,2), (74,2), (75,2), (76,2), (77,2), (78,2), (79,2), (80,2), + (81,2), (82,2), (83,2), (84,2), (85,2), (86,2), (87,2), (88,2), (89,2), (90,2), + (91,2), (92,2), (93,2), (94,2), (95,2), (96,2), (97,2), (98,2), (99,2), (100,2) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__2 +POSTHOOK: Output: default@fact +POSTHOOK: Lineage: fact.f1 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +POSTHOOK: Lineage: fact.f2 EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +PREHOOK: query: EXPLAIN SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 
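The explain output in this golden file is produced with the settings at the top of hybridhashjoin.q, in particular hive.mapjoin.hybridgrace.hashtable=true. The same switch can be flipped programmatically through HiveConf; a minimal sketch follows, assuming a locally constructed HiveConf (the wrapper class is illustrative and not part of the patch).

import org.apache.hadoop.hive.conf.HiveConf;

public class HybridGraceToggleSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Read the flag added by this patch.
    boolean enabled = HiveConf.getBoolVar(conf,
        HiveConf.ConfVars.HIVEMAPJOINUSEHYBRIDGRACEHASHJOIN);
    System.out.println("hybrid grace hash join enabled: " + enabled);

    // Disable it for a session, e.g. to compare plans against the classic
    // in-memory map join; the Vectorizer hunk earlier does the same thing
    // when vectorization is in effect.
    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMAPJOINUSEHYBRIDGRACEHASHJOIN, false);
  }
}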
+ +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 2 <- Map 1 (BROADCAST_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: d + Statistics: Num rows: 21 Data size: 76 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: d1 is not null (type: boolean) + Statistics: Num rows: 11 Data size: 39 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: d1 (type: int) + sort order: + + Map-reduce partition columns: d1 (type: int) + Statistics: Num rows: 11 Data size: 39 Basic stats: COMPLETE Column stats: NONE + value expressions: d2 (type: string) + Map 2 + Map Operator Tree: + TableScan + alias: f + Statistics: Num rows: 101 Data size: 397 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: f1 is not null (type: boolean) + Statistics: Num rows: 51 Data size: 200 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 d1 (type: int) + 1 f1 (type: int) + outputColumnNames: _col0, _col1, _col5, _col6 + input vertices: + 0 Map 1 + Statistics: Num rows: 56 Data size: 220 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = _col5) (type: boolean) + Statistics: Num rows: 28 Data size: 110 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int), _col1 (type: string), _col5 (type: int), _col6 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 28 Data size: 110 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 28 Data size: 110 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dimension +PREHOOK: Input: default@fact +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM dimension d, fact f WHERE d.d1 = f.f1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dimension +POSTHOOK: Input: default@fact +#### A masked pattern was here #### +1 a 1 2 +2 b 2 2 +3 c 3 2 +4 d 4 2 +5 e 5 2 +6 f 6 2 +7 g 7 2 +8 h 8 2 +9 i 9 2 +10 j 10 2 +11 k 11 2 +12 l 12 2 +13 m 13 2 +14 n 14 2 +15 o 15 2 +16 p 16 2 +17 q 17 2 +18 r 18 2 +19 s 19 2 +20 t 20 2 +PREHOOK: query: DROP TABLE dimension +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@dimension +PREHOOK: Output: default@dimension +POSTHOOK: query: DROP TABLE dimension +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@dimension +POSTHOOK: Output: default@dimension +PREHOOK: query: DROP TABLE fact +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@fact +PREHOOK: Output: default@fact +POSTHOOK: query: DROP TABLE fact +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@fact +POSTHOOK: Output: default@fact +PREHOOK: query: explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: 
Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Left Outer Join0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + input vertices: + 1 Map 3 + Statistics: Num rows: 13516 Data size: 2906160 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: cint (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 12288 Data size: 2641964 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from +(select c.ctinyint + from alltypesorc c + left outer join alltypesorc cd + on cd.cint = c.cint) t1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +3155128 diff --git serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java index bed4d0a..a00104a 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java +++ serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java @@ -18,20 +18,21 @@ package org.apache.hadoop.hive.serde2; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.util.hash.MurmurHash; /** * The structure storing arbitrary amount of data as a set of fixed-size byte buffers. * Maintains read and write pointers for convenient single-threaded writing/reading. 
 */
-public final class WriteBuffers implements RandomAccessOutput {
+public final class WriteBuffers implements RandomAccessOutput, Serializable {
+  private static final long serialVersionUID = 1L;
   private final ArrayList writeBuffers = new ArrayList(1);
   /** Buffer size in writeBuffers */
   private final int wbSize;
@@ -399,12 +400,14 @@ private boolean isAllInOneWriteBuffer(int length) {
     return currentWriteOffset + length <= wbSize;
   }
+  /** Truncates the buffers at currentWriteOffset. */
   public void seal() {
     if (currentWriteOffset < (wbSize * 0.8)) { // arbitrary
       byte[] smallerBuffer = new byte[currentWriteOffset];
       System.arraycopy(currentWriteBuffer, 0, smallerBuffer, 0, currentWriteOffset);
       writeBuffers.set(currentWriteBufferIndex, smallerBuffer);
     }
+    // Remove the buffers after current one.
     if (currentWriteBufferIndex + 1 < writeBuffers.size()) {
       writeBuffers.subList(currentWriteBufferIndex + 1, writeBuffers.size()).clear();
     }
@@ -479,7 +482,7 @@ public void writeInt(long offset, int v) {
   }
   // Lifted from org.apache.hadoop.util.hash.MurmurHash... but supports offset.
-  private static int murmurHash(byte[] data, int offset, int length) {
+  public static int murmurHash(byte[] data, int offset, int length) {
     int m = 0x5bd1e995;
     int r = 24;
@@ -528,4 +531,8 @@ private static int murmurHash(byte[] data, int offset, int length) {
     return h;
   }
+
+  public long size() {
+    return writeBuffers.size() * (long) wbSize;
+  }
 }
\ No newline at end of file
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
index 789e5a6..c373047 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
@@ -126,7 +126,8 @@ public void initialize(Configuration job, Properties tbl)
     // Create the ObjectInspectors for the fields
     cachedObjectInspector = LazyFactory.createLazyStructInspector(serdeParams
-        .getColumnNames(), serdeParams.getColumnTypes(), serdeParams);
+        .getColumnNames(), serdeParams.getColumnTypes(),
+        new LazyObjectInspectorParametersImpl(serdeParams));
     cachedLazyStruct = (LazyStruct) LazyFactory
         .createLazyObject(cachedObjectInspector);
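The WriteBuffers changes above (implementing Serializable, exposing an offset-aware murmurHash, and adding a size() estimate of the allocated buffer space) are the low-level hooks a hybrid hash join needs for routing keys to partitions and for memory accounting. A minimal sketch of how such hooks could be consumed; the class name, the per-partition memory budget, and the power-of-two partition count are assumptions, not APIs introduced by this patch.

import org.apache.hadoop.hive.serde2.WriteBuffers;

public final class PartitionRouterSketch {
  private final int numPartitions;      // assumed to be a power of two
  private final long bytesPerPartition; // spill budget per partition (assumption)

  public PartitionRouterSketch(int numPartitions, long bytesPerPartition) {
    this.numPartitions = numPartitions;
    this.bytesPerPartition = bytesPerPartition;
  }

  /** Route a serialized join key to a partition using the now-public murmurHash. */
  public int partitionFor(byte[] serializedKey) {
    int hash = WriteBuffers.murmurHash(serializedKey, 0, serializedKey.length);
    return hash & (numPartitions - 1); // masking keeps the index non-negative
  }

  /** Decide whether a partition should spill, given e.g. a WriteBuffers.size() reading. */
  public boolean shouldSpill(long partitionBytesInUse) {
    return partitionBytesInUse > bytesPerPartition;
  }
}

Because size() reports buffer count times buffer size, this kind of accounting is an upper bound on the bytes actually written, so a consumer like the sketch above would err on the side of spilling early.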