diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2b8280e..058e9af 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -710,8 +710,9 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true,
         "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" +
         "because memory-optimized hashtable cannot be serialized."),
-    HIVEUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", false, "Whether to use hybrid" +
-        "grace hash join as the join method for mapjoin."),
+    HIVEUSEHYBRIDGRACEHASHJOIN(
+        "hive.mapjoin.hybridgrace.hashtable", true, "Whether to use hybrid" +
+        "grace hash join as the join method for mapjoin. Tez only."),
     HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ("hive.mapjoin.hybridgrace.memcheckfrequency", 1024, "For " +
         "hybrid grace hash join, how often (how many rows apart) we check if memory is full. " +
         "This number should be power of 2."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 9c3ec8e..2289a35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -37,18 +37,20 @@
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap;
+import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition;
+import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
-import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -63,9 +65,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import static org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition;
-import static org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
-
 /**
  * Map side Join operator implementation.
  */
@@ -80,6 +79,7 @@
   private transient ObjectCache cache;
   protected HashTableLoader loader;
+  private boolean loadCalled;
 
   protected transient MapJoinTableContainer[] mapJoinTables;
   private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
@@ -88,7 +88,6 @@
   private UnwrapRowContainer[] unwrapContainer;
   private transient Configuration hconf;
-  private transient boolean useHybridGraceHashJoin; // whether Hybrid Grace Hash Join is enabled
   private transient boolean hybridMapJoinLeftover;  // whether there's spilled data to be processed
   private transient MapJoinBytesTableContainer currentSmallTable; // reloaded hashmap from disk
   private transient int tag; // big table alias
@@ -141,15 +140,13 @@ public void startGroup() throws HiveException {
     mapJoinTables = new MapJoinTableContainer[tagLen];
     mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
     hashTblInitedOnce = false;
-    useHybridGraceHashJoin =
-        HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN);
 
     generateMapMetaData();
 
     final ExecMapperContext mapContext = getExecContext();
     final MapredContext mrContext = MapredContext.get();
 
-    if (!conf.isBucketMapJoin() && !useHybridGraceHashJoin) {
+    if (!conf.isBucketMapJoin()) {
       /*
        * The issue with caching in case of bucket map join is that different tasks
        * process different buckets and if the container is reused to join a different bucket,
@@ -187,8 +184,22 @@ protected final void completeInitializationOp(Object[] os) throws HiveException
     if (os.length != 0) {
       Pair pair = (Pair) os[0];
-      mapJoinTables = pair.getLeft();
-      mapJoinTableSerdes = pair.getRight();
+
+      boolean spilled = false;
+      for (MapJoinTableContainer container : pair.getLeft()) {
+        if (container != null) {
+          spilled = spilled || container.hasSpill();
+        }
+      }
+
+      if (!loadCalled && spilled) {
+        // we can't use the cached table because it has spilled.
+        loadHashTable(getExecContext(), MapredContext.get());
+      } else {
+        // let's use the table from the cache.
+        mapJoinTables = pair.getLeft();
+        mapJoinTableSerdes = pair.getRight();
+      }
       hashTblInitedOnce = true;
     }
@@ -258,6 +269,8 @@ public void generateMapMetaData() throws HiveException {
   private Pair loadHashTable(
       ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
+    loadCalled = true;
+
     if (this.hashTblInitedOnce && ((mapContext == null) || (mapContext.getLocalWork() == null) || (mapContext
         .getLocalWork().getInputFileChangeSensitive() == false))) {
@@ -359,8 +372,8 @@ public void process(Object row, int tag) throws HiveException {
           if (!noOuterJoin) {
             // For Hybrid Grace Hash Join, during the 1st round processing,
             // we only keep the LEFT side if the row is not spilled
-            if (!useHybridGraceHashJoin || hybridMapJoinLeftover ||
-                (!hybridMapJoinLeftover && joinResult != JoinUtil.JoinResult.SPILL)) {
+            if (!conf.isHybridHashJoin() || hybridMapJoinLeftover
+                || (!hybridMapJoinLeftover && joinResult != JoinUtil.JoinResult.SPILL)) {
               joinNeeded = true;
               storage[pos] = dummyObjVectors[pos];
             }
@@ -414,47 +427,57 @@ protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object
   @Override
   public void closeOp(boolean abort) throws HiveException {
-    for (MapJoinTableContainer tableContainer : mapJoinTables) {
-      if (tableContainer != null) {
-        tableContainer.dumpMetrics();
-
-        if (tableContainer instanceof HybridHashTableContainer) {
-          HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
-          hybridHtContainer.dumpStats();
-
-          HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
-          // Clear all in memory partitions first
-          for (int i = 0; i < hashPartitions.length; i++) {
-            if (!hashPartitions[i].isHashMapOnDisk()) {
-              hybridHtContainer.setTotalInMemRowCount(
-                  hybridHtContainer.getTotalInMemRowCount() -
-                  hashPartitions[i].getHashMapFromMemory().getNumValues());
-              hashPartitions[i].getHashMapFromMemory().clear();
+
+    boolean spilled = false;
+    for (MapJoinTableContainer container: mapJoinTables) {
+      if (container != null) {
+        spilled = spilled || container.hasSpill();
+        container.dumpMetrics();
+      }
+    }
+
+    if (spilled) {
+      for (MapJoinTableContainer tableContainer : mapJoinTables) {
+        if (tableContainer != null) {
+          if (tableContainer instanceof HybridHashTableContainer) {
+            HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
+            hybridHtContainer.dumpStats();
+
+            HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
+            // Clear all in memory partitions first
+            for (int i = 0; i < hashPartitions.length; i++) {
+              if (!hashPartitions[i].isHashMapOnDisk()) {
+                hybridHtContainer.setTotalInMemRowCount(
+                    hybridHtContainer.getTotalInMemRowCount() -
+                    hashPartitions[i].getHashMapFromMemory().getNumValues());
+                hashPartitions[i].getHashMapFromMemory().clear();
+              }
             }
-          }
-          assert hybridHtContainer.getTotalInMemRowCount() == 0;
-
-          for (int i = 0; i < hashPartitions.length; i++) {
-            if (hashPartitions[i].isHashMapOnDisk()) {
-              // Recursively process on-disk triplets (hash partition, sidefile, matchfile)
-              try {
-                hybridMapJoinLeftover = true;
-                hashMapRowGetters[smallTable] = null;
-                continueProcess(hashPartitions[i], hybridHtContainer);
-              } catch (IOException e) {
-                e.printStackTrace();
-              } catch (ClassNotFoundException e) {
-                e.printStackTrace();
-              } catch (SerDeException e) {
-                e.printStackTrace();
+            assert hybridHtContainer.getTotalInMemRowCount() == 0;
+
+            for (int i = 0; i < hashPartitions.length; i++) {
+              if (hashPartitions[i].isHashMapOnDisk()) {
+                // Recursively process on-disk triplets (hash partition, sidefile, matchfile)
+                try {
+                  hybridMapJoinLeftover = true;
+                  hashMapRowGetters[smallTable] = null;
+                  continueProcess(hashPartitions[i], hybridHtContainer);
+                } catch (IOException e) {
+                  e.printStackTrace();
+                } catch (ClassNotFoundException e) {
+                  e.printStackTrace();
+                } catch (SerDeException e) {
+                  e.printStackTrace();
+                }
              }
+              hybridMapJoinLeftover = false;
+              currentSmallTable = null;
            }
-            hybridMapJoinLeftover = false;
-            currentSmallTable = null;
          }
        }
      }
    }
+
     if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
         && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
         && mapJoinTables != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index c2abba2..23deff0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -19,7 +19,14 @@
 package org.apache.hadoop.hive.ql.exec.persistence;
 
-import com.esotericsoftware.kryo.Kryo;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,26 +40,20 @@
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.WriteBuffers;
-import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
-import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct;
+import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.*;
+import com.esotericsoftware.kryo.Kryo;
 
 /**
  * Hash table container that can have many partitions -- each partition has its own hashmap,
@@ -67,16 +68,16 @@ public class HybridHashTableContainer implements MapJoinTableContainer {
   private static final Log LOG = LogFactory.getLog(HybridHashTableContainer.class);
 
-  private HashPartition[] hashPartitions; // an array of partitions holding the triplets
+  private final HashPartition[] hashPartitions; // an array of partitions holding the triplets
   private int totalInMemRowCount = 0; // total number of small table rows in memory
-  private long memoryThreshold; // the max memory limit allocated
-  private long tableRowSize; // row size of the small table
+  private final long memoryThreshold; // the max memory limit allocated
+  private final long tableRowSize; // row size of the small table
   private boolean isSpilled; // whether there's any spilled partition
   private int toSpillPartitionId; // the partition into which to spill the big table row;
                                   // This may change after every setMapJoinKey call
   private int numPartitionsSpilled; // number of spilled partitions
   private boolean lastPartitionInMem; // only one (last one) partition is left in memory
-  private int memoryCheckFrequency; // how often (# of rows apart) to check if memory is full
+  private final int memoryCheckFrequency; // how often (# of rows apart) to check if memory is full
 
   /** The OI used to deserialize values. We never deserialize keys. */
   private LazyBinaryStructObjectInspector internalValueOi;
@@ -182,6 +183,11 @@ private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFac
       long noConditionalTaskThreshold, int memCheckFreq, long tableSize, long keyCount,
       long memUsage) throws SerDeException {
+    if (wbSize > noConditionalTaskThreshold) {
+      LOG.warn("adjusting hash table write buffer size to be smaller than noconditionaltasksize");
+      wbSize = (int) noConditionalTaskThreshold;
+    }
+
     int newKeyCount = HashMapWrapper.calculateTableSize(
         keyCountAdj, threshold, loadFactor, keyCount);
@@ -398,7 +404,8 @@ private void spillPartition(int partitionId) throws IOException {
    */
   private int calcNumPartitions(long dataSize, int wbSize) {
     if (memoryThreshold < wbSize) {
-      throw new RuntimeException("Available memory is less than hashtable writebuffer size!" +
+      throw new IllegalStateException("Available memory is less than hashtable writebuffer size!" +
+          " Try increasing hive.auto.convert.join.noconditionaltask.size.");
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 8423698..f18f841 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -244,7 +244,8 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont
         joinOp.getConf().getBaseSrc(), joinOp).getSecond(),
         null, joinDesc.getExprs(), null, null,
         joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
-        joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+        joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null, HiveConf.getBoolVar(
+        context.conf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN));
     mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
     mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
     mapJoinDesc.resetOrder();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index 4d84f0f..e63d27a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -420,7 +420,8 @@ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
         smbJoinDesc.getValueTblDescs(), smbJoinDesc.getValueTblDescs(),
         smbJoinDesc.getOutputColumnNames(),
         bigTablePos, smbJoinDesc.getConds(),
-        smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix());
+        smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix(),
+        HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN));
 
     mapJoinDesc.setStatistics(smbJoinDesc.getStatistics());
@@ -1152,7 +1153,7 @@ public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
     MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
         valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
         filters, op
-        .getConf().getNoOuterJoin(), dumpFilePrefix);
+        .getConf().getNoOuterJoin(), dumpFilePrefix, false);
     mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
     mapJoinDescriptor.setTagOrder(tagOrder);
     mapJoinDescriptor.setNullSafes(desc.getNullSafes());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index 15f0d70..e62dcd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -18,6 +18,13 @@
 
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -53,13 +60,6 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * GenMRSkewJoinProcessor.
  *
@@ -284,7 +284,8 @@ public static void processSkewJoin(JoinOperator joinOp,
       MapJoinDesc mapJoinDescriptor = new MapJoinDesc(newJoinKeys, keyTblDesc,
           newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor
           .getOutputColumnNames(), i, joinDescriptor.getConds(),
-          joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix);
+          joinDescriptor.getFilters(),
+          joinDescriptor.getNoOuterJoin(), dumpFilePrefix, false);
       mapJoinDescriptor.setTagOrder(tags);
       mapJoinDescriptor.setHandleSkewJoin(false);
       mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 52e6be8..8a01e31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
-import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -63,11 +67,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Preconditions;
 
 /**
  * Copied from GenMRSkewJoinProcessor. It's used for spark task
@@ -242,7 +242,8 @@ public static void processSkewJoin(JoinOperator joinOp, Task(); }
@@ -90,13 +92,16 @@ public MapJoinDesc(MapJoinDesc clone) {
     this.parentToInput = clone.parentToInput;
     this.parentKeyCounts = clone.parentKeyCounts;
     this.parentDataSizes = clone.parentDataSizes;
+    this.isBucketMapJoin = clone.isBucketMapJoin;
+    this.isHybridHashJoin = clone.isHybridHashJoin;
   }
 
   public MapJoinDesc(final Map> keys,
      final TableDesc keyTblDesc, final Map> values,
      final List valueTblDescs,final List valueFilteredTblDescs, List outputColumnNames,
      final int posBigTable, final JoinCondDesc[] conds,
-     final Map> filters, boolean noOuterJoin, String dumpFilePrefix) {
+     final Map> filters, final boolean noOuterJoin,
+     final String dumpFilePrefix, final boolean isHybridHashJoin) {
     super(values, outputColumnNames, noOuterJoin, conds, filters, null);
     this.keys = keys;
     this.keyTblDesc = keyTblDesc;
@@ -105,6 +110,7 @@ public MapJoinDesc(final Map> keys,
     this.posBigTable = posBigTable;
     this.bigTableBucketNumMapping = new LinkedHashMap();
     this.dumpFilePrefix = dumpFilePrefix;
+    this.isHybridHashJoin = isHybridHashJoin;
     initRetainExprList();
   }
@@ -322,6 +328,15 @@ public void setBucketMapJoin(boolean isBucketMapJoin) {
     this.isBucketMapJoin = isBucketMapJoin;
   }
 
+  @Explain(displayName = "HybridHashJoin", displayOnlyOnTrue = true)
+  public boolean isHybridHashJoin() {
+    return isHybridHashJoin;
+  }
+
+  public void setHybridHashJoin() {
+    this.isHybridHashJoin = true;
+  }
+
   public void setHashTableMemoryUsage(float hashtableMemoryUsage) {
     this.hashtableMemoryUsage = hashtableMemoryUsage;
   }
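Reviewer note (not part of the patch): since this change flips hive.mapjoin.hybridgrace.hashtable to true by default, the sketch below shows one way a deployment could check and override the new default using only the HiveConf accessors already referenced in the diff. The class name HybridGraceToggle is hypothetical; the same effect is available per session with "set hive.mapjoin.hybridgrace.hashtable=false;".

import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical helper: reads the new default and opts out programmatically.
public class HybridGraceToggle {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // With this patch the flag defaults to true (hybrid grace hash join, Tez only).
    boolean enabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN);
    System.out.println("hybrid grace hash join enabled by default: " + enabled);
    // Disable it for this configuration, e.g. while comparing plans or debugging spills.
    conf.setBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN, false);
  }
}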