commit 131e7d2ee070365afbcbb381d74236f1c6fd04d7 Author: Misha Dmitriev Date: Mon Dec 18 13:12:57 2017 -0800 HIVE-17684: Map Join memory exhaustion check broken in Hive on Spark. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java index f01b67e17dcdabc2f0de9a421c58788eee5b8969..73592b9edb2ad2ec3c066e31909d0b0543255b38 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject; @@ -103,7 +102,6 @@ private long rowNumber = 0; protected transient LogHelper console; private long hashTableScale; - private MapJoinMemoryExhaustionHandler memoryExhaustionHandler; /** Kryo ctor. */ protected HashTableSinkOperator() { @@ -126,7 +124,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT); console = new LogHelper(LOG, isSilent); - memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage()); emptyRowContainer.addRow(emptyObjectArray); // for small tables only; so get the big table position first @@ -230,6 +227,8 @@ protected void initializeOp(Configuration hconf) throws HiveException { */ @Override public void process(Object row, int tag) throws HiveException { + checkGcOverhead(); + byte alias = (byte)tag; // compute keys and values as StandardObjects. Use non-optimized key (MR). Object[] currentKey = new Object[joinKeys[alias].size()]; @@ -255,9 +254,6 @@ public void process(Object row, int tag) throws HiveException { rowContainer = emptyRowContainer; } rowNumber++; - if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) { - memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber); - } tableContainer.put(key, rowContainer); } else if (rowContainer == emptyRowContainer) { rowContainer = rowContainer.copy(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 38316bf7fa0d8d5e0704160cb750b9c806cdc147..45386a906aeb7a74332957f53b87464529d4b3cf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -36,8 +36,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.lib.Node; @@ -58,6 +60,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.GcTimeMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +129,35 @@ private transient boolean multiChildren; private transient int[] selected; + // GC time monitoring + static class HiveGcTimeMonitor extends GcTimeMonitor { + private final Logger log; + + HiveGcTimeMonitor(int criticalGcTimePercentage, Logger log) { + super(45 * 1000, 500, criticalGcTimePercentage, + new GcTimeMonitor.GcTimeAlertHandler() { + @Override + public void alert(GcTimeMonitor.GcData gcData) { + lastAlertGcTimePercentage = gcData.getGcTimePercentage(); + log.warn("GcTimeMonitor alert called. Current GC time % = " + + lastAlertGcTimePercentage); + } + }); + this.log = log; + } + } + + // The GC time alert functionality below is used by the checkGcOverhead() method. + // This method may be called very frequently, and if it called + // GcTimeMonitor.getLatestGcData() every time, it could result in unnecessary + // overhead due to synchronization and new object creation. So instead, + // GcTimeMonitor itself sets the "red flag" in lastAlertGcTimePercentage, + // and checkGcOverhead() may check it as frequently as needed. + private static final int CRITICAl_GC_TIME_PERCENTAGE_PROD = 50; // In prod, 50% in GC is a lot + private static final int CRITICAL_GC_TIME_PERCENTAGE_TEST = 99; // Don't fail in tests + protected static volatile int lastAlertGcTimePercentage, criticalGcTimePercentage; + private static volatile GcTimeMonitor hiveGcTimeMonitor; + // dummy operator (for not increasing seqId) protected Operator(String name, CompilationOpContext cContext) { this(); @@ -395,6 +427,14 @@ public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) LOG.debug("Initialization Done " + id + " " + getName() + " done is reset."); } + if (hiveGcTimeMonitor == null) { + boolean inTest = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_IN_TEST); + criticalGcTimePercentage = inTest ? + CRITICAL_GC_TIME_PERCENTAGE_TEST : CRITICAl_GC_TIME_PERCENTAGE_PROD; + hiveGcTimeMonitor = new HiveGcTimeMonitor(criticalGcTimePercentage, LOG); + hiveGcTimeMonitor.start(); + } + // let's wait on the async ops before continuing completeInitialization(asyncInitOperations); } @@ -1619,4 +1659,12 @@ public void setBucketingVersion(int bucketingVersion) { public int getBucketingVersion() { return bucketingVersion; } + + public static void checkGcOverhead() { + if (lastAlertGcTimePercentage >= criticalGcTimePercentage) { + String msg = "GC time percentage = " + lastAlertGcTimePercentage + + ", exceeded threshold."; + throw new MapJoinMemoryExhaustionError(msg); + } + } }