commit ef0818419d107a0c7612e19abc91496849a87b58 Author: Misha Dmitriev Date: Mon Dec 18 13:12:57 2017 -0800 HIVE-17684: Map Join memory exhaustion check broken in Hive on Spark. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java index f01b67e17dcdabc2f0de9a421c58788eee5b8969..73592b9edb2ad2ec3c066e31909d0b0543255b38 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject; @@ -103,7 +102,6 @@ private long rowNumber = 0; protected transient LogHelper console; private long hashTableScale; - private MapJoinMemoryExhaustionHandler memoryExhaustionHandler; /** Kryo ctor. */ protected HashTableSinkOperator() { @@ -126,7 +124,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT); console = new LogHelper(LOG, isSilent); - memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage()); emptyRowContainer.addRow(emptyObjectArray); // for small tables only; so get the big table position first @@ -230,6 +227,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { */ @Override public void process(Object row, int tag) throws HiveException { + checkGcOverhead(); + byte alias = (byte)tag; // compute keys and values as StandardObjects. 
Use non-optimized key (MR). Object[] currentKey = new Object[joinKeys[alias].size()]; @@ -255,9 +254,6 @@ public void process(Object row, int tag) throws HiveException { rowContainer = emptyRowContainer; } rowNumber++; - if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) { - memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber); - } tableContainer.put(key, rowContainer); } else if (rowContainer == emptyRowContainer) { rowContainer = rowContainer.copy(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 38316bf7fa0d8d5e0704160cb750b9c806cdc147..fdd2850b8460fa45478ba21a5ea2c43493df0689 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -34,10 +34,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.lib.Node; @@ -58,6 +61,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.GcTimeMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +80,29 @@ public static final String HIVE_COUNTER_FATAL = "FATAL_ERROR"; public static final String CONTEXT_NAME_KEY = "__hive.context.name"; + // The GC time alert functionality below is used by the checkGcOverhead() method. 
+ // This method may be called very frequently, and if it called + // GcTimeMonitor.getLatestGcData() every time, it could result in unnecessary + // overhead due to synchronization and new object creation. So instead, + // GcTimeMonitor itself sets the "red flag" in lastAlertGcTimePercentage, + // and checkGcOverhead() may check it as frequently as needed. + public static final int CRITICAL_GC_TIME_PERCENTAGE = 50; + public static final GcTimeMonitor GC_TIME_MONITOR = + new GcTimeMonitor(45 * 1000, 500, CRITICAL_GC_TIME_PERCENTAGE, + new GcTimeMonitor.GcTimeAlertHandler() { + private final Logger LOG = LoggerFactory.getLogger(getClass().getName()); + @Override + public void alert(GcTimeMonitor.GcData gcData) { + lastAlertGcTimePercentage = gcData.getGcTimePercentage(); + LOG.warn("GcTimeMonitor alert called. Current GC time % = " + + lastAlertGcTimePercentage); + } + }); + protected static volatile int lastAlertGcTimePercentage; + static { + GC_TIME_MONITOR.start(); + } + private transient Configuration configuration; protected transient CompilationOpContext cContext; protected List<Operator<? extends OperatorDesc>> childOperators; @@ -1619,4 +1646,12 @@ public void setBucketingVersion(int bucketingVersion) { public int getBucketingVersion() { return bucketingVersion; } + + public static void checkGcOverhead() { + if (lastAlertGcTimePercentage > CRITICAL_GC_TIME_PERCENTAGE) { + String msg = "GC time percentage = " + lastAlertGcTimePercentage + + ", exceeded threshold."; + throw new MapJoinMemoryExhaustionError(msg); + } + } }