commit 079bfa563cf085c07d374f96b1c75992e8963ce4 Author: Misha Dmitriev Date: Mon Dec 18 13:12:57 2017 -0800 HIVE-17684: Map Join memory exhaustion check broken in Hive on Spark. diff --git a/pom.xml b/pom.xml index dfb29cee4b3af9e314b07816bc9403367b494f18..6e8c89aa33c9e0db7b1bffb0a69f89a484328f2d 100644 --- a/pom.xml +++ b/pom.xml @@ -144,7 +144,7 @@ 14.0.1 2.4.11 1.3.166 - 3.0.0-beta1 + 3.0.0 ${basedir}/${hive.path.to.root}/testutils/hadoop 1.3 2.0.0-alpha4 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java index 9690acbe00e4046f34b5ac589b756c38512d6043..07fa27bf505447620759653ee8725fad417ce532 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject; @@ -103,7 +102,6 @@ private long rowNumber = 0; protected transient LogHelper console; private long hashTableScale; - private MapJoinMemoryExhaustionHandler memoryExhaustionHandler; /** Kryo ctor. 
*/ protected HashTableSinkOperator() { @@ -126,7 +124,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT); console = new LogHelper(LOG, isSilent); - memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage()); emptyRowContainer.addRow(emptyObjectArray); // for small tables only; so get the big table position first @@ -230,6 +227,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { */ @Override public void process(Object row, int tag) throws HiveException { + checkGcOverhead(); + byte alias = (byte)tag; // compute keys and values as StandardObjects. Use non-optimized key (MR). Object[] currentKey = new Object[joinKeys[alias].size()]; @@ -255,9 +254,6 @@ public void process(Object row, int tag) throws HiveException { rowContainer = emptyRowContainer; } rowNumber++; - if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) { - memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber); - } tableContainer.put(key, rowContainer); } else if (rowContainer == emptyRowContainer) { rowContainer = rowContainer.copy(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 885c20379f6dc050a7c396f61fe25168b6a1cda3..37ee6fc90d50aeced2508667877a8563a169f4b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -34,10 +34,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.ql.CompilationOpContext; import 
org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.lib.Node; @@ -58,6 +61,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.GcTimeMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +80,29 @@ public static final String HIVE_COUNTER_FATAL = "FATAL_ERROR"; public static final String CONTEXT_NAME_KEY = "__hive.context.name"; + // The GC time alert functionality below is used by the checkGcOverhead() method. + // This method may be called very frequently, and if it called + // GcTimeMonitor.getLatestGcData() every time, it could result in unnecessary + // overhead due to synchronization and new object creation. So instead, + // GcTimeMonitor itself sets the "red flag" in lastAlertGcTimePercentage if + // needed, and checkGcOverhead() may check it as frequently as needed. + public static final int CRITICAl_GC_TIME_PERCENTAGE = 50; + public static final GcTimeMonitor GC_TIME_MONITOR = + new GcTimeMonitor(45 * 1000, 500, CRITICAl_GC_TIME_PERCENTAGE, + new GcTimeMonitor.GcTimeAlertHandler() { + final Log LOG = LogFactory.getLog(getClass().getName()); + @Override + public void alert(GcTimeMonitor.GcData gcData) { + lastAlertGcTimePercentage = gcData.getGcTimePercentage(); + LOG.warn("GcTimeMonitor alert called. 
Current GC time % = " + + lastAlertGcTimePercentage); + } + }); + public static volatile int lastAlertGcTimePercentage; + static { + GC_TIME_MONITOR.start(); + } + private transient Configuration configuration; protected transient CompilationOpContext cContext; protected List> childOperators; @@ -1555,4 +1582,12 @@ public boolean logicalEquals(Operator other) { (conf == other.getConf() || (conf != null && other.getConf() != null && conf.isSame(other.getConf()))); } + + public static void checkGcOverhead() throws MapJoinMemoryExhaustionError { + if (lastAlertGcTimePercentage > CRITICAl_GC_TIME_PERCENTAGE) { + String msg = "GC time percentage = " + lastAlertGcTimePercentage + + ", exceeded threshold."; + throw new MapJoinMemoryExhaustionError(msg); + } + } }